-
Notifications
You must be signed in to change notification settings - Fork 1
/
cli.js
112 lines (101 loc) · 2.64 KB
/
cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#! /usr/bin/env node
'use strict'
const minimist = require('minimist')
const pump = require('pump')
const fs = require('fs')
const path = require('path')
const pinoKafka = require('./pkafka')
const util = require("util");
function keysToDotNotation(obj, current, final) {
if(!final) {
final = {}
}
for (var key in obj) {
var value = obj[key];
var newKey = (current ? current + "." + key : key); // joined key with dot
if (value && typeof value === "object") {
keysToDotNotation(value, newKey, final); // it's a nested object, so do it again
} else {
final[newKey] = value; // it's not an object, so set the property
}
}
return final
}
function start (opts) {
if(opts.kafka){
if(typeof opts.kafka !== 'object'){
throw new Error('Kafka options must be an object')
}
opts.kafka = keysToDotNotation(opts.kafka)
}
if (opts.help) {
console.log(fs.readFileSync(path.join(__dirname, './help.txt'), 'utf8'))
return
}
if (opts.version) {
console.log('pino-kafka', require('./package.json').version)
return
}
if (opts.settings) {
try {
const loadedSettings = require(path.resolve(opts.settings))
const settings = Object.assign(loadedSettings, argv)
opts = Object.assign(opts, settings)
} catch (e) {
console.error('`settings` parameter specified but could not load file: %s', e.message)
process.exit(1)
}
}
const stream = pinoKafka(opts)
pump(process.stdin, stream)
function terminator( sig ) {
if ( typeof sig === 'string' ) {
flushKafkaQueue(function (err) {
process.exit(err ? 1 : 0);
});
}
}
async function flushKafkaQueue( callback ){
stream._kafka.flush(opts.timeout, err => {
if(err){
process.stderr.write(util.format.apply(this, err) + '\n')
}
callback(err)
})
}
['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
'SIGBUS', 'SIGFPE', 'SIGUSR1', 'SIGSEGV', 'SIGUSR2', 'SIGTERM'
].forEach(function (sig) {
process.on(sig, function () {
try {
terminator(sig);
} catch (e) {
process.exit(1)
}
});
});
}
start(minimist(process.argv.slice(2), {
alias: {
version: 'v',
help: 'h',
brokers: 'b',
defaultTopic: 'd',
settings: 's',
echo: 'e',
timeout: 't'
},
default: {
kafka: {
'compression.codec':'none',
'enable.idempotence': 'true',
'max.in.flight.requests.per.connection': 4,
'message.send.max.retries': 10000000,
'acks': 'all'
},
echo: false,
timeout: 10000,
brokers: '10.6.25.11:9092, 10.6.25.12:9092',
defaultTopic: 'blackbox'
}
}))