Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add pipeline support for batching multiple publish calls #630

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,35 @@ pubSub.subscribe('Test', message => {
});
```

## Use a pipeline to batch publish calls

If you need to publish a number of triggers at the same time, you can batch them in a single HTTP request to redis using the pub/sub pipeline:

```javascript
const pubSub = new RedisPubSub({ ... });

// Create pipeline
const pipeline = pubSub.pipeline();

[1,2,3,4,5,6].forEach((id) => pipeline.publish('Test', {id}))

// Execute the pipeline to redis
await pipeline.exec();
```

The publish method is also chainable.

```javascript
const pubSub = new RedisPubSub({ ... });
const pipeline = pubSub.pipeline();

await pipeline
.publish('Test1', {})
.publish('Test2', {})
.publish('Test3', {})
.exec();
```

## Old Usage (Deprecated)

```javascript
Expand Down
39 changes: 39 additions & 0 deletions src/redis-pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { ChainableCommander } from "ioredis";
import type { RedisClient, Serializer } from "./redis-pubsub";

export interface RedisPubSubPipelineOptions {
publisher: RedisClient;
serializer: Serializer;
}

/**
* Create a pipelined publisher that will send all the pub/sub messages in a single batch
*/
export class RedisPubSubPipeline {
private pipeline: ChainableCommander;
private publisher: RedisClient;
private serializer: Serializer;

constructor(options: RedisPubSubPipelineOptions) {
this.publisher = options.publisher;
this.serializer = options.serializer;

// Start pipeline
this.pipeline = this.publisher.pipeline();
}

/**
* Publish to the redis pipeline
*/
public publish<T>(trigger: string, payload: T): this {
this.pipeline.publish(trigger, this.serializer(payload));
return this;
}

/**
* Execute the entire pipeline
*/
public async exec() {
return this.pipeline.exec();
}
}
31 changes: 20 additions & 11 deletions src/redis-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import {Cluster, Redis, RedisOptions} from 'ioredis';
import {PubSubEngine} from 'graphql-subscriptions';
import {PubSubAsyncIterator} from './pubsub-async-iterator';
import { RedisPubSubPipeline } from "./redis-pipeline";

type RedisClient = Redis | Cluster;
export type RedisClient = Redis | Cluster;
type OnMessage<T> = (message: T) => void;
type DeserializerContext = { channel: string, pattern?: string };

function defaultSerializer(payload: unknown): string | Buffer {
if (payload instanceof Buffer) {
return payload;
}
return JSON.stringify(payload);
}

export interface PubSubRedisOptions {
connection?: RedisOptions | string;
triggerTransform?: TriggerTransform;
Expand All @@ -29,8 +37,8 @@ export class RedisPubSub implements PubSubEngine {
subscriber,
publisher,
reviver,
serializer,
deserializer,
serializer = defaultSerializer,
messageEventName = 'message',
pmessageEventName = 'pmessage',
} = options;
Expand Down Expand Up @@ -85,13 +93,14 @@ export class RedisPubSub implements PubSubEngine {
}

public async publish<T>(trigger: string, payload: T): Promise<void> {
if(this.serializer) {
await this.redisPublisher.publish(trigger, this.serializer(payload));
} else if (payload instanceof Buffer){
await this.redisPublisher.publish(trigger, payload);
} else {
await this.redisPublisher.publish(trigger, JSON.stringify(payload));
}
await this.redisPublisher.publish(trigger, this.serializer(payload));
}

public pipeline() {
return new RedisPubSubPipeline({
publisher: this.redisPublisher,
serializer: this.serializer,
});
}

public subscribe<T = any>(
Expand Down Expand Up @@ -191,7 +200,7 @@ export class RedisPubSub implements PubSubEngine {
]);
}

private readonly serializer?: Serializer;
private readonly serializer: Serializer;
private readonly deserializer?: Deserializer;
private readonly triggerTransform: TriggerTransform;
private readonly redisSubscriber: RedisClient;
Expand Down Expand Up @@ -251,5 +260,5 @@ export type TriggerTransform = (
channelOptions?: unknown,
) => string;
export type Reviver = (key: any, value: any) => any;
export type Serializer = (source: any) => string;
export type Serializer = (source: unknown) => string | Buffer;
export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any;
62 changes: 62 additions & 0 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as chai from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import { spy, restore, stub } from 'simple-mock';
import { RedisPubSub } from '../redis-pubsub';
import { RedisPubSubPipeline } from "../redis-pipeline";
import * as IORedis from 'ioredis';

chai.use(chaiAsPromised);
Expand All @@ -17,20 +18,28 @@ const unsubscribeSpy = spy((channel, cb) => cb && cb(channel));
const psubscribeSpy = spy((channel, cb) => cb && cb(null, channel));
const punsubscribeSpy = spy((channel, cb) => cb && cb(channel));

const pipelinePublishSpy = spy(() => {});
const pipelineExecSpy = spy(() => Promise.resolve());
const pipelineSpy = spy(() => mockPipelineClient);
const quitSpy = spy(cb => cb);
const mockRedisClient = {
publish: publishSpy,
subscribe: subscribeSpy,
unsubscribe: unsubscribeSpy,
psubscribe: psubscribeSpy,
punsubscribe: punsubscribeSpy,
pipeline: pipelineSpy,
on: (event, cb) => {
if (event === 'message') {
listener = cb;
}
},
quit: quitSpy,
};
const mockPipelineClient = {
publish: pipelinePublishSpy,
exec: pipelineExecSpy,
};
const mockOptions = {
publisher: (mockRedisClient as any),
subscriber: (mockRedisClient as any),
Expand Down Expand Up @@ -459,6 +468,13 @@ describe('RedisPubSub', () => {

});

it('creates a pipline', (done) => {
const pubSub = new RedisPubSub(mockOptions);
const pipeline = pubSub.pipeline();
expect(pipeline).to.be.an.instanceOf(RedisPubSubPipeline);
done();
});

// TODO pattern subs

afterEach('Reset spy count', () => {
Expand Down Expand Up @@ -562,3 +578,49 @@ describe('PubSubAsyncIterator', () => {
});

});

describe('RedisPubSubPipeline', () => {
it('should publish to pipeline', () => {
const pubSub = new RedisPubSub(mockOptions);
const pipeline = pubSub.pipeline();

pipeline.publish('TOPIC', 'test');
expect(pipelinePublishSpy.lastCall.args).to.have.members([
'TOPIC',
'"test"',
]);
});

it('should be chainable', () => {
const pubSub = new RedisPubSub(mockOptions);
const pipeline = pubSub.pipeline();

const result = pipeline.publish('TOPIC', { hello: 'world' });
expect(result).to.be.an.instanceOf(RedisPubSubPipeline);
});

it('should use the serializer to transform the payload before publishing', () => {
const serializer = stub();
const serializedPayload = `{ 'hello': 'custom' }`;
serializer.returnWith(serializedPayload);

const pipeline = new RedisPubSubPipeline({
publisher: mockRedisClient as any,
serializer,
});
pipeline.publish('TOPIC', { hello: 'world' });

expect(serializer.callCount).to.equal(1);
expect(pipelinePublishSpy.lastCall.args).to.have.members([
'TOPIC',
serializedPayload,
]);
});

it('should execute the pipeline', async () => {
const pubSub = new RedisPubSub(mockOptions);
const pipeline = pubSub.pipeline();
await pipeline.exec();
expect(pipelineExecSpy.callCount).to.equal(1);
});
});