-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpkafka.js
79 lines (68 loc) · 1.97 KB
/
pkafka.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
const stream = require('stream')
const Kafka = require('node-rdkafka')
const util = require("util");
module.exports = function pinoKafka(opts) {
opts.on = Object.assign({
error(err) {
process.stderr.write(util.format.apply(this, err) + '\n')
},
ready() {
},
disconnected(){},
'event'(){},
'event.log'(){},
'event.stats'(){},
'event.throttle'(){},
'delivery-report'(){},
}, opts.on || {})
const through = new stream.PassThrough()
const inputStream = process.stdin
through.pause()
const kafkaStream = new Kafka.HighLevelProducer({
...opts.kafka,
'metadata.broker.list': opts.brokers
})
kafkaStream.connect({
timeout: opts.timeout
}, (err) => {
if (err) {
opts.on.error.call(kafkaStream, err)
}
})
kafkaStream.on('ready', (info, metadata) => {
through.pipe(outputStream)
through.resume()
opts.on.ready.call(kafkaStream, info, metadata)
})
kafkaStream.on('disconnected', opts.on.disconnected.bind(kafkaStream))
kafkaStream.on('event', opts.on.event.bind(kafkaStream))
kafkaStream.on('event.log', opts.on['event.log'].bind(kafkaStream))
kafkaStream.on('event.stats', opts.on['event.stats'].bind(kafkaStream))
kafkaStream.on('event.error', opts.on.error.bind(kafkaStream))
kafkaStream.on('event.throttle', opts.on['event.throttle'].bind(kafkaStream))
kafkaStream.on('delivery-report', opts.on['delivery-report'].bind(kafkaStream))
const outputStream = new stream.Writable({
write(body, enc, cb) {
// TODO: remove new line delimeters
kafkaStream.produce(opts.defaultTopic,
null,
body,
null,
null,
(err, offset) => {
if (err) {
opts.on.error.call(kafkaStream, err)
cb(err)
} else {
if (opts.echo){
process.stdout.write(body)
}
cb()
}
})
}
})
inputStream.pipe(through)
through._kafka = kafkaStream
return through;
}