From d0cee542a4bea8043262159a390bc06ff29f5c90 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 15 Mar 2022 13:41:20 -0400 Subject: [PATCH] Incorporate contract instance path into subscription name Fixes #59 Signed-off-by: Andrew Richardson --- src/event-stream/event-stream.service.ts | 4 +-- src/tokens/tokens.service.ts | 26 ++++++++++++++------ src/tokens/tokens.util.spec.ts | 26 +++++++++++--------- src/tokens/tokens.util.ts | 31 ++++++++++++++++-------- test/app.e2e-spec.ts | 29 +++++++++++----------- 5 files changed, 69 insertions(+), 47 deletions(-) diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index d7b018b..e2b704e 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -156,9 +156,9 @@ export class EventStreamService { return response.data; } - async createOrUpdateStream(topic: string): Promise { + async createOrUpdateStream(name: string, topic: string): Promise { const streamDetails = { - name: topic, + name, errorHandling: 'block', batchSize: 50, batchTimeoutMS: 500, diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 04ab06d..64aa90c 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -51,6 +51,7 @@ import { encodeHex, encodeHexIDForURI, isFungible, + packStreamName, packSubscriptionName, packTokenId, unpackSubscriptionName, @@ -125,13 +126,14 @@ export class TokensService { this.instancePath, this.stream.id, tokenCreateEvent, - packSubscriptionName(this.topic, BASE_SUBSCRIPTION_NAME), + packSubscriptionName(this.topic, this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent), ); } private async getStream() { if (this.stream === undefined) { - this.stream = await this.eventstream.createOrUpdateStream(this.topic); + const name = packStreamName(this.topic, this.instancePath); + this.stream = await this.eventstream.createOrUpdateStream(name, this.topic); } return this.stream; } @@ -145,9 +147,17 @@ export class TokensService { * TODO: eventually this migration logic can be pruned */ async migrate() { + const name = packStreamName(this.topic, this.instancePath); const streams = await this.eventstream.getStreams(); - const existingStream = streams.find(s => s.name === this.topic); + const existingStream = streams.find(s => s.name === name); if (existingStream === undefined) { + // Look for the old stream name (topic alone) + const oldStream = streams.find(s => s.name === this.topic); + if (oldStream !== undefined) { + this.logger.warn('Old event stream found - deleting and recreating'); + await this.eventstream.deleteStream(oldStream.id); + await this.init(); + } return; } const subscriptions = await this.eventstream.getSubscriptions(); @@ -175,7 +185,7 @@ export class TokensService { this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating'); await this.eventstream.deleteStream(existingStream.id); await this.init(); - break; + return; } } } @@ -231,28 +241,28 @@ export class TokensService { this.instancePath, stream.id, tokenCreateEvent, - packSubscriptionName(this.topic, dto.poolId, tokenCreateEvent), + packSubscriptionName(this.topic, this.instancePath, dto.poolId, tokenCreateEvent), dto.transaction?.blockNumber ?? '0', ), this.eventstream.getOrCreateSubscription( this.instancePath, stream.id, transferSingleEvent, - packSubscriptionName(this.topic, dto.poolId, transferSingleEvent), + packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferSingleEvent), dto.transaction?.blockNumber ?? '0', ), this.eventstream.getOrCreateSubscription( this.instancePath, stream.id, transferBatchEvent, - packSubscriptionName(this.topic, dto.poolId, transferBatchEvent), + packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferBatchEvent), dto.transaction?.blockNumber ?? '0', ), this.eventstream.getOrCreateSubscription( this.instancePath, stream.id, approvalForAllEvent, - packSubscriptionName(this.topic, dto.poolId, approvalForAllEvent), + packSubscriptionName(this.topic, this.instancePath, dto.poolId, approvalForAllEvent), // Block number is 0 because it is important to receive all approval events, // so existing approvals will be reflected in the newly created pool '0', diff --git a/src/tokens/tokens.util.spec.ts b/src/tokens/tokens.util.spec.ts index ce3b0fc..b71a666 100644 --- a/src/tokens/tokens.util.spec.ts +++ b/src/tokens/tokens.util.spec.ts @@ -18,6 +18,7 @@ import { decodeHex, encodeHex, encodeHexIDForURI, + packStreamName, packSubscriptionName, packTokenId, unpackSubscriptionName, @@ -67,29 +68,30 @@ describe('Util', () => { }); }); + it('packStreamName', () => { + expect(packStreamName('token', '0x123')).toEqual('token:0x123'); + }); + it('packSubscriptionName', () => { - expect(packSubscriptionName('token', 'F1')).toEqual('token:F1'); - expect(packSubscriptionName('token', 'N1', 'create')).toEqual('token:N1:create'); - expect(packSubscriptionName('tok:en', 'N1', 'create')).toEqual('tok:en:N1:create'); + expect(packSubscriptionName('token', '0x123', 'F1', 'create')).toEqual('token:0x123:F1:create'); + expect(packSubscriptionName('tok:en', '0x123', 'N1', 'create')).toEqual( + 'tok:en:0x123:N1:create', + ); }); it('unpackSubscriptionName', () => { - expect(unpackSubscriptionName('token', 'token:F1')).toEqual({ + expect(unpackSubscriptionName('token', 'token:0x123:F1:create')).toEqual({ prefix: 'token', + instancePath: '0x123', poolId: 'F1', - }); - expect(unpackSubscriptionName('token', 'token:N1:create')).toEqual({ - prefix: 'token', - poolId: 'N1', event: 'create', }); - expect(unpackSubscriptionName('tok:en', 'tok:en:N1:create')).toEqual({ + expect(unpackSubscriptionName('tok:en', 'tok:en:0x123:N1:create')).toEqual({ prefix: 'tok:en', + instancePath: '0x123', poolId: 'N1', event: 'create', }); - expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({ - prefix: 'token', - }); + expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({}); }); }); diff --git a/src/tokens/tokens.util.ts b/src/tokens/tokens.util.ts index b0877dd..49b0d22 100644 --- a/src/tokens/tokens.util.ts +++ b/src/tokens/tokens.util.ts @@ -68,20 +68,31 @@ export function unpackTokenId(id: string) { }; } -export function packSubscriptionName(prefix: string, poolId: string, event?: string) { - if (event === undefined) { - return [prefix, poolId].join(':'); - } - return [prefix, poolId, event].join(':'); +export function packStreamName(prefix: string, instancePath: string) { + return [prefix, instancePath].join(':'); +} + +export function packSubscriptionName( + prefix: string, + instancePath: string, + poolId: string, + event: string, +) { + return [prefix, instancePath, poolId, event].join(':'); } export function unpackSubscriptionName(prefix: string, data: string) { - const parts = data.startsWith(prefix + ':') - ? data.slice(prefix.length + 1).split(':', 2) - : undefined; + if (!data.startsWith(prefix + ':')) { + return {}; + } + const parts = data.slice(prefix.length + 1).split(':'); + if (parts.length !== 3) { + return {}; + } return { prefix, - poolId: parts?.[0], - event: parts?.[1], + instancePath: parts[0], + poolId: parts[1], + event: parts[2], }; } diff --git a/test/app.e2e-spec.ts b/test/app.e2e-spec.ts index f33468e..0af64b1 100644 --- a/test/app.e2e-spec.ts +++ b/test/app.e2e-spec.ts @@ -52,6 +52,7 @@ import { } from '../src/tokens/tokens.interfaces'; import { TokensService } from '../src/tokens/tokens.service'; import { WebSocketMessage } from '../src/websocket-events/websocket-events.base'; +import { packSubscriptionName } from '../src/tokens/tokens.util'; import { AppModule } from './../src/app.module'; const BASE_URL = 'http://eth'; @@ -403,7 +404,7 @@ describe('AppController (e2e)', () => { it('Websocket: token pool event', () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':F1', + name: packSubscriptionName(TOPIC, '0x123', 'F1', ''), }); return server @@ -461,7 +462,7 @@ describe('AppController (e2e)', () => { it('Websocket: token pool event from base subscription', () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':base', + name: packSubscriptionName(TOPIC, '0x123', 'base', ''), }); return server @@ -519,7 +520,7 @@ describe('AppController (e2e)', () => { it('Websocket: token mint event', async () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':F1', + name: packSubscriptionName(TOPIC, '0x123', 'F1', ''), }); http.get = jest.fn( @@ -611,7 +612,7 @@ describe('AppController (e2e)', () => { it('Websocket: token burn event', async () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':N1', + name: packSubscriptionName(TOPIC, '0x123', 'N1', ''), }); http.get = jest.fn( @@ -702,7 +703,7 @@ describe('AppController (e2e)', () => { it('Websocket: token transfer event', async () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':N1', + name: packSubscriptionName(TOPIC, '0x123', 'N1', ''), }); http.get = jest.fn( @@ -780,7 +781,7 @@ describe('AppController (e2e)', () => { it('Websocket: token approval event', async () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':N1', + name: packSubscriptionName(TOPIC, '0x123', 'N1', ''), }); await server @@ -841,9 +842,8 @@ describe('AppController (e2e)', () => { }); it('Websocket: token transfer event from wrong pool', () => { - eventstream.getSubscription - .mockReturnValueOnce({ name: TOPIC + ':N1' }) - .mockReturnValueOnce({ name: TOPIC + ':N1' }); + const sub = { name: packSubscriptionName(TOPIC, '0x123', 'N1', '') }; + eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub); return server .ws('/api/ws') @@ -893,7 +893,7 @@ describe('AppController (e2e)', () => { it('Websocket: token batch transfer', async () => { eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':N1', + name: packSubscriptionName(TOPIC, '0x123', 'N1', ''), }); http.get = jest.fn( @@ -1077,7 +1077,7 @@ describe('AppController (e2e)', () => { }; eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':F1', + name: packSubscriptionName(TOPIC, '0x123', 'F1', ''), }); await server @@ -1116,7 +1116,7 @@ describe('AppController (e2e)', () => { }; eventstream.getSubscription.mockReturnValueOnce({ - name: TOPIC + ':F1', + name: packSubscriptionName(TOPIC, '0x123', 'F1', ''), }); const ws1 = server.ws('/api/ws'); @@ -1173,9 +1173,8 @@ describe('AppController (e2e)', () => { }, }; - eventstream.getSubscription - .mockReturnValueOnce({ name: TOPIC + ':F1' }) - .mockReturnValueOnce({ name: TOPIC + ':F1' }); + const sub = { name: packSubscriptionName(TOPIC, '0x123', 'F1', '') }; + eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub); const ws1 = server.ws('/api/ws'); const ws2 = server.ws('/api/ws');