Skip to content

Commit

Permalink
Fix not overlapping setStreamMetadata prototype (#46)
Browse files Browse the repository at this point in the history
* Fix not overlapping setStreamMetadata prototype
* Fix issue: setMetadata in constructor call preceding mergeObjectContext
* Bump deps and lint files
  • Loading branch information
jokesterfr authored Feb 9, 2021
1 parent d83f080 commit b6d53d8
Show file tree
Hide file tree
Showing 13 changed files with 674 additions and 796 deletions.
51 changes: 25 additions & 26 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,40 @@
},
"bugs": "https://github.com/prestashopCorp/nestjs-eventstore/issues",
"peerDependencies": {
"@nestjs/common": "^7.0.0",
"@nestjs/core": "^7.0.0",
"reflect-metadata": "^0.1.12",
"@nestjs/common": "^7.6.11",
"@nestjs/core": "^7.6.11",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^6.3.3"
"rxjs": "^6.6.3"
},
"dependencies": {
"@nestjs/common": "^7.0.0",
"@nestjs/core": "^7.0.0",
"@nestjs/cqrs": "^7.0.0",
"@nestjs/terminus": "^7.0.1",
"@godaddy/terminus": "^4.4.1",
"geteventstore-promise": "3.2.4",
"lodash": "^4.17.15",
"@godaddy/terminus": "^4.6.0",
"@nestjs/common": "^7.6.11",
"@nestjs/core": "^7.6.11",
"@nestjs/cqrs": "^7.0.1",
"@nestjs/terminus": "^7.1.0",
"geteventstore-promise": "3.2.5",
"lodash": "^4.17.20",
"node-eventstore-client": "0.2.18",
"rxjs": "^6.5.4",
"uuid": "^3.4.0"
"uuid": "^8.3.2"
},
"devDependencies": {
"@nestjs/platform-express": "^6.8.5",
"@nestjs/testing": "^6.8.5",
"@types/express": "^4.17.1",
"@types/jest": "^25.2.1",
"@types/node": "^12.12.0",
"@types/supertest": "^2.0.8",
"jest": "^26.0.0",
"prettier": "^1.18.2",
"supertest": "4.0.2",
"ts-jest": "^25.4.0",
"ts-node": "^8.4.1",
"tsc-watch": "^4.0.0",
"@nestjs/platform-express": "^7.6.11",
"@nestjs/testing": "^7.6.11",
"@types/jest": "^26.0.20",
"@types/node": "^14.14.25",
"@types/supertest": "^2.0.10",
"jest": "^26.6.3",
"prettier": "^2.2.1",
"supertest": "6.1.3",
"ts-jest": "^26.5.0",
"ts-node": "^9.1.1",
"tsc-watch": "^4.2.9",
"tsconfig-paths": "^3.9.0",
"tslint": "^5.20.0",
"tslint": "^5.20.1",
"tslint-config-airbnb": "^5.11.2",
"typescript": "3.9.7"
"typescript": "4.1.3"
},
"jest": {
"moduleFileExtensions": [
Expand Down
12 changes: 2 additions & 10 deletions src/cqrs/event-store-cqrs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ export class EventStoreCqrsModule extends CqrsModule {
{
provide: EventStoreBus,
useFactory: async (commandBus, eventStore, eventBus) => {
return new EventStoreBus(
eventStore,
eventStoreBusConfig,
eventBus,
);
return new EventStoreBus(eventStore, eventStoreBusConfig, eventBus);
},
inject: [CommandBus, EventStore, EventBus],
},
Expand Down Expand Up @@ -67,11 +63,7 @@ export class EventStoreCqrsModule extends CqrsModule {
{
provide: EventStoreBus,
useFactory: async (commandBus, eventStore, eventBus) => {
return new EventStoreBus(
eventStore,
eventStoreBusConfig,
eventBus,
);
return new EventStoreBus(eventStore, eventStoreBusConfig, eventBus);
},
inject: [CommandBus, EventStore, EventBus],
},
Expand Down
22 changes: 13 additions & 9 deletions src/cqrs/event-store.aggregate-root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,38 @@ import { EventStoreTransaction, WriteResult } from 'node-eventstore-client';

export abstract class EventStoreAggregateRoot extends AggregateRoot {
public streamConfig: IStreamConfig;
public streamMetadata: IStreamMetadata;
public isMetadataSet: boolean;

async setStreamConfig(streamConfig: IStreamConfig) {
this.streamConfig = streamConfig;
if (streamConfig.metadata) {
await this.setStreamMetadata(streamConfig.metadata);
}
this.streamMetadata = streamConfig.metadata;
this.isMetadataSet = false;
}

async setStreamMetadata(metadata: IStreamMetadata): Promise<WriteResult> {
// TODO log if not replaced
async setStreamMetadata(
metadata: IStreamMetadata,
expectedStreamMetadataVersion: number = ExpectedVersion.Any,
): Promise<WriteResult> {
console.log('nestjs-get-eventstore::setStreamMetadata not replaced');
return;
}

async startTransaction(
expectedVersion: number = ExpectedVersion.Any,
): Promise<EventStoreTransaction> {
// TODO log if not replaced
console.log('nestjs-get-eventstore::startTransaction not replaced');
return;
}

async continueTransaction(
transaction: EventStoreTransaction,
): Promise<EventStoreTransaction> {
// TODO log if not replaced
console.log('nestjs-get-eventstore::continueTransaction not replaced');
return;
}

async commit(): Promise<void> {
return super.commit() as unknown as Promise<void>;
return (super.commit() as unknown) as Promise<void>;
}
}
21 changes: 10 additions & 11 deletions src/cqrs/event-store.bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import { PersistentSubscriptionOptions } from 'geteventstore-promise';
const fs = require('fs');

@Injectable()
export class EventStoreBus
implements OnModuleDestroy, OnModuleInit {
export class EventStoreBus implements OnModuleDestroy, OnModuleInit {
private logger: Logger = new Logger(this.constructor.name);
private readonly eventMapper: (data, options: IEventStoreEventOptions) => {};
private readonly onPublishFail = (
Expand Down Expand Up @@ -96,20 +95,20 @@ export class EventStoreBus
.writeEvents(event['eventStreamId'], [event], expectedVersion)
.pipe(
tap(
_ => {
(_) => {
// Forward to local event handler and saga
if (this.config.publishAlsoLocally) {
this.subject$.next(event);
}
},
err => {
(err) => {
this.onPublishFail(err, [event], this);
},
),
)
.toPromise();
}

async publishAll(events: IEvent[], streamConfig: IStreamConfig) {
const expectedVersion = streamConfig.expectedVersion || ExpectedVersion.Any;
const eventCount = events.length;
Expand All @@ -120,13 +119,13 @@ export class EventStoreBus
.writeEvents(streamConfig.streamName, events, expectedVersion)
.pipe(
tap(
_ => {
(_) => {
// Forward to local event handler and saga
if (this.config.publishAlsoLocally) {
events.forEach(event => this.subject$.next(event));
events.forEach((event) => this.subject$.next(event));
}
},
err => {
(err) => {
this.onPublishFail(err, events, this);
},
),
Expand All @@ -136,7 +135,7 @@ export class EventStoreBus

async assertProjections(projections: IEventStoreProjection[]) {
await Promise.all(
projections.map(async projection => {
projections.map(async (projection) => {
let content;
if (projection.content) {
this.logger.log(
Expand Down Expand Up @@ -200,7 +199,7 @@ export class EventStoreBus
subscriptions: IEventStorePersistentSubscriptionConfig[],
) {
await Promise.all(
subscriptions.map(async subscription => {
subscriptions.map(async (subscription) => {
try {
this.logger.log(
`Check if persistent subscription "${subscription.group}" on stream ${subscription.stream} needs to be created `,
Expand Down Expand Up @@ -239,7 +238,7 @@ export class EventStoreBus
}),
);
await Promise.all(
subscriptions.map(async config => {
subscriptions.map(async (config) => {
this.logger.log(
`Connecting to persistent subscription "${config.group}" on stream ${config.stream}`,
);
Expand Down
10 changes: 9 additions & 1 deletion src/cqrs/event-store.publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ export class EventStorePublisher {
}

async commit() {
if (!this.isMetadataSet && this.streamMetadata) {
this.isMetadataSet = true;
await this.setStreamMetadata(this.streamMetadata);
}
if (this.streamConfig) {
await eventBus.publishAll(
this.getUncommittedEvents(),
this.streamConfig,
);
} else {
this.getUncommittedEvents().forEach(event => this.publish(event));
this.getUncommittedEvents().forEach((event) => this.publish(event));
}
this.uncommit();
}
Expand Down Expand Up @@ -77,6 +81,10 @@ export class EventStorePublisher {
const eventStore = this.eventStore;

object.commit = async () => {
if (object.streamMetadata && !object.isMetadataSet) {
object.isMetadataSet = true;
await object.setStreamMetadata(object.streamMetadata);
}
if (object.streamConfig) {
await eventBus.publishAll(
object.getUncommittedEvents(),
Expand Down
16 changes: 7 additions & 9 deletions src/event-store.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ export class EventStore {
private volatileSubscriptions: ISubscriptionStatus = {};
private persistentSubscriptions: ISubscriptionStatus = {};

constructor(
public readonly config: IEventStoreConfig,
) {
constructor(public readonly config: IEventStoreConfig) {
this.HTTPClient = new geteventstorePromise.HTTPClient({
hostname: config.http.host.replace(/^https?:\/\//, ''),
port: config.http.port,
Expand Down Expand Up @@ -64,15 +62,15 @@ export class EventStore {
writeEvents(stream, events: IEvent[], expectedVersion = ExpectedVersion.Any) {
return from(events)
.pipe(
map(event =>
map((event) =>
createJsonEventData(
event['eventId'] || v4(),
event['data'] || {},
event['metadata'] || { version: 1, created_at: new Date() },
event['eventType'] || event.constructor.name,
),
),
catchError(err => {
catchError((err) => {
return throwError({
message: `Unable to convert event to EventStore event : ${err.message}`,
response: { status: 400 },
Expand All @@ -82,12 +80,12 @@ export class EventStore {
)
.pipe(
//tap(esEvents => console.log('Writing events', stream, esEvents)),
flatMap(esEvents =>
flatMap((esEvents) =>
from(
this.connection.appendToStream(stream, expectedVersion, esEvents),
),
),
catchError(err => {
catchError((err) => {
if (err.response) {
err.message = err.message + ' : ' + err.response.statusText;
}
Expand Down Expand Up @@ -149,7 +147,7 @@ export class EventStore {
bufferSize,
autoAck,
)
.then(subscription => {
.then((subscription) => {
this.logger.log(
`Connected to persistent subscription ${group} on stream ${stream}!`,
);
Expand Down Expand Up @@ -218,7 +216,7 @@ export class EventStore {
lastCheckpoint,
resolveLinkTos,
onEvent,
subscription => {
(subscription) => {
this.catchupSubscriptions[stream] = {
isConnected: true,
streamName: stream,
Expand Down
8 changes: 4 additions & 4 deletions src/event-store.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ export class EventStoreModule {
},
{
provide: EventStoreHealthIndicator,
useFactory: eventStore => {
useFactory: (eventStore) => {
return new EventStoreHealthIndicator(eventStore);
},
inject: [EventStore],
},
{
provide: EventStoreSubscriptionHealthIndicator,
useFactory: eventStore => {
useFactory: (eventStore) => {
return new EventStoreSubscriptionHealthIndicator(eventStore);
},
inject: [EventStore],
Expand Down Expand Up @@ -62,14 +62,14 @@ export class EventStoreModule {
},
{
provide: EventStoreHealthIndicator,
useFactory: eventStore => {
useFactory: (eventStore) => {
return new EventStoreHealthIndicator(eventStore);
},
inject: [EventStore],
},
{
provide: EventStoreSubscriptionHealthIndicator,
useFactory: eventStore => {
useFactory: (eventStore) => {
return new EventStoreSubscriptionHealthIndicator(eventStore);
},
inject: [EventStore],
Expand Down
4 changes: 2 additions & 2 deletions src/interceptor/event-store.interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class EventStoreInterceptor implements NestInterceptor {
// only if return is an event
handlerSubject$
.pipe(
filter(req => {
filter((req) => {
return req instanceof EventStoreEvent;
}),
map((ev: EventStoreEvent) => {
Expand All @@ -35,7 +35,7 @@ export class EventStoreInterceptor implements NestInterceptor {
return ev;
}),
)
.subscribe(ev => {
.subscribe((ev) => {
this.eventStore.writeEvents(ev.eventStreamId, [ev]);
});

Expand Down
5 changes: 3 additions & 2 deletions src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export abstract class EventStoreEvent implements IAggregateEvent {
created_at: new Date(),
version: 1,
},
...(options.metadata || {})
...(options.metadata || {}),
};
this.eventId = options.eventId || v4();
this.eventType = options.eventType || this.constructor.name;
Expand Down Expand Up @@ -91,7 +91,8 @@ export interface IAcknowledgeableEvent {
) => Promise<any>;
}

export abstract class AcknowledgeableEventStoreEvent extends EventStoreEvent
export abstract class AcknowledgeableEventStoreEvent
extends EventStoreEvent
implements IAcknowledgeableEvent {
ack() {
return Promise.resolve();
Expand Down
5 changes: 2 additions & 3 deletions src/interfaces/subscription.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ export interface ISubscriptionStatus {
export type IEventStorePersistentSubscriptionConfig = {
stream: string;
group: string;
options?: PersistentSubscriptionOptions
/**
options?: PersistentSubscriptionOptions & /**
* @deprecated The resolveLinktos parameter shouln't be used anymore. The resolveLinkTos parameter should be used instead.
*/ & { resolveLinktos?: boolean };
*/ { resolveLinktos?: boolean };
autoAck?: boolean | undefined;
bufferSize?: number | undefined;
onSubscriptionStart?: (
Expand Down
Loading

0 comments on commit b6d53d8

Please sign in to comment.