Skip to content

Commit

Permalink
Incorporate contract instance path into subscription name
Browse files Browse the repository at this point in the history
Fixes #59

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Mar 15, 2022
1 parent 1667298 commit d0cee54
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ export class EventStreamService {
return response.data;
}

async createOrUpdateStream(topic: string): Promise<EventStream> {
async createOrUpdateStream(name: string, topic: string): Promise<EventStream> {
const streamDetails = {
name: topic,
name,
errorHandling: 'block',
batchSize: 50,
batchTimeoutMS: 500,
Expand Down
26 changes: 18 additions & 8 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
encodeHex,
encodeHexIDForURI,
isFungible,
packStreamName,
packSubscriptionName,
packTokenId,
unpackSubscriptionName,
Expand Down Expand Up @@ -125,13 +126,14 @@ export class TokensService {
this.instancePath,
this.stream.id,
tokenCreateEvent,
packSubscriptionName(this.topic, BASE_SUBSCRIPTION_NAME),
packSubscriptionName(this.topic, this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent),
);
}

private async getStream() {
if (this.stream === undefined) {
this.stream = await this.eventstream.createOrUpdateStream(this.topic);
const name = packStreamName(this.topic, this.instancePath);
this.stream = await this.eventstream.createOrUpdateStream(name, this.topic);
}
return this.stream;
}
Expand All @@ -145,9 +147,17 @@ export class TokensService {
* TODO: eventually this migration logic can be pruned
*/
async migrate() {
const name = packStreamName(this.topic, this.instancePath);
const streams = await this.eventstream.getStreams();
const existingStream = streams.find(s => s.name === this.topic);
const existingStream = streams.find(s => s.name === name);
if (existingStream === undefined) {
// Look for the old stream name (topic alone)
const oldStream = streams.find(s => s.name === this.topic);
if (oldStream !== undefined) {
this.logger.warn('Old event stream found - deleting and recreating');
await this.eventstream.deleteStream(oldStream.id);
await this.init();
}
return;
}
const subscriptions = await this.eventstream.getSubscriptions();
Expand Down Expand Up @@ -175,7 +185,7 @@ export class TokensService {
this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating');
await this.eventstream.deleteStream(existingStream.id);
await this.init();
break;
return;
}
}
}
Expand Down Expand Up @@ -231,28 +241,28 @@ export class TokensService {
this.instancePath,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.topic, dto.poolId, tokenCreateEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, tokenCreateEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
transferSingleEvent,
packSubscriptionName(this.topic, dto.poolId, transferSingleEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferSingleEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
transferBatchEvent,
packSubscriptionName(this.topic, dto.poolId, transferBatchEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferBatchEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
stream.id,
approvalForAllEvent,
packSubscriptionName(this.topic, dto.poolId, approvalForAllEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, approvalForAllEvent),
// Block number is 0 because it is important to receive all approval events,
// so existing approvals will be reflected in the newly created pool
'0',
Expand Down
26 changes: 14 additions & 12 deletions src/tokens/tokens.util.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
decodeHex,
encodeHex,
encodeHexIDForURI,
packStreamName,
packSubscriptionName,
packTokenId,
unpackSubscriptionName,
Expand Down Expand Up @@ -67,29 +68,30 @@ describe('Util', () => {
});
});

it('packStreamName', () => {
expect(packStreamName('token', '0x123')).toEqual('token:0x123');
});

it('packSubscriptionName', () => {
expect(packSubscriptionName('token', 'F1')).toEqual('token:F1');
expect(packSubscriptionName('token', 'N1', 'create')).toEqual('token:N1:create');
expect(packSubscriptionName('tok:en', 'N1', 'create')).toEqual('tok:en:N1:create');
expect(packSubscriptionName('token', '0x123', 'F1', 'create')).toEqual('token:0x123:F1:create');
expect(packSubscriptionName('tok:en', '0x123', 'N1', 'create')).toEqual(
'tok:en:0x123:N1:create',
);
});

it('unpackSubscriptionName', () => {
expect(unpackSubscriptionName('token', 'token:F1')).toEqual({
expect(unpackSubscriptionName('token', 'token:0x123:F1:create')).toEqual({
prefix: 'token',
instancePath: '0x123',
poolId: 'F1',
});
expect(unpackSubscriptionName('token', 'token:N1:create')).toEqual({
prefix: 'token',
poolId: 'N1',
event: 'create',
});
expect(unpackSubscriptionName('tok:en', 'tok:en:N1:create')).toEqual({
expect(unpackSubscriptionName('tok:en', 'tok:en:0x123:N1:create')).toEqual({
prefix: 'tok:en',
instancePath: '0x123',
poolId: 'N1',
event: 'create',
});
expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({
prefix: 'token',
});
expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({});
});
});
31 changes: 21 additions & 10 deletions src/tokens/tokens.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,31 @@ export function unpackTokenId(id: string) {
};
}

export function packSubscriptionName(prefix: string, poolId: string, event?: string) {
if (event === undefined) {
return [prefix, poolId].join(':');
}
return [prefix, poolId, event].join(':');
export function packStreamName(prefix: string, instancePath: string) {
return [prefix, instancePath].join(':');
}

export function packSubscriptionName(
prefix: string,
instancePath: string,
poolId: string,
event: string,
) {
return [prefix, instancePath, poolId, event].join(':');
}

export function unpackSubscriptionName(prefix: string, data: string) {
const parts = data.startsWith(prefix + ':')
? data.slice(prefix.length + 1).split(':', 2)
: undefined;
if (!data.startsWith(prefix + ':')) {
return {};
}
const parts = data.slice(prefix.length + 1).split(':');
if (parts.length !== 3) {
return {};
}
return {
prefix,
poolId: parts?.[0],
event: parts?.[1],
instancePath: parts[0],
poolId: parts[1],
event: parts[2],
};
}
29 changes: 14 additions & 15 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import {
} from '../src/tokens/tokens.interfaces';
import { TokensService } from '../src/tokens/tokens.service';
import { WebSocketMessage } from '../src/websocket-events/websocket-events.base';
import { packSubscriptionName } from '../src/tokens/tokens.util';
import { AppModule } from './../src/app.module';

const BASE_URL = 'http://eth';
Expand Down Expand Up @@ -403,7 +404,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token pool event', () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

return server
Expand Down Expand Up @@ -461,7 +462,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token pool event from base subscription', () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':base',
name: packSubscriptionName(TOPIC, '0x123', 'base', ''),
});

return server
Expand Down Expand Up @@ -519,7 +520,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token mint event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -611,7 +612,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token burn event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -702,7 +703,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token transfer event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -780,7 +781,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token approval event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

await server
Expand Down Expand Up @@ -841,9 +842,8 @@ describe('AppController (e2e)', () => {
});

it('Websocket: token transfer event from wrong pool', () => {
eventstream.getSubscription
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':N1' })
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':N1' });
const sub = <EventStreamSubscription>{ name: packSubscriptionName(TOPIC, '0x123', 'N1', '') };
eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub);

return server
.ws('/api/ws')
Expand Down Expand Up @@ -893,7 +893,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token batch transfer', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -1077,7 +1077,7 @@ describe('AppController (e2e)', () => {
};

eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

await server
Expand Down Expand Up @@ -1116,7 +1116,7 @@ describe('AppController (e2e)', () => {
};

eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

const ws1 = server.ws('/api/ws');
Expand Down Expand Up @@ -1173,9 +1173,8 @@ describe('AppController (e2e)', () => {
},
};

eventstream.getSubscription
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':F1' })
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':F1' });
const sub = <EventStreamSubscription>{ name: packSubscriptionName(TOPIC, '0x123', 'F1', '') };
eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub);

const ws1 = server.ws('/api/ws');
const ws2 = server.ws('/api/ws');
Expand Down

0 comments on commit d0cee54

Please sign in to comment.