Skip to content

Commit

Permalink
Added message type to allow store not only events but also commands
Browse files Browse the repository at this point in the history
This is the first step to enable message storing and workflows
  • Loading branch information
oskardudycz committed Feb 8, 2025
1 parent 3eda928 commit 5296777
Show file tree
Hide file tree
Showing 15 changed files with 219 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
const eventsToAppend: ReadEvent<EventType, MongoDBReadEventMetadata>[] =
events.map((event) => {
const metadata: MongoDBReadEventMetadata = {
eventId: uuid(),
messageId: uuid(),
streamName,
streamPosition: ++streamOffset,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ export const PostgreSQLProjectionSpec = {
globalPosition: ++globalPosition,
streamPosition: globalPosition,
streamName: `test-${uuid()}`,
eventId: uuid(),
messageId: uuid(),
};

allEvents.push({
...event,
kind: 'Event',
metadata: {
...metadata,
...('metadata' in event ? (event.metadata ?? {}) : {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ export const appendToStream = (

const eventsToAppend: ReadEvent[] = events.map((e, i) => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(i),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
Expand Down Expand Up @@ -252,7 +253,7 @@ const appendEventsRaw = (
%s::bigint,
%L::text
)`,
events.map((e) => sql('%L', e.metadata.eventId)).join(','),
events.map((e) => sql('%L', e.metadata.messageId)).join(','),
events.map((e) => sql('%L', JSONParser.stringify(e.data))).join(','),
events
.map((e) => sql('%L', JSONParser.stringify(e.metadata ?? {})))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ export const readMessagesBatch = async <

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
eventId: row.event_id,
messageId: row.event_id,
streamName: row.stream_id,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
...rawEvent,
kind: 'Event',
metadata: metadata as CombinedReadEventMetadata<
MessageType,
ReadEventMetadataType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ export const readStream = async <EventType extends Event>(

const metadata: ReadEventMetadataWithGlobalPosition = {
...('metadata' in rawEvent ? (rawEvent.metadata ?? {}) : {}),
eventId: row.event_id,
messageId: row.event_id,
streamName: streamId,
streamPosition: BigInt(row.stream_position),
globalPosition: BigInt(row.global_position),
};

return {
...rawEvent,
kind: 'Event',
metadata: metadata as CombinedReadEventMetadata<
EventType,
ReadEventMetadataWithGlobalPosition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ export const appendToStream = async (
const eventsToAppend: ReadEvent[] = events.map(
(e: Event, i: number): ReadEvent => ({
...e,
kind: e.kind ?? 'Event',
metadata: {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(i + 1),
...('metadata' in e ? (e.metadata ?? {}) : {}),
},
Expand Down Expand Up @@ -268,7 +269,7 @@ const buildEventInsertQuery = (
JSONParser.stringify(event.metadata),
expectedStreamVersion?.toString() ?? 0,
event.type,
event.metadata.eventId,
event.metadata.messageId,
false,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -76,22 +78,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand All @@ -100,22 +104,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
];
const nextEvents: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -152,22 +158,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -79,22 +81,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand All @@ -103,22 +107,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
];
const nextEvents: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down Expand Up @@ -151,22 +157,24 @@ void describe('InMemoryEventStore onAfterCommit', () => {
let counter = 0;
const events: TestReadEvent[] = [
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: true,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
},
},
{
kind: 'Event',
type: 'test',
data: { counter: ++counter },
metadata: {
some: false,
eventId: uuid(),
messageId: uuid(),
globalPosition: 1n,
streamName,
streamPosition: 1n,
Expand Down
3 changes: 2 additions & 1 deletion src/packages/emmett/src/eventStore/inMemoryEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ export const getInMemoryEventStore = (
>[] = events.map((event, index) => {
const metadata: ReadEventMetadataWithGlobalPosition = {
streamName,
eventId: uuid(),
messageId: uuid(),
streamPosition: BigInt(currentEvents.length + index + 1),
globalPosition: BigInt(getAllEventsCount() + index + 1),
};
return {
...event,
kind: event.kind ?? 'Event',
metadata: {
...('metadata' in event ? (event.metadata ?? {}) : {}),
...metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ void describe('CaughtUpTransformStream', () => {
globalPosition: bigint,
): ReadEvent<ShoppingCartOpened, ReadEventMetadataWithGlobalPosition> => ({
type: 'ShoppingCartOpened',
kind: 'Event',
data: { cartId: 'cartId' },
metadata: {
eventId: uuid(),
messageId: uuid(),
globalPosition,
streamPosition: globalPosition,
streamName: 'test',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ const createMockEvent = (
position: bigint,
): ReadEvent<MockEvent, ReadEventMetadataWithGlobalPosition> => ({
type: 'Mocked',
kind: 'Event',
data: { mocked: true },
metadata: {
streamName: 'testStream',
eventId: `event-${position}`,
messageId: `message-${position}`,
streamPosition: position,
globalPosition: position,
},
Expand Down
Loading

0 comments on commit 5296777

Please sign in to comment.