Skip to content

Commit

Permalink
Merge pull request #58 from castore-dev/document-new-messaging-packages
Browse files Browse the repository at this point in the history
fix: Document new messaging packages
  • Loading branch information
ThomasAribart authored Mar 10, 2023
2 parents 02d58af + 85f3896 commit 13386c6
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 57 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ Castore is opiniated. It comes with a collection of best practices and documente
- [Event-driven architecture](#--event-driven-architecture)
- [Message queues](#--messagequeue)
- [Message queue adapters](#--messagequeueadapter)
- [Message buses](#--message-bus)
- [Message bus adapters](#--message-bus)
- [Message buses](#--messagebus)
- [Message bus adapters](#--messagebusadapter)
- [Snapshotting](#--snapshotting)
- [Read Models](#--read-models)
- [📖 Resources](#-resources)
Expand Down Expand Up @@ -904,7 +904,7 @@ import type {
SQSMessageQueueMessageBody,
} from '@castore/sqs-message-queue-adapter';

const appMessagesHandler = async ({ Records }: SQSMessageQueueMessage) => {
const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
Records.forEach(({ body }) => {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
Expand Down
84 changes: 83 additions & 1 deletion packages/event-bridge-message-bus-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,86 @@ yarn add @castore/core @aws-sdk/client-eventbridge

## 👩‍💻 Usage

_coming soon_
```ts
import { EventBridgeClient } from '@aws-sdk/client-eventbridge';

import { EventBridgeMessageBusAdapter } from '@castore/event-bridge-message-bus-adapter';

const eventBridgeClient = new EventBridgeClient({});

const messageBusAdapter = new EventBridgeMessageBusAdapter({
eventBusName: 'my-table-name',
eventBridgeClient,
});

// 👇 Alternatively, provide a getter
const messageBusAdapter = new EventBridgeMessageBusAdapter({
eventBusName: () => process.env.MY_EVENT_BUS_NAME,
eventBridgeClient,
});

const appMessageBus = new NotificationMessageBus({
...
messageBusAdapter
})
```

This will directly plug your MessageBus to EventBridge 🙌

## 🤔 How it works

When publishing a message, its `eventStoreId` is used as the message `source` and its event `type` is used as `detail-type`. The whole message is passed to the `detail` property.

```json
// 👇 Entry example
{
"source": "USERS", // <= eventStoreId
"detail-type": "USER_CREATED", // <= event type
"detail": {
"eventSourceId": "USERS",
"event": {
"aggregateId": "123",
"version": 1,
"type": "USER_CREATED",
"timestamp": ...
...
},
"aggregate": ... // <= for state-carrying message buses
},
... // <= Other technical EventBridge properties
}
```

On the listeners side, you can use the `EventBridgeMessageBusMessage` TS type to type your argument:

```ts
import type { EventBridgeMessageBusMessage } from '@castore/event-bridge-message-bus-adapter';

const listener = async (
message: EventBridgeMessageBusMessage<typeof appMessageBus>,
) => {
// 🙌 Correctly typed!
const { eventStoreId, event } = message.detail;
};
```

You can provide event store ids and event types if you listener only listens to specific event types:

```ts
import type { EventBridgeMessageBusMessage } from '@castore/event-bridge-message-bus-adapter';

const listener = async (
message: EventBridgeMessageBusMessage<
typeof appMessageBus,
'USERS', // <= Only listen to the 'USER' event store events (optional)
'USER_CREATED' // <= Only listen to 'USER_CREATED' events (optional)
>,
) => {
// 🙌 Correctly typed!
const { eventStoreId, event } = message.detail;
};
```

## 🔑 IAM

The `publishMessage` method requires the `events:PutEvents` IAM permission on the provided event bus.
2 changes: 2 additions & 0 deletions packages/event-bridge-message-bus-adapter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
"@castore/core": "workspace:",
"@types/node": "^17.0.29",
"@zerollup/ts-transform-paths": "^1.7.18",
"aws-sdk-client-mock": "^2.1.0",
"babel-plugin-module-resolver": "^4.1.0",
"concurrently": "^7.1.0",
"dependency-cruiser": "^11.7.0",
"eslint": "^8.14.0",
"prettier": "^2.6.2",
"ts-node": "^10.7.0",
"ts-toolbelt": "^9.6.0",
"ttypescript": "^1.5.13",
"typescript": "^4.6.3",
"vitest": "^0.26.2"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import {
EventBridgeClient,
PutEventsCommand,
} from '@aws-sdk/client-eventbridge';
import { mockClient } from 'aws-sdk-client-mock';
import type { A } from 'ts-toolbelt';

import type { Message } from '@castore/core';

import { EventBridgeMessageBusAdapter } from './eventBridge';

const eventBridgeClientMock = mockClient(EventBridgeClient);

describe('EventBridgeMessageBusAdapter', () => {
const eventBusNameMock = 'my-event-bus';

const eventStoreIdMock = 'my-event-store';

const eventMock = {
aggregateId: 'my-aggregate-id',
version: 1,
type: 'my-event-type',
timestamp: new Date().toISOString(),
};

const messageMock = {
eventStoreId: eventStoreIdMock,
event: eventMock,
};

beforeEach(() => {
eventBridgeClientMock.reset();
eventBridgeClientMock.on(PutEventsCommand).resolves({});
});

it('send a PutEventsCommand to event bridge client', async () => {
const adapter = new EventBridgeMessageBusAdapter({
eventBusName: eventBusNameMock,
eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
});

const assertMessage: A.Equals<
Parameters<typeof adapter.publishMessage>,
[Message]
> = 1;
assertMessage;

await adapter.publishMessage(messageMock);

// regularly check if vitest matchers are available (toHaveReceivedCommandWith)
// https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
expect(eventBridgeClientMock.calls()).toHaveLength(1);
expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
Entries: [
{
EventBusName: eventBusNameMock,
Source: eventStoreIdMock,
DetailType: eventMock.type,
Detail: JSON.stringify(messageMock),
},
],
});
});

it('works with event bus name getters', async () => {
const adapter = new EventBridgeMessageBusAdapter({
eventBusName: () => eventBusNameMock,
eventBridgeClient: eventBridgeClientMock as unknown as EventBridgeClient,
});

await adapter.publishMessage(messageMock);

// regularly check if vitest matchers are available (toHaveReceivedCommandWith)
// https://github.com/m-radzikowski/aws-sdk-client-mock/issues/139
expect(eventBridgeClientMock.calls()).toHaveLength(1);
expect(eventBridgeClientMock.call(0).args[0].input).toMatchObject({
Entries: [
{
EventBusName: eventBusNameMock,
Source: eventStoreIdMock,
DetailType: eventMock.type,
Detail: JSON.stringify(messageMock),
},
],
});
});
});
69 changes: 68 additions & 1 deletion packages/sqs-message-queue-adapter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,71 @@ yarn add @castore/core @aws-sdk/client-sqs

## 👩‍💻 Usage

_coming soon_
```ts
import { SQSClient } from '@aws-sdk/client-sqs';

import { SQSMessageQueueAdapter } from '@castore/sqs-message-queue-adapter';

const sqsClient = new SQSClient({});

const messageQueueAdapter = new SQSMessageQueueAdapter({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/111122223333/my-super-queue',
sqsClient,
});

// 👇 Alternatively, provide a getter
const messageQueueAdapter = new SQSMessageQueueAdapter({
queueUrl: () => process.env.MY_SQS_QUEUE_URL,
sqsClient,
});

const appMessageQueue = new NotificationMessageQueue({
...
messageQueueAdapter
})
```

This will directly plug your MessageQueue to SQS 🙌

## 🤔 How it works

When publishing a message, it is JSON stringified and passed as the record body.

```json
// 👇 Record example
{
"body": "{
\"eventSourceId\": \"USERS\",
\"event\": {
\"aggregateId\": \"123\",
\"version\": 1,
\"type\": \"USER_CREATED\",
\"timestamp\": ...
...
},
\"aggregate\": ... // <= for state-carrying message queues
}",
... // <= Other technical SQS properties
}
```

On the worker side, you can use the `SQSMessageQueueMessage` and `SQSMessageQueueMessageBody` TS types to type your argument:

```ts
import type {
SQSMessageQueueMessage,
SQSMessageQueueMessageBody,
} from '@castore/sqs-message-queue-adapter';

const appMessagesWorker = async ({ Records }: SQSMessageQueueMessage) => {
Records.forEach(({ body }) => {
// 👇 Correctly typed!
const recordBody: SQSMessageQueueMessageBody<typeof appMessageQueue> =
JSON.parse(body);
});
};
```

## 🔑 IAM

The `publishMessage` method requires the `sqs:SendMessage` IAM permission on the provided SQS queue.
2 changes: 2 additions & 0 deletions packages/sqs-message-queue-adapter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
"@castore/core": "workspace:",
"@types/node": "^17.0.29",
"@zerollup/ts-transform-paths": "^1.7.18",
"aws-sdk-client-mock": "^2.1.0",
"babel-plugin-module-resolver": "^4.1.0",
"concurrently": "^7.1.0",
"dependency-cruiser": "^11.7.0",
"eslint": "^8.14.0",
"prettier": "^2.6.2",
"ts-node": "^10.7.0",
"ts-toolbelt": "^9.6.0",
"ttypescript": "^1.5.13",
"typescript": "^4.6.3",
"vitest": "^0.26.2"
Expand Down
60 changes: 8 additions & 52 deletions packages/sqs-message-queue-adapter/src/message.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import type { SQSEvent } from 'aws-lambda';

import type {
EventStoreEventsDetails,
EventStoreAggregate,
NotificationMessageQueue,
StateCarryingMessageQueue,
NotificationMessage,
StateCarryingMessage,
MessageQueueSourceEventStoreIds,
MessageQueueSourceEventStoreIdTypes,
EventStoreNotificationMessage,
EventStoreStateCarryingMessage,
MessageQueueSourceEventStores,
} from '@castore/core';

Expand All @@ -22,52 +18,12 @@ type Prettify<T extends Record<string, unknown>> = T extends infer U

export type SQSMessageQueueMessageBody<
M extends NotificationMessageQueue | StateCarryingMessageQueue,
S extends MessageQueueSourceEventStoreIds<M> = MessageQueueSourceEventStoreIds<M>,
T extends MessageQueueSourceEventStoreIdTypes<
M,
S
> = MessageQueueSourceEventStoreIdTypes<M, S>,
> = Prettify<
S extends infer I
? I extends string
? M extends NotificationMessageQueue
? NotificationMessage<
I,
T extends infer U
? U extends MessageQueueSourceEventStoreIdTypes<M, I>
? Extract<
EventStoreEventsDetails<
Extract<
MessageQueueSourceEventStores<M>,
{ eventStoreId: S }
>
>,
{ type: U }
>
: never
: never
>
: M extends StateCarryingMessageQueue
? StateCarryingMessage<
I,
T extends infer U
? U extends MessageQueueSourceEventStoreIdTypes<M, I>
? Extract<
EventStoreEventsDetails<
Extract<
MessageQueueSourceEventStores<M>,
{ eventStoreId: S }
>
>,
{ type: U }
>
: never
: never,
EventStoreAggregate<
Extract<MessageQueueSourceEventStores<M>, { eventStoreId: S }>
>
>
: never
: never
M extends NotificationMessageQueue
? EventStoreNotificationMessage<MessageQueueSourceEventStores<M>>
: M extends StateCarryingMessageQueue
? EventStoreStateCarryingMessage<
EventStoreNotificationMessage<MessageQueueSourceEventStores<M>>
>
: never
>;
Loading

0 comments on commit 13386c6

Please sign in to comment.