-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathpostgres-pubsub.js
55 lines (52 loc) · 1.55 KB
/
postgres-pubsub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
const { PubSub } = require("graphql-subscriptions");
const pgIPC = require("pg-ipc");
const { Client } = require("pg");
const {
eventEmitterAsyncIterator
} = require("./event-emitter-to-async-iterator");
const defaultCommonMessageHandler = message => message;
class PostgresPubSub extends PubSub {
constructor(options = {}) {
const { commonMessageHandler, client, ...pgOptions } = options;
super();
this.client = client || new Client(pgOptions);
if (!client) {
this.client.connect();
}
this.ee = new pgIPC(this.client);
this.subscriptions = {};
this.subIdCounter = 0;
this.commonMessageHandler =
commonMessageHandler || defaultCommonMessageHandler;
}
publish(triggerName, payload) {
this.ee.notify(triggerName, payload);
return true;
}
subscribe(triggerName, onMessage) {
const callback = message => {
onMessage(
message instanceof Error
? message
: this.commonMessageHandler(message.payload)
);
};
this.ee.on(triggerName, callback);
this.subIdCounter = this.subIdCounter + 1;
this.subscriptions[this.subIdCounter] = [triggerName, callback];
return Promise.resolve(this.subIdCounter);
}
unsubscribe(subId) {
const [triggerName, onMessage] = this.subscriptions[subId];
delete this.subscriptions[subId];
this.ee.removeListener(triggerName, onMessage);
}
asyncIterator(triggers) {
return eventEmitterAsyncIterator(
this.ee,
triggers,
this.commonMessageHandler
);
}
}
module.exports = { PostgresPubSub };