Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP/Suggestion: Add KV queues & ViewStateRepository to example #1

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/api_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ export const commandMetadataSchema = z.object({
});
// Event metadata
export const eventMetadataSchema = z.object({
tenant: z.string(),
// tenant: z.string(),
eventId: z.string(),
commandId: z.string(),
offset: z.number(),
// offset: z.number(),
});
// Query metadata
export const queryMetadataSchema = z.object({
Expand Down
60 changes: 59 additions & 1 deletion lib/infrastructure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
* These classes are used in the infrastructure layer of the application.
*/

import type { IEventRepository } from "fmodel";
import type { IEventRepository, IViewStateRepository } from "fmodel";
import type { Command, Event } from "./api.ts";
import { monotonicFactory } from "ulid";
import type { OrderView, RestaurantView } from "./domain.ts";

export const LAST_STREAM_EVENT_KEY_PREFIX = "lastStreamEvent";
export const EVENTS_KEY_PREFIX = "events";
Expand Down Expand Up @@ -40,6 +41,7 @@ export class DenoEventRepository implements
> {
constructor(
private readonly kv: Deno.Kv,
private readonly enqueueEvents: boolean = false,
) {
}

Expand Down Expand Up @@ -115,6 +117,9 @@ export class DenoEventRepository implements

// Add the event key to the list of keys that are stored
keys.push(eventsByStreamIdKey);

// If the enqueueEvents flag is set, enqueue the event
this.enqueueEvents && atomicOperation.enqueue(newEvent);
}
// Commit the transaction
if (!(await atomicOperation.commit()).ok) {
Expand Down Expand Up @@ -142,3 +147,56 @@ export class DenoEventRepository implements
return result;
}
}

export const VIEW_KEY_PREFIX = "view";

export type ViewState = (RestaurantView & OrderView) | null;
export class DenoViewStateRepository implements
IViewStateRepository<
Event,
ViewState,
StreamVersion,
EventMetadata
> {
constructor(private readonly kv: Deno.Kv) {}

// Save the state
// key schema: ["view", "streamId"]
async save(
state: ViewState,
em: EventMetadata,
version: StreamVersion | null,
): Promise<(RestaurantView & OrderView & StreamVersion)> {
if (!state) throw new Error("State is missing");
const streamIdFromEvent = (em as unknown as Event).id;
if (!streamIdFromEvent) {
throw new Error(
"Failed to extract streamId from event metadata, assuming EM is E", // can be assumed from how repo mat view handle is used
);
}
const key = [VIEW_KEY_PREFIX, streamIdFromEvent];
// const key = [VIEW_KEY_PREFIX, state.orderId ?? state.restaurantId]; // better way to do this?
const atomicOperation = this.kv.atomic();
if (version) {
atomicOperation.check({
key: key,
versionstamp: version.versionstamp,
});
}
atomicOperation.set(key, state);
const res = await atomicOperation.commit();
if (!res.ok) {
throw new Error("Failed to save state");
}
return { ...state, versionstamp: res.versionstamp };
}

async fetch(
event: Event & EventMetadata,
): Promise<(ViewState & StreamVersion) | null> {
const { value, versionstamp } = await this.kv.get<
ViewState & StreamVersion
>([VIEW_KEY_PREFIX, event.id]);
return value ? { ...value, versionstamp } : null;
}
}
54 changes: 45 additions & 9 deletions server.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,57 @@
import { blue } from "std/fmt/colors.ts";
import { blue, red } from "std/fmt/colors.ts";
import {
type Order,
orderDecider,
orderView,
type Restaurant,
restaurantDecider,
restaurantView,
} from "./lib/domain.ts";
import { type Decider, EventSourcingAggregate } from "fmodel";
import { type Decider, EventSourcingAggregate, MaterializedView } from "fmodel";
import {
type CommandMetadata,
DenoEventRepository,
DenoViewStateRepository,
type EventMetadata,
} from "./lib/infrastructure.ts";
import type { ApplicationAggregate } from "./lib/application.ts";
import { commandAndMetadataSchema } from "./lib/api_schema.ts";
import type {
ApplicationAggregate,
ApplicationMaterializedView,
} from "./lib/application.ts";
import {
commandAndMetadataSchema,
eventAndMetadataSchema,
} from "./lib/api_schema.ts";
import type { Command, Event } from "./lib/api.ts";

// Open the key-value store
const kv = await Deno.openKv("./db.sqlite3");

// Listen to events from kv queue and apply them to the materialized view
// retry policy can specified on kv.enqueue method (optionally enabled in the DenoEventRepository)
kv.listenQueue(async (raw) => {
try {
// Parse the event and metadata from the raw data / Zod validation/parsing
const event: Event & EventMetadata = eventAndMetadataSchema.parse(raw);
console.log(blue("Handling event: "), event);
// Combine views to create a new view that can handle both restaurant and order events
const view = restaurantView.combine(orderView);
// Create a repository for the view state / a Deno implementation of the IViewStateRepository
const readRepository = new DenoViewStateRepository(kv);
// Create a materialized view to handle the events of all types / MaterializedView is composed of a view and a read repository
const materializedView: ApplicationMaterializedView = new MaterializedView(
view,
readRepository,
);
// Handle the events of all types
const result = await materializedView.handle(event);
console.log(blue("Result of event handling: "), result);
} catch (error) {
// Catch & no throw to prevent queue retries
console.log(red("Error of event handling: "), error);
}
});

// A simple HTTP server that handles commands of all types
Deno.serve(async (request: Request) => {
try {
Expand All @@ -24,13 +62,11 @@ Deno.serve(async (request: Request) => {

console.log(blue("Handling command: "), command);

// Open the key-value store
const kv = await Deno.openKv("./db.sqlite3");
// Combine deciders to create a new decider that can handle both restaurant and order commands
const decider: Decider<Command, (Order & Restaurant) | null, Event> =
restaurantDecider.combine(orderDecider);
// Create a repository for the events / a Deno implementation of the IEventRepository
const eventRepository = new DenoEventRepository(kv);
// Create a repository for the events / a Deno implementation of the IEventRepository and optionally enable event enqueueing
const eventRepository = new DenoEventRepository(kv, true);
// Create an aggregate to handle the commands of all types / Aggregate is composed of a decider and an event repository
const aggregate: ApplicationAggregate = new EventSourcingAggregate(
decider,
Expand All @@ -45,7 +81,7 @@ Deno.serve(async (request: Request) => {
});
} catch (error) {
console.error("Error of command handling: ", error);
return new Response(error?.message ?? error, {
return new Response((error as Error)?.message ?? error, {
headers: { "Content-Type": "application/json" },
status: 500,
});
Expand Down
Loading