Skip to content

Commit

Permalink
Merge pull request #1 from barnesoir/main
Browse files Browse the repository at this point in the history
WIP/Suggestion: Add KV queues & ViewStateRepository to example
  • Loading branch information
idugalic authored Oct 31, 2024
2 parents 7ee51b8 + ee8d941 commit 6bb489d
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 12 deletions.
4 changes: 2 additions & 2 deletions lib/api_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ export const commandMetadataSchema = z.object({
});
// Event metadata
export const eventMetadataSchema = z.object({
tenant: z.string(),
// tenant: z.string(),
eventId: z.string(),
commandId: z.string(),
offset: z.number(),
// offset: z.number(),
});
// Query metadata
export const queryMetadataSchema = z.object({
Expand Down
60 changes: 59 additions & 1 deletion lib/infrastructure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
* These classes are used in the infrastructure layer of the application.
*/

import type { IEventRepository } from "fmodel";
import type { IEventRepository, IViewStateRepository } from "fmodel";
import type { Command, Event } from "./api.ts";
import { monotonicFactory } from "ulid";
import type { OrderView, RestaurantView } from "./domain.ts";

export const LAST_STREAM_EVENT_KEY_PREFIX = "lastStreamEvent";
export const EVENTS_KEY_PREFIX = "events";
Expand Down Expand Up @@ -40,6 +41,7 @@ export class DenoEventRepository implements
> {
constructor(
private readonly kv: Deno.Kv,
private readonly enqueueEvents: boolean = false,
) {
}

Expand Down Expand Up @@ -115,6 +117,9 @@ export class DenoEventRepository implements

// Add the event key to the list of keys that are stored
keys.push(eventsByStreamIdKey);

// If the enqueueEvents flag is set, enqueue the event
this.enqueueEvents && atomicOperation.enqueue(newEvent);
}
// Commit the transaction
if (!(await atomicOperation.commit()).ok) {
Expand Down Expand Up @@ -142,3 +147,56 @@ export class DenoEventRepository implements
return result;
}
}

export const VIEW_KEY_PREFIX = "view";

export type ViewState = (RestaurantView & OrderView) | null;
export class DenoViewStateRepository implements
IViewStateRepository<
Event,
ViewState,
StreamVersion,
EventMetadata
> {
constructor(private readonly kv: Deno.Kv) {}

// Save the state
// key schema: ["view", "streamId"]
async save(
state: ViewState,
em: EventMetadata,
version: StreamVersion | null,
): Promise<(RestaurantView & OrderView & StreamVersion)> {
if (!state) throw new Error("State is missing");
const streamIdFromEvent = (em as unknown as Event).id;
if (!streamIdFromEvent) {
throw new Error(
"Failed to extract streamId from event metadata, assuming EM is E", // can be assumed from how repo mat view handle is used
);
}
const key = [VIEW_KEY_PREFIX, streamIdFromEvent];
// const key = [VIEW_KEY_PREFIX, state.orderId ?? state.restaurantId]; // better way to do this?
const atomicOperation = this.kv.atomic();
if (version) {
atomicOperation.check({
key: key,
versionstamp: version.versionstamp,
});
}
atomicOperation.set(key, state);
const res = await atomicOperation.commit();
if (!res.ok) {
throw new Error("Failed to save state");
}
return { ...state, versionstamp: res.versionstamp };
}

async fetch(
event: Event & EventMetadata,
): Promise<(ViewState & StreamVersion) | null> {
const { value, versionstamp } = await this.kv.get<
ViewState & StreamVersion
>([VIEW_KEY_PREFIX, event.id]);
return value ? { ...value, versionstamp } : null;
}
}
54 changes: 45 additions & 9 deletions server.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,57 @@
import { blue } from "std/fmt/colors.ts";
import { blue, red } from "std/fmt/colors.ts";
import {
type Order,
orderDecider,
orderView,
type Restaurant,
restaurantDecider,
restaurantView,
} from "./lib/domain.ts";
import { type Decider, EventSourcingAggregate } from "fmodel";
import { type Decider, EventSourcingAggregate, MaterializedView } from "fmodel";
import {
type CommandMetadata,
DenoEventRepository,
DenoViewStateRepository,
type EventMetadata,
} from "./lib/infrastructure.ts";
import type { ApplicationAggregate } from "./lib/application.ts";
import { commandAndMetadataSchema } from "./lib/api_schema.ts";
import type {
ApplicationAggregate,
ApplicationMaterializedView,
} from "./lib/application.ts";
import {
commandAndMetadataSchema,
eventAndMetadataSchema,
} from "./lib/api_schema.ts";
import type { Command, Event } from "./lib/api.ts";

// Open the key-value store
const kv = await Deno.openKv("./db.sqlite3");

// Listen to events from kv queue and apply them to the materialized view
// retry policy can specified on kv.enqueue method (optionally enabled in the DenoEventRepository)
kv.listenQueue(async (raw) => {
try {
// Parse the event and metadata from the raw data / Zod validation/parsing
const event: Event & EventMetadata = eventAndMetadataSchema.parse(raw);
console.log(blue("Handling event: "), event);
// Combine views to create a new view that can handle both restaurant and order events
const view = restaurantView.combine(orderView);
// Create a repository for the view state / a Deno implementation of the IViewStateRepository
const readRepository = new DenoViewStateRepository(kv);
// Create a materialized view to handle the events of all types / MaterializedView is composed of a view and a read repository
const materializedView: ApplicationMaterializedView = new MaterializedView(
view,
readRepository,
);
// Handle the events of all types
const result = await materializedView.handle(event);
console.log(blue("Result of event handling: "), result);
} catch (error) {
// Catch & no throw to prevent queue retries
console.log(red("Error of event handling: "), error);
}
});

// A simple HTTP server that handles commands of all types
Deno.serve(async (request: Request) => {
try {
Expand All @@ -24,13 +62,11 @@ Deno.serve(async (request: Request) => {

console.log(blue("Handling command: "), command);

// Open the key-value store
const kv = await Deno.openKv("./db.sqlite3");
// Combine deciders to create a new decider that can handle both restaurant and order commands
const decider: Decider<Command, (Order & Restaurant) | null, Event> =
restaurantDecider.combine(orderDecider);
// Create a repository for the events / a Deno implementation of the IEventRepository
const eventRepository = new DenoEventRepository(kv);
// Create a repository for the events / a Deno implementation of the IEventRepository and optionally enable event enqueueing
const eventRepository = new DenoEventRepository(kv, true);
// Create an aggregate to handle the commands of all types / Aggregate is composed of a decider and an event repository
const aggregate: ApplicationAggregate = new EventSourcingAggregate(
decider,
Expand All @@ -45,7 +81,7 @@ Deno.serve(async (request: Request) => {
});
} catch (error) {
console.error("Error of command handling: ", error);
return new Response(error?.message ?? error, {
return new Response((error as Error)?.message ?? error, {
headers: { "Content-Type": "application/json" },
status: 500,
});
Expand Down

1 comment on commit 6bb489d

@deno-deploy
Copy link

@deno-deploy deno-deploy bot commented on 6bb489d Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failed to deploy:

UNCAUGHT_EXCEPTION

TypeError: Non-default databases are not supported
    at Object.openKv (ext:deno_kv/01_db.ts:10:21)
    at file:///src/server.ts:28:23

Please sign in to comment.