-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
46 lines (40 loc) · 1.13 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const { "kafka": kafkaConfig, registry } = require("./config");
// Username/password pair for the schema registry, read from the `registry`
// section of ./config.
const registryCredentials = {
  username: registry.username,
  password: registry.password,
};

// Schema registry client; the consumer uses it to decode message values.
const schemaRegistry = new SchemaRegistry({
  host: registry.host,
  auth: registryCredentials,
});
// SASL settings for the broker connection: 'plain' mechanism with the
// username/password from the `kafka` section of ./config.
const saslSettings = {
  mechanism: 'plain',
  username: kafkaConfig.username,
  password: kafkaConfig.password,
};

// Kafka client for the configured brokers, with SSL enabled.
const kafka = new Kafka({
  clientId: kafkaConfig.clientId,
  brokers: kafkaConfig.brokers,
  ssl: true,
  sasl: saslSettings,
});

// Consumer created with the fixed group id 'test-group'.
const consumer = kafka.consumer({ groupId: 'test-group' });
/**
 * Connects the consumer, subscribes to `kafkaConfig.topicName` from the
 * beginning of the topic, and registers a handler that logs the partition,
 * offset, and schema-registry-decoded value of every message.
 *
 * @returns {Promise<void>} Settles when `consumer.run` settles; rejects if
 *   connecting, subscribing, or starting the consumer fails.
 */
const run = async () => {
  // Consuming
  console.log("Connecting...");
  await consumer.connect();
  console.log("Connected...");

  console.log("Recieving...");
  await consumer.subscribe({
    topic: kafkaConfig.topicName,
    fromBeginning: true,
  });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log("New message recieved...");

      // message.value can be null (e.g. a tombstone record). There is nothing
      // to decode in that case, so log null rather than passing null to the
      // registry decoder and failing the handler.
      const value =
        message.value === null
          ? null
          : await schemaRegistry.decode(message.value);

      console.log({
        partition,
        offset: message.offset,
        value,
      });
    },
  });
};
// Start the consumer. If startup fails, log the error, mark the process as
// failed so the exit status is non-zero, and make a best-effort attempt to
// close the consumer connection so it does not keep the process alive.
run().catch(async (error) => {
  console.error(error);
  process.exitCode = 1;
  try {
    await consumer.disconnect();
  } catch (disconnectError) {
    console.error(disconnectError);
  }
});