From 023b6224ef0f80208d70505390d625c709a38e6b Mon Sep 17 00:00:00 2001 From: J Date: Wed, 30 Oct 2024 13:23:55 +0000 Subject: [PATCH 1/3] Add view state repo --- lib/api_schema.ts | 4 +-- lib/infrastructure.ts | 60 ++++++++++++++++++++++++++++++++++++++++++- server.ts | 37 ++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 8 deletions(-) diff --git a/lib/api_schema.ts b/lib/api_schema.ts index 2d35ebd..707ce01 100644 --- a/lib/api_schema.ts +++ b/lib/api_schema.ts @@ -305,10 +305,10 @@ export const commandMetadataSchema = z.object({ }); // Event metadata export const eventMetadataSchema = z.object({ - tenant: z.string(), + // tenant: z.string(), eventId: z.string(), commandId: z.string(), - offset: z.number(), + // offset: z.number(), }); // Query metadata export const queryMetadataSchema = z.object({ diff --git a/lib/infrastructure.ts b/lib/infrastructure.ts index 46007d7..6a4ab6a 100644 --- a/lib/infrastructure.ts +++ b/lib/infrastructure.ts @@ -6,9 +6,10 @@ * These classes are used in the infrastructure layer of the application. */ -import type { IEventRepository } from "fmodel"; +import type { IEventRepository, IViewStateRepository } from "fmodel"; import type { Command, Event } from "./api.ts"; import { monotonicFactory } from "ulid"; +import type { OrderView, RestaurantView } from "./domain.ts"; export const LAST_STREAM_EVENT_KEY_PREFIX = "lastStreamEvent"; export const EVENTS_KEY_PREFIX = "events"; @@ -40,6 +41,7 @@ export class DenoEventRepository implements > { constructor( private readonly kv: Deno.Kv, + private readonly enqueueEvents: boolean = false, ) { } @@ -115,6 +117,9 @@ export class DenoEventRepository implements // Add the event key to the list of keys that are stored keys.push(eventsByStreamIdKey); + + // If the enqueueEvents flag is set, enqueue the event + this.enqueueEvents && atomicOperation.enqueue(newEvent); } // Commit the transaction if (!(await atomicOperation.commit()).ok) { @@ -142,3 +147,56 @@ export class DenoEventRepository implements return result; } } + +export const VIEW_KEY_PREFIX = "view"; + +export type ViewState = (RestaurantView & OrderView) | null; +export class DenoViewStateRepository implements + IViewStateRepository< + Event, + ViewState, + StreamVersion, + EventMetadata + > { + constructor(private readonly kv: Deno.Kv) {} + + // Save the state + // key schema: ["view", "streamId"] + async save( + state: ViewState, + em: EventMetadata, + version: StreamVersion | null, + ): Promise<(RestaurantView & OrderView & StreamVersion)> { + if (!state) throw new Error("State is missing"); + const streamIdFromEvent = (em as unknown as Event).id; + if (!streamIdFromEvent) { + throw new Error( + "Failed to extract streamId from event metadata, assuming EM is E", // can be assumed from how repo mat view handle is used + ); + } + const key = [VIEW_KEY_PREFIX, streamIdFromEvent]; + // const key = [VIEW_KEY_PREFIX, state.orderId ?? state.restaurantId]; // better way to do this? + const atomicOperation = this.kv.atomic(); + if (version) { + atomicOperation.check({ + key: key, + versionstamp: version.versionstamp, + }); + } + atomicOperation.set(key, state); + const res = await atomicOperation.commit(); + if (!res.ok) { + throw new Error("Failed to save state"); + } + return { ...state, versionstamp: res.versionstamp }; + } + + async fetch( + event: Event & EventMetadata, + ): Promise<(ViewState & StreamVersion) | null> { + const { value, versionstamp } = await this.kv.get< + ViewState & StreamVersion + >([VIEW_KEY_PREFIX, event.id]); + return value ? { ...value, versionstamp } : null; + } +} diff --git a/server.ts b/server.ts index c549b0e..e448770 100644 --- a/server.ts +++ b/server.ts @@ -2,18 +2,46 @@ import { blue } from "std/fmt/colors.ts"; import { type Order, orderDecider, + orderView, type Restaurant, restaurantDecider, + restaurantView, } from "./lib/domain.ts"; -import { type Decider, EventSourcingAggregate } from "fmodel"; +import { type Decider, EventSourcingAggregate, MaterializedView } from "fmodel"; import { type CommandMetadata, DenoEventRepository, + DenoViewStateRepository, + type EventMetadata, } from "./lib/infrastructure.ts"; -import type { ApplicationAggregate } from "./lib/application.ts"; -import { commandAndMetadataSchema } from "./lib/api_schema.ts"; +import type { + ApplicationAggregate, + ApplicationMaterializedView, +} from "./lib/application.ts"; +import { + commandAndMetadataSchema, + eventAndMetadataSchema, +} from "./lib/api_schema.ts"; import type { Command, Event } from "./lib/api.ts"; +const kv = await Deno.openKv(":memory:"); +kv.listenQueue(async (raw) => { + try { + const event: Event & EventMetadata = eventAndMetadataSchema.parse(raw); + console.log(blue("Handling event: "), event); + const readRepository = new DenoViewStateRepository(kv); + const view = restaurantView.combine(orderView); + const materializedView: ApplicationMaterializedView = new MaterializedView( + view, + readRepository, + ); + const result = await materializedView.handle(event); + console.log(blue("Result of event handling: "), result); + } catch (error) { + console.error("Error of event handling: ", error); + } +}); + // A simple HTTP server that handles commands of all types Deno.serve(async (request: Request) => { try { @@ -25,12 +53,11 @@ Deno.serve(async (request: Request) => { console.log(blue("Handling command: "), command); // Open the key-value store - const kv = await Deno.openKv("./db.sqlite3"); // Combine deciders to create a new decider that can handle both restaurant and order commands const decider: Decider = restaurantDecider.combine(orderDecider); // Create a repository for the events / a Deno implementation of the IEventRepository - const eventRepository = new DenoEventRepository(kv); + const eventRepository = new DenoEventRepository(kv, true); // Create an aggregate to handle the commands of all types / Aggregate is composed of a decider and an event repository const aggregate: ApplicationAggregate = new EventSourcingAggregate( decider, From 7ea904f9f2a8fefd51add187d2572a0cf401c1ef Mon Sep 17 00:00:00 2001 From: J Date: Wed, 30 Oct 2024 13:31:40 +0000 Subject: [PATCH 2/3] add comments in the server --- server.ts | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/server.ts b/server.ts index e448770..82748f6 100644 --- a/server.ts +++ b/server.ts @@ -1,4 +1,4 @@ -import { blue } from "std/fmt/colors.ts"; +import { blue, red } from "std/fmt/colors.ts"; import { type Order, orderDecider, @@ -24,21 +24,31 @@ import { } from "./lib/api_schema.ts"; import type { Command, Event } from "./lib/api.ts"; +// Open the key-value store: running in-memory const kv = await Deno.openKv(":memory:"); + +// Listen to events from kv queue and apply them to the materialized view +// retry policy can specified on kv.enqueue method (optionally enabled in the DenoEventRepository) kv.listenQueue(async (raw) => { try { + // Parse the event and metadata from the raw data / Zod validation/parsing const event: Event & EventMetadata = eventAndMetadataSchema.parse(raw); console.log(blue("Handling event: "), event); - const readRepository = new DenoViewStateRepository(kv); + // Combine views to create a new view that can handle both restaurant and order events const view = restaurantView.combine(orderView); + // Create a repository for the view state / a Deno implementation of the IViewStateRepository + const readRepository = new DenoViewStateRepository(kv); + // Create a materialized view to handle the events of all types / MaterializedView is composed of a view and a read repository const materializedView: ApplicationMaterializedView = new MaterializedView( view, readRepository, ); + // Handle the events of all types const result = await materializedView.handle(event); console.log(blue("Result of event handling: "), result); } catch (error) { - console.error("Error of event handling: ", error); + // Catch & no throw to prevent queue retries + console.log(red("Error of event handling: "), error); } }); @@ -52,11 +62,10 @@ Deno.serve(async (request: Request) => { console.log(blue("Handling command: "), command); - // Open the key-value store // Combine deciders to create a new decider that can handle both restaurant and order commands const decider: Decider = restaurantDecider.combine(orderDecider); - // Create a repository for the events / a Deno implementation of the IEventRepository + // Create a repository for the events / a Deno implementation of the IEventRepository and optionally enable event enqueueing const eventRepository = new DenoEventRepository(kv, true); // Create an aggregate to handle the commands of all types / Aggregate is composed of a decider and an event repository const aggregate: ApplicationAggregate = new EventSourcingAggregate( @@ -72,7 +81,7 @@ Deno.serve(async (request: Request) => { }); } catch (error) { console.error("Error of command handling: ", error); - return new Response(error?.message ?? error, { + return new Response((error as Error)?.message ?? error, { headers: { "Content-Type": "application/json" }, status: 500, }); From ee8d941a822ae0a58ec4db7a4f0308d97a2aaf30 Mon Sep 17 00:00:00 2001 From: J Date: Wed, 30 Oct 2024 13:43:02 +0000 Subject: [PATCH 3/3] back to fs kv --- server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server.ts b/server.ts index 82748f6..73816c5 100644 --- a/server.ts +++ b/server.ts @@ -24,8 +24,8 @@ import { } from "./lib/api_schema.ts"; import type { Command, Event } from "./lib/api.ts"; -// Open the key-value store: running in-memory -const kv = await Deno.openKv(":memory:"); +// Open the key-value store +const kv = await Deno.openKv("./db.sqlite3"); // Listen to events from kv queue and apply them to the materialized view // retry policy can specified on kv.enqueue method (optionally enabled in the DenoEventRepository)