forked from polyma/seneca-kafka-transport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka-transport.js
87 lines (76 loc) · 2.42 KB
/
kafka-transport.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
'use strict';
var nid = require('nid');
module.exports = function(options) {
var seneca = this;
var plugin = 'kafka-transport';
var listenBus;
var clientBus;
if (!seneca.hasplugin('transport')) {
seneca.use('transport');
}
function hookListenQueue(args, done) {
listenBus = require('microbial')(options.microbial);
var handlerFn = function(req, res) {
seneca.act(req.request.act, function(err, result){
var outmsg = {kind:'res',
id:req.request.id,
err:err?err.message:null,
res:result};
res.respond(outmsg);
});
};
listenBus.run([{group: options.kafka.group, topicName: options.kafka.requestTopic}],
[{ match: { kind: 'act' }, execute: handlerFn}], function(err) {
if (err) { return console.log(err); }
seneca.log.info('listen', args.host, args.port, seneca.toString());
done();
});
}
function hookClientQueue(args, done) {
var seneca = this;
var callmap = {};
clientBus = require('microbial')(options.microbial);
clientBus.run([{group: options.kafka.group, topicName: options.kafka.responseTopic, responseChannel: true}], [], function(err) {
if (err) {
console.log(err);
}
else {
var client = function(args, done) {
var outmsg = {
id: nid(),
kind: 'act',
act: args
};
callmap[outmsg.id] = {done:done};
clientBus.request({topicName: options.kafka.requestTopic}, outmsg, function(res) {
var call = callmap[res.response.id];
if( call ) {
delete callmap[res.response.id];
call.done(res.response.err ? new Error(res.response.err) : null, res.response.res);
}
});
};
seneca.log.info('client', 'pubsub', args.host, args.port, seneca.toString());
done(null,client);
}
});
}
var shutdown = function(args, done) {
if (listenBus) {
listenBus.tearDown(function(err) {
done(err);
});
}
else if (clientBus) {
clientBus.tearDown(function(err) {
done(err);
});
}
};
seneca.add({role:'transport',hook:'listen',type:'kafka'}, hookListenQueue);
seneca.add({role:'transport',hook:'client',type:'kafka'}, hookClientQueue);
seneca.add({role:'seneca',cmd:'close'}, shutdown);
return {
name: plugin,
};
};