Skip to content

Commit

Permalink
Merge pull request #720 from fraktalio/feature/metadata
Browse files Browse the repository at this point in the history
Feature/metadata
  • Loading branch information
idugalic authored Apr 12, 2024
2 parents 038e79d + 4179370 commit 054a489
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 38 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fraktalio/fmodel-ts",
"version": "2.0.2",
"version": "2.1.0",
"description": "Functional domain modeling with TypeScript. Optimized for event sourcing and CQRS",
"main": "build/main/index.js",
"typings": "build/main/index.d.ts",
Expand Down
11 changes: 3 additions & 8 deletions src/lib/application/eventsourcing-aggregate.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,28 +241,23 @@ class EventRepositoryImpl
async save(
eList: readonly Evt[],
commandMetadata: CmdMetadata,
versionProvider: (e: Evt & EvtMetadata) => Promise<Version | null>
versionProvider: (e: Evt) => Promise<Version | null>
): Promise<readonly (Evt & Version & EvtMetadata)[]> {
//mapping the Commands metadata into Events metadata !!!
const savedEvents: readonly (Evt & Version & EvtMetadata)[] =
await Promise.all(
eList.map(async (e: Evt, index) => ({
kind: e.kind,
value: e.value,
version:
((
await versionProvider({ ...e, traceId: commandMetadata.traceId })
)?.version ?? 0) +
index +
1,
version: ((await versionProvider(e))?.version ?? 0) + index + 1,
traceId: commandMetadata.traceId,
}))
);
storage.concat(savedEvents);
return savedEvents;
}

async versionProvider(_e: Evt & EvtMetadata): Promise<Version | null> {
async versionProvider(_e: Evt): Promise<Version | null> {
return storage[storage.length - 1];
}
}
Expand Down
40 changes: 20 additions & 20 deletions src/lib/application/eventsourcing-aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,33 @@ export interface IEventRepository<C, E, V, CM, EM> {
/**
* Fetch events
*
* @param command - Command of type `C` with metadata of type `CM`
* @param command - Command of type `C`
*
* @return list of Events with Version and Event Metadata
*/
readonly fetch: (command: C & CM) => Promise<readonly (E & V & EM)[]>;
readonly fetch: (command: C) => Promise<readonly (E & V & EM)[]>;

/**
* Get the latest event stream version / sequence
* Get the event stream version / sequence
*
* @param event - Event of type `E & EM`
* @param event - Event of type `E`
*
* @return the latest version / sequence of the event stream that this event belongs to.
* @return the version / sequence of the event stream that this event belongs to.
*/
readonly versionProvider: (event: E & EM) => Promise<V | null>;
readonly versionProvider: (event: E) => Promise<V | null>;

/**
* Save events
*
* @param events - list of Events
* @param commandMetadata - Command Metadata of the command that initiated `events`
* @param versionProvider - A provider for the stream Version/Sequence
* @param versionProvider - A provider for the Latest Event in this stream and its Version/Sequence
* @return a list of newly saved Event(s) of type `E` with Version of type `V` and with Event Metadata of type `EM`
*/
readonly save: (
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E & EM) => Promise<V | null>
versionProvider: (e: E) => Promise<V | null>
) => Promise<readonly (E & V & EM)[]>;
}

Expand Down Expand Up @@ -140,7 +140,7 @@ export abstract class EventComputation<C, S, E> implements IDecider<C, S, E> {
* An abstract algorithm to compute new events based on the old events and the command being handled.
* It returns all the events, including the events created by handling commands which are triggered by Saga - orchestration included.
*/
export abstract class EventOrchestratingComputation<C, S, E, CM>
export abstract class EventOrchestratingComputation<C, S, E>
implements IDecider<C, S, E>, ISaga<E, C>
{
protected constructor(
Expand Down Expand Up @@ -177,14 +177,14 @@ export abstract class EventOrchestratingComputation<C, S, E, CM>

protected async computeNewEvents(
events: readonly E[],
command: C & CM,
fetch: (c: C & CM) => Promise<readonly E[]>
command: C,
fetch: (c: C) => Promise<readonly E[]>
): Promise<readonly E[]> {
// eslint-disable-next-line functional/no-let
let resultingEvents = this.computeNewEventsInternally(events, command);
await asyncForEach(
resultingEvents.flatMap((evt) => this.saga.react(evt)),
async (cmd) => {
async (cmd: C) => {
const newEvents = this.computeNewEvents(
(await fetch(cmd)).map((evt) => evt as E).concat(resultingEvents),
cmd,
Expand Down Expand Up @@ -224,18 +224,18 @@ export class EventSourcingAggregate<C, S, E, V, CM, EM>
super(decider);
}

async fetch(command: C & CM): Promise<readonly (E & V & EM)[]> {
async fetch(command: C): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.fetch(command);
}

async versionProvider(event: E & EM): Promise<V | null> {
async versionProvider(event: E): Promise<V | null> {
return this.eventRepository.versionProvider(event);
}

async save(
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E & EM) => Promise<V | null>
versionProvider: (e: E) => Promise<V | null>
): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.save(events, commandMetadata, versionProvider);
}
Expand Down Expand Up @@ -269,7 +269,7 @@ export class EventSourcingAggregate<C, S, E, V, CM, EM>
* @author Иван Дугалић / Ivan Dugalic / @idugalic
*/
export class EventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
extends EventOrchestratingComputation<C, S, E, CM>
extends EventOrchestratingComputation<C, S, E>
implements IEventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
{
constructor(
Expand All @@ -280,18 +280,18 @@ export class EventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
super(decider, saga);
}

async fetch(command: C & CM): Promise<readonly (E & V & EM)[]> {
async fetch(command: C): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.fetch(command);
}

async versionProvider(event: E & EM): Promise<V | null> {
async versionProvider(event: E): Promise<V | null> {
return this.eventRepository.versionProvider(event);
}

async save(
events: readonly E[],
commandMetadata: CM,
versionProvider: (e: E & EM) => Promise<V | null>
versionProvider: (e: E) => Promise<V | null>
): Promise<readonly (E & V & EM)[]> {
return this.eventRepository.save(events, commandMetadata, versionProvider);
}
Expand All @@ -302,7 +302,7 @@ export class EventSourcingOrchestratingAggregate<C, S, E, V, CM, EM>
await this.computeNewEvents(
currentEvents,
command,
async (cmd: C & CM) => await this.eventRepository.fetch(cmd)
async (cmd: C) => await this.eventRepository.fetch(cmd)
),
command,
this.versionProvider.bind(this)
Expand Down
6 changes: 3 additions & 3 deletions src/lib/application/materialized-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ export interface IViewStateRepository<E, S, V, EM> {
/**
* Fetch state
*
* @param event - Event of type `E` with metadata of type `EM`
* @param event - Event of type `E`
*
* @return current state / `S` with version / `V`, or NULL
*/
readonly fetch: (event: E & EM) => Promise<(S & V) | null>;
readonly fetch: (event: E) => Promise<(S & V) | null>;
/**
* Save state
*
Expand Down Expand Up @@ -98,7 +98,7 @@ export class MaterializedView<S, E, V, EM>
return this.view.evolve(state, event);
}

async fetch(event: E & EM): Promise<(S & V) | null> {
async fetch(event: E): Promise<(S & V) | null> {
return this.viewStateRepository.fetch(event);
}

Expand Down
8 changes: 4 additions & 4 deletions src/lib/application/statestored-aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ export interface IStateRepository<C, S, V, CM, SM> {
/**
* Fetch state, version and metadata
*
* @param command - Command payload of type C with metadata of type `CM`
* @param command - Command payload of type C
* @return current State/[S], Version/[V] and State Metadata/[SM]
*/
readonly fetch: (command: C & CM) => Promise<(S & V & SM) | null>;
readonly fetch: (command: C) => Promise<(S & V & SM) | null>;

/**
* Save state (with optimistic locking)
Expand Down Expand Up @@ -176,7 +176,7 @@ export class StateStoredAggregate<C, S, E, V, CM, SM>
) {
super(decider);
}
async fetch(command: C & CM): Promise<(S & V & SM) | null> {
async fetch(command: C): Promise<(S & V & SM) | null> {
return this.stateRepository.fetch(command);
}

Expand Down Expand Up @@ -230,7 +230,7 @@ export class StateStoredOrchestratingAggregate<C, S, E, V, CM, SM>
super(decider, saga);
}

async fetch(command: C & CM): Promise<(S & V & SM) | null> {
async fetch(command: C): Promise<(S & V & SM) | null> {
return this.stateRepository.fetch(command);
}

Expand Down

0 comments on commit 054a489

Please sign in to comment.