diff --git a/lib/api_schema.ts b/lib/api_schema.ts index 2d35ebd..707ce01 100644 --- a/lib/api_schema.ts +++ b/lib/api_schema.ts @@ -305,10 +305,10 @@ export const commandMetadataSchema = z.object({ }); // Event metadata export const eventMetadataSchema = z.object({ - tenant: z.string(), + // tenant: z.string(), eventId: z.string(), commandId: z.string(), - offset: z.number(), + // offset: z.number(), }); // Query metadata export const queryMetadataSchema = z.object({ diff --git a/lib/infrastructure.ts b/lib/infrastructure.ts index 46007d7..6a4ab6a 100644 --- a/lib/infrastructure.ts +++ b/lib/infrastructure.ts @@ -6,9 +6,10 @@ * These classes are used in the infrastructure layer of the application. */ -import type { IEventRepository } from "fmodel"; +import type { IEventRepository, IViewStateRepository } from "fmodel"; import type { Command, Event } from "./api.ts"; import { monotonicFactory } from "ulid"; +import type { OrderView, RestaurantView } from "./domain.ts"; export const LAST_STREAM_EVENT_KEY_PREFIX = "lastStreamEvent"; export const EVENTS_KEY_PREFIX = "events"; @@ -40,6 +41,7 @@ export class DenoEventRepository implements > { constructor( private readonly kv: Deno.Kv, + private readonly enqueueEvents: boolean = false, ) { } @@ -115,6 +117,9 @@ export class DenoEventRepository implements // Add the event key to the list of keys that are stored keys.push(eventsByStreamIdKey); + + // If the enqueueEvents flag is set, enqueue the event + this.enqueueEvents && atomicOperation.enqueue(newEvent); } // Commit the transaction if (!(await atomicOperation.commit()).ok) { @@ -142,3 +147,56 @@ export class DenoEventRepository implements return result; } } + +export const VIEW_KEY_PREFIX = "view"; + +export type ViewState = (RestaurantView & OrderView) | null; +export class DenoViewStateRepository implements + IViewStateRepository< + Event, + ViewState, + StreamVersion, + EventMetadata + > { + constructor(private readonly kv: Deno.Kv) {} + + // Save the state + // key schema: ["view", "streamId"] + async save( + state: ViewState, + em: EventMetadata, + version: StreamVersion | null, + ): Promise<(RestaurantView & OrderView & StreamVersion)> { + if (!state) throw new Error("State is missing"); + const streamIdFromEvent = (em as unknown as Event).id; + if (!streamIdFromEvent) { + throw new Error( + "Failed to extract streamId from event metadata, assuming EM is E", // can be assumed from how repo mat view handle is used + ); + } + const key = [VIEW_KEY_PREFIX, streamIdFromEvent]; + // const key = [VIEW_KEY_PREFIX, state.orderId ?? state.restaurantId]; // better way to do this? + const atomicOperation = this.kv.atomic(); + if (version) { + atomicOperation.check({ + key: key, + versionstamp: version.versionstamp, + }); + } + atomicOperation.set(key, state); + const res = await atomicOperation.commit(); + if (!res.ok) { + throw new Error("Failed to save state"); + } + return { ...state, versionstamp: res.versionstamp }; + } + + async fetch( + event: Event & EventMetadata, + ): Promise<(ViewState & StreamVersion) | null> { + const { value, versionstamp } = await this.kv.get< + ViewState & StreamVersion + >([VIEW_KEY_PREFIX, event.id]); + return value ? { ...value, versionstamp } : null; + } +} diff --git a/server.ts b/server.ts index c549b0e..73816c5 100644 --- a/server.ts +++ b/server.ts @@ -1,19 +1,57 @@ -import { blue } from "std/fmt/colors.ts"; +import { blue, red } from "std/fmt/colors.ts"; import { type Order, orderDecider, + orderView, type Restaurant, restaurantDecider, + restaurantView, } from "./lib/domain.ts"; -import { type Decider, EventSourcingAggregate } from "fmodel"; +import { type Decider, EventSourcingAggregate, MaterializedView } from "fmodel"; import { type CommandMetadata, DenoEventRepository, + DenoViewStateRepository, + type EventMetadata, } from "./lib/infrastructure.ts"; -import type { ApplicationAggregate } from "./lib/application.ts"; -import { commandAndMetadataSchema } from "./lib/api_schema.ts"; +import type { + ApplicationAggregate, + ApplicationMaterializedView, +} from "./lib/application.ts"; +import { + commandAndMetadataSchema, + eventAndMetadataSchema, +} from "./lib/api_schema.ts"; import type { Command, Event } from "./lib/api.ts"; +// Open the key-value store +const kv = await Deno.openKv("./db.sqlite3"); + +// Listen to events from kv queue and apply them to the materialized view +// retry policy can specified on kv.enqueue method (optionally enabled in the DenoEventRepository) +kv.listenQueue(async (raw) => { + try { + // Parse the event and metadata from the raw data / Zod validation/parsing + const event: Event & EventMetadata = eventAndMetadataSchema.parse(raw); + console.log(blue("Handling event: "), event); + // Combine views to create a new view that can handle both restaurant and order events + const view = restaurantView.combine(orderView); + // Create a repository for the view state / a Deno implementation of the IViewStateRepository + const readRepository = new DenoViewStateRepository(kv); + // Create a materialized view to handle the events of all types / MaterializedView is composed of a view and a read repository + const materializedView: ApplicationMaterializedView = new MaterializedView( + view, + readRepository, + ); + // Handle the events of all types + const result = await materializedView.handle(event); + console.log(blue("Result of event handling: "), result); + } catch (error) { + // Catch & no throw to prevent queue retries + console.log(red("Error of event handling: "), error); + } +}); + // A simple HTTP server that handles commands of all types Deno.serve(async (request: Request) => { try { @@ -24,13 +62,11 @@ Deno.serve(async (request: Request) => { console.log(blue("Handling command: "), command); - // Open the key-value store - const kv = await Deno.openKv("./db.sqlite3"); // Combine deciders to create a new decider that can handle both restaurant and order commands const decider: Decider = restaurantDecider.combine(orderDecider); - // Create a repository for the events / a Deno implementation of the IEventRepository - const eventRepository = new DenoEventRepository(kv); + // Create a repository for the events / a Deno implementation of the IEventRepository and optionally enable event enqueueing + const eventRepository = new DenoEventRepository(kv, true); // Create an aggregate to handle the commands of all types / Aggregate is composed of a decider and an event repository const aggregate: ApplicationAggregate = new EventSourcingAggregate( decider, @@ -45,7 +81,7 @@ Deno.serve(async (request: Request) => { }); } catch (error) { console.error("Error of command handling: ", error); - return new Response(error?.message ?? error, { + return new Response((error as Error)?.message ?? error, { headers: { "Content-Type": "application/json" }, status: 500, });