-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathconsumer.js
73 lines (59 loc) · 1.69 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
const kafka = require('kafka-node');
const config = require('../config');
const logger = require('../modules/logger');
const utility = require('./utility');
const auditLogController = require('../controllers');
// Kafka client pointed at the broker address taken from the configuration.
const client = new kafka.KafkaClient({ kafkaHost: config.kafkaHost });

// Consumer subscribed to the single configured topic. autoCommit is turned
// off; the 'message' handler in this file calls consumer.commit() itself
// once an audit log has been saved.
const consumer = new kafka.Consumer(
  client,
  [{ topic: config.kafkaTopic }],
  { autoCommit: false }
);
// On Ctrl-C (SIGINT): close the consumer, then terminate the process from
// the close callback so shutdown waits for the close to finish.
process.on('SIGINT', () => {
  // Passing true asks close() to commit the current offset before closing.
  const commitCurrentOffset = true;
  consumer.close(commitCurrentOffset, () => process.exit());
});
// Report any error emitted by the consumer through the application logger.
consumer.on('error', (err) => logger.error('consumer error: ', err));
/**
 * Builds the audit-log record handed to the audit-log controller.
 *
 * `tableId` and `rowId` are copied as-is. `ipAddress`, `user` and
 * `miscellaneous` are copied into `changeHistory` only when truthy, and
 * `changeHistory.log` is the result of
 * `utility.calculateDiff(params.oldData, params.newData)`.
 *
 * @param {object} params - Parsed message payload.
 * @returns {{tableId: *, rowId: *, changeHistory: object}} Audit-log record.
 * @throws {TypeError} If `params` is `null` or `undefined`.
 */
function buildAuditLog(params) {
  const changeHistory = {};
  for (const field of ['ipAddress', 'user', 'miscellaneous']) {
    if (params[field]) {
      changeHistory[field] = params[field];
    }
  }
  changeHistory.log = utility.calculateDiff(params.oldData, params.newData);
  return {
    tableId: params.tableId,
    rowId: params.rowId,
    changeHistory
  };
}

// For every Kafka message: parse the JSON payload, turn it into an audit-log
// record, persist it through the controller and, only after a successful
// save, commit the consumer offset.
consumer.on('message', (message) => {
  logger.debug('consumer message: ', message);

  let auditLog;
  try {
    // JSON.parse throws on malformed input and buildAuditLog throws on a
    // null payload; an exception escaping an event listener would surface as
    // an uncaught exception, so it is logged and the message is skipped.
    auditLog = buildAuditLog(JSON.parse(message.value));
  } catch (error) {
    logger.error('consumer message: unable to process payload', error);
    return;
  }

  auditLogController.createOrUpdate(auditLog)
    .then(([savedLog]) => {
      logger.debug('saved log', savedLog);
      consumer.commit((error, data) => {
        if (error) {
          logger.error('consumer.commit: ', error);
        } else {
          logger.debug('commit success: ', data);
        }
      });
    })
    .catch((reason) => {
      // The controller is expected to reject with an [err, responseCode]
      // pair, but an exception thrown in the .then() callback above arrives
      // here as a plain Error. Destructuring that in the parameter list would
      // throw inside this handler and hide the real failure.
      const err = Array.isArray(reason) ? reason[0] : reason;
      // String errors are intentionally not logged, as before.
      // NOTE(review): presumably these are expected/validation failures
      // reported by the controller — confirm.
      if (typeof err !== 'string') {
        logger.error('consumer auditLogController.createOrUpdate', err);
      }
    });
});