diff --git a/README.md b/README.md index 751ba368..fd61a32f 100644 --- a/README.md +++ b/README.md @@ -88,8 +88,8 @@ Castore is opiniated. It comes with a collection of best practices and documente - [Event-driven architecture](#--event-driven-architecture) - [Message queues](#--messagequeue) - [Message queue adapters](#--messagequeueadapter) - - [Message buses](#--message-bus) - - [Message bus adapters](#--message-bus) + - [Message buses](#--messagebus) + - [Message bus adapters](#--messagebusadapter) - [Snapshotting](#--snapshotting) - [Read Models](#--read-models) - [📖 Resources](#-resources) @@ -904,7 +904,7 @@ import type { SQSMessageQueueMessageBody, } from '@castore/sqs-message-queue-adapter'; -const appMessagesHandler = async ({ Records }: SQSMessageQueueMessage) => { +const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => { Records.forEach(({ body }) => { // 👇 Correctly typed! const recordBody: SQSMessageQueueMessageBody = diff --git a/packages/event-bridge-message-bus-adapter/README.md b/packages/event-bridge-message-bus-adapter/README.md index 41645ea4..1ac7b895 100644 --- a/packages/event-bridge-message-bus-adapter/README.md +++ b/packages/event-bridge-message-bus-adapter/README.md @@ -24,4 +24,86 @@ yarn add @castore/core @aws-sdk/client-eventbridge ## 👩‍💻 Usage -_coming soon_ +```ts +import { EventBridgeClient } from '@aws-sdk/client-eventbridge'; + +import { EventBridgeMessageBusAdapter } from '@castore/event-bridge-message-bus-adapter'; + +const eventBridgeClient = new EventBridgeClient({}); + +const messageBusAdapter = new EventBridgeMessageBusAdapter({ + eventBusName: 'my-table-name', + eventBridgeClient, +}); + +// 👇 Alternatively, provide a getter +const messageBusAdapter = new EventBridgeMessageBusAdapter({ + eventBusName: () => process.env.MY_EVENT_BUS_NAME, + eventBridgeClient, +}); + +const appMessageBus = new NotificationMessageBus({ + ... + messageBusAdapter +}) +``` + +This will directly plug your MessageBus to EventBridge 🙌 + +## 🤔 How it works + +When publishing a message, its `eventStoreId` is used as the message `source` and its event `type` is used as `detail-type`. The whole message is passed to the `detail` property. + +```json +// 👇 Entry example +{ + "source": "USERS", // <= eventStoreId + "detail-type": "USER_CREATED", // <= event type + "detail": { + "eventSourceId": "USERS", + "event": { + "aggregateId": "123", + "version": 1, + "type": "USER_CREATED", + "timestamp": ... + ... + }, + "aggregate": ... // <= for state-carrying message buses + }, + ... // <= Other technical EventBridge properties +} +``` + +On the listeners side, you can use the `EventBridgeMessageBusMessage` TS type to type your argument: + +```ts +import type { EventBridgeMessageBusMessage } from '@castore/event-bridge-message-bus-adapter'; + +const listener = async ( + message: EventBridgeMessageBusMessage, +) => { + // 🙌 Correctly typed! + const { eventStoreId, event } = message.detail; +}; +``` + +You can provide event store ids and event types if you listener only listens to specific event types: + +```ts +import type { EventBridgeMessageBusMessage } from '@castore/event-bridge-message-bus-adapter'; + +const listener = async ( + message: EventBridgeMessageBusMessage< + typeof appMessageBus, + 'USERS', // <= Only listen to the 'USER' event store events (optional) + 'USER_CREATED' // <= Only listen to 'USER_CREATED' events (optional) + >, +) => { + // 🙌 Correctly typed! + const { eventStoreId, event } = message.detail; +}; +``` + +## 🔑 IAM + +The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus. diff --git a/packages/event-bridge-message-bus-adapter/package.json b/packages/event-bridge-message-bus-adapter/package.json index d0b6a1ed..cc08fc69 100644 --- a/packages/event-bridge-message-bus-adapter/package.json +++ b/packages/event-bridge-message-bus-adapter/package.json @@ -47,12 +47,14 @@ "@castore/core": "workspace:", "@types/node": "^17.0.29", "@zerollup/ts-transform-paths": "^1.7.18", + "aws-sdk-client-mock": "^2.1.0", "babel-plugin-module-resolver": "^4.1.0", "concurrently": "^7.1.0", "dependency-cruiser": "^11.7.0", "eslint": "^8.14.0", "prettier": "^2.6.2", "ts-node": "^10.7.0", + "ts-toolbelt": "^9.6.0", "ttypescript": "^1.5.13", "typescript": "^4.6.3", "vitest": "^0.26.2" diff --git a/packages/event-bridge-message-bus-adapter/src/eventBridge.unit.test.ts b/packages/event-bridge-message-bus-adapter/src/eventBridge.unit.test.ts new file mode 100644 index 00000000..033bb5ac --- /dev/null +++ b/packages/event-bridge-message-bus-adapter/src/eventBridge.unit.test.ts @@ -0,0 +1,87 @@ +import { + EventBridgeClient, + PutEventsCommand, +} from '@aws-sdk/client-eventbridge'; +import { mockClient } from 'aws-sdk-client-mock'; +import type { A } from 'ts-toolbelt'; + +import type { Message } from '@castore/core'; + +import { EventBridgeMessageBusAdapter } from './eventBridge'; + +const eventBridgeClientMock = mockClient(EventBridgeClient); + +describe('EventBridgeMessageBusAdapter', () => { + const eventBusNameMock = 'my-event-bus'; + + const eventStoreIdMock = 'my-event-store'; + + const eventMock = { + aggregateId: 'my-aggregate-id', + version: 1, + type: 'my-event-type', + timestamp: new Date().toISOString(), + }; + + const messageMock = { + eventStoreId: eventStoreIdMock, + event: eventMock, + }; + + beforeEach(() => { + eventBridgeClientMock.reset(); + eventBridgeClientMock.on(PutEventsCommand).resolves({}); + }); + + it('send a PutEventsCommand to event bridge client', async () => { + const adapter = new EventBridgeMessageBusAdapter({ + eventBusName: eventBusNameMock, + eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, + }); + + const assertMessage: A.Equals< + Parameters, + [Message] + > = 1; + assertMessage; + + await adapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: eventMock.type, + Detail: JSON.stringify(messageMock), + }, + ], + }); + }); + + it('works with event bus name getters', async () => { + const adapter = new EventBridgeMessageBusAdapter({ + eventBusName: () => eventBusNameMock, + eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient, + }); + + await adapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(eventBridgeClientMock.calls()).toHaveLength(1); + expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({ + Entries: [ + { + EventBusName: eventBusNameMock, + Source: eventStoreIdMock, + DetailType: eventMock.type, + Detail: JSON.stringify(messageMock), + }, + ], + }); + }); +}); diff --git a/packages/sqs-message-queue-adapter/README.md b/packages/sqs-message-queue-adapter/README.md index a7d03313..bb57aa5a 100644 --- a/packages/sqs-message-queue-adapter/README.md +++ b/packages/sqs-message-queue-adapter/README.md @@ -24,4 +24,71 @@ yarn add @castore/core @aws-sdk/client-sqs ## 👩‍💻 Usage -_coming soon_ +```ts +import { SQSClient } from '@aws-sdk/client-sqs'; + +import { SQSMessageQueueAdapter } from '@castore/sqs-message-queue-adapter'; + +const sqsClient = new SQSClient({}); + +const messageQueueAdapter = new SQSMessageQueueAdapter({ + queueUrl: 'https://sqs.us-east-1.amazonaws.com/111122223333/my-super-queue', + sqsClient, +}); + +// 👇 Alternatively, provide a getter +const messageQueueAdapter = new SQSMessageQueueAdapter({ + queueUrl: () => process.env.MY_SQS_QUEUE_URL, + sqsClient, +}); + +const appMessageQueue = new NotificationMessageQueue({ + ... + messageQueueAdapter +}) +``` + +This will directly plug your MessageQueue to SQS 🙌 + +## 🤔 How it works + +When publishing a message, it is JSON stringified and passed as the record body. + +```json +// 👇 Record example +{ + "body": "{ + \"eventSourceId\": \"USERS\", + \"event\": { + \"aggregateId\": \"123\", + \"version\": 1, + \"type\": \"USER_CREATED\", + \"timestamp\": ... + ... + }, + \"aggregate\": ... // <= for state-carrying message queues + }", + ... // <= Other technical SQS properties +} +``` + +On the worker side, you can use the `SQSMessageQueueMessage` and `SQSMessageQueueMessageBody` TS types to type your argument: + +```ts +import type { + SQSMessageQueueMessage, + SQSMessageQueueMessageBody, +} from '@castore/sqs-message-queue-adapter'; + +const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => { + Records.forEach(({ body }) => { + // 👇 Correctly typed! + const recordBody: SQSMessageQueueMessageBody = + JSON.parse(body); + }); +}; +``` + +## 🔑 IAM + +The `publishMessage` method requires the `sqs:SendMessage` IAM permission on the provided SQS queue. diff --git a/packages/sqs-message-queue-adapter/package.json b/packages/sqs-message-queue-adapter/package.json index d9350a08..5d336b32 100644 --- a/packages/sqs-message-queue-adapter/package.json +++ b/packages/sqs-message-queue-adapter/package.json @@ -47,12 +47,14 @@ "@castore/core": "workspace:", "@types/node": "^17.0.29", "@zerollup/ts-transform-paths": "^1.7.18", + "aws-sdk-client-mock": "^2.1.0", "babel-plugin-module-resolver": "^4.1.0", "concurrently": "^7.1.0", "dependency-cruiser": "^11.7.0", "eslint": "^8.14.0", "prettier": "^2.6.2", "ts-node": "^10.7.0", + "ts-toolbelt": "^9.6.0", "ttypescript": "^1.5.13", "typescript": "^4.6.3", "vitest": "^0.26.2" diff --git a/packages/sqs-message-queue-adapter/src/message.ts b/packages/sqs-message-queue-adapter/src/message.ts index 72cd359f..ad77155f 100644 --- a/packages/sqs-message-queue-adapter/src/message.ts +++ b/packages/sqs-message-queue-adapter/src/message.ts @@ -1,14 +1,10 @@ import type { SQSEvent } from 'aws-lambda'; import type { - EventStoreEventsDetails, - EventStoreAggregate, NotificationMessageQueue, StateCarryingMessageQueue, - NotificationMessage, - StateCarryingMessage, - MessageQueueSourceEventStoreIds, - MessageQueueSourceEventStoreIdTypes, + EventStoreNotificationMessage, + EventStoreStateCarryingMessage, MessageQueueSourceEventStores, } from '@castore/core'; @@ -22,52 +18,12 @@ type Prettify> = T extends infer U export type SQSMessageQueueMessageBody< M extends NotificationMessageQueue | StateCarryingMessageQueue, - S extends MessageQueueSourceEventStoreIds = MessageQueueSourceEventStoreIds, - T extends MessageQueueSourceEventStoreIdTypes< - M, - S - > = MessageQueueSourceEventStoreIdTypes, > = Prettify< - S extends infer I - ? I extends string - ? M extends NotificationMessageQueue - ? NotificationMessage< - I, - T extends infer U - ? U extends MessageQueueSourceEventStoreIdTypes - ? Extract< - EventStoreEventsDetails< - Extract< - MessageQueueSourceEventStores, - { eventStoreId: S } - > - >, - { type: U } - > - : never - : never - > - : M extends StateCarryingMessageQueue - ? StateCarryingMessage< - I, - T extends infer U - ? U extends MessageQueueSourceEventStoreIdTypes - ? Extract< - EventStoreEventsDetails< - Extract< - MessageQueueSourceEventStores, - { eventStoreId: S } - > - >, - { type: U } - > - : never - : never, - EventStoreAggregate< - Extract, { eventStoreId: S }> - > - > - : never - : never + M extends NotificationMessageQueue + ? EventStoreNotificationMessage> + : M extends StateCarryingMessageQueue + ? EventStoreStateCarryingMessage< + EventStoreNotificationMessage> + > : never >; diff --git a/packages/sqs-message-queue-adapter/src/sqs.unit.test.ts b/packages/sqs-message-queue-adapter/src/sqs.unit.test.ts new file mode 100644 index 00000000..035ff984 --- /dev/null +++ b/packages/sqs-message-queue-adapter/src/sqs.unit.test.ts @@ -0,0 +1,72 @@ +import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs'; +import { mockClient } from 'aws-sdk-client-mock'; +import type { A } from 'ts-toolbelt'; + +import type { Message } from '@castore/core'; + +import { SQSMessageQueueAdapter } from './sqs'; + +const sqsClientMock = mockClient(SQSClient); + +describe('SQSMessageQueueAdapter', () => { + const queueUrlMock = 'queue.sqs'; + + const eventStoreIdMock = 'my-event-store'; + + const eventMock = { + aggregateId: 'my-aggregate-id', + version: 1, + type: 'my-event-type', + timestamp: new Date().toISOString(), + }; + + const messageMock = { + eventStoreId: eventStoreIdMock, + event: eventMock, + }; + + beforeEach(() => { + sqsClientMock.reset(); + sqsClientMock.on(SendMessageCommand).resolves({}); + }); + + it('send a SendMessageCommand to sqs client', async () => { + const adapter = new SQSMessageQueueAdapter({ + queueUrl: queueUrlMock, + sqsClient: sqsClientMock as unknown as SQSClient, + }); + + const assertMessage: A.Equals< + Parameters, + [Message] + > = 1; + assertMessage; + + await adapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(sqsClientMock.calls()).toHaveLength(1); + expect(sqsClientMock.call(0).args[0].input).toMatchObject({ + QueueUrl: queueUrlMock, + MessageBody: JSON.stringify(messageMock), + }); + }); + + it('works with queue url getters', async () => { + const adapter = new SQSMessageQueueAdapter({ + queueUrl: () => queueUrlMock, + sqsClient: sqsClientMock as unknown as SQSClient, + }); + + await adapter.publishMessage(messageMock); + + // regularly check if vitest matchers are available (toHaveReceivedCommandWith) + // https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139 + expect(sqsClientMock.calls()).toHaveLength(1); + expect(sqsClientMock.call(0).args[0].input).toMatchObject({ + QueueUrl: queueUrlMock, + MessageBody: JSON.stringify(messageMock), + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index 97752c2c..d4001ac5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4131,12 +4131,14 @@ __metadata: "@types/aws-lambda": ^8.10.111 "@types/node": ^17.0.29 "@zerollup/ts-transform-paths": ^1.7.18 + aws-sdk-client-mock: ^2.1.0 babel-plugin-module-resolver: ^4.1.0 concurrently: ^7.1.0 dependency-cruiser: ^11.7.0 eslint: ^8.14.0 prettier: ^2.6.2 ts-node: ^10.7.0 + ts-toolbelt: ^9.6.0 ttypescript: ^1.5.13 typescript: ^4.6.3 vitest: ^0.26.2 @@ -4378,12 +4380,14 @@ __metadata: "@types/aws-lambda": ^8.10.111 "@types/node": ^17.0.29 "@zerollup/ts-transform-paths": ^1.7.18 + aws-sdk-client-mock: ^2.1.0 babel-plugin-module-resolver: ^4.1.0 concurrently: ^7.1.0 dependency-cruiser: ^11.7.0 eslint: ^8.14.0 prettier: ^2.6.2 ts-node: ^10.7.0 + ts-toolbelt: ^9.6.0 ttypescript: ^1.5.13 typescript: ^4.6.3 vitest: ^0.26.2 @@ -7570,6 +7574,17 @@ __metadata: languageName: node linkType: hard +"aws-sdk-client-mock@npm:^2.1.0": + version: 2.1.0 + resolution: "aws-sdk-client-mock@npm:2.1.0" + dependencies: + "@types/sinon": ^10.0.10 + sinon: ^14.0.2 + tslib: ^2.1.0 + checksum: c6179c5528709ff1ad7e7308d03c74850fb5b973ca3d63836ca9973ca70de0c3dd7846fcbd3fd02bf4dabe6cae1095683b38c4202878458c8369e7e837962cd2 + languageName: node + linkType: hard + "aws-sdk@npm:^2.1124.0": version: 2.1129.0 resolution: "aws-sdk@npm:2.1129.0"