-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.ts
98 lines (82 loc) · 2.55 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import {
AckPolicy,
jetstream,
jetstreamManager,
type JetStreamClient,
type JetStreamManager,
type JsMsg,
} from "@nats-io/jetstream";
import { connect, nanos } from "@nats-io/transport-node";
// Value passed as `max_messages` to `consumer.consume()` in main().
// NOTE(review): the identifier is misspelled ("CONCURRNECY"); renaming it
// requires updating its use inside main() in the same change.
const MAX_CONCURRNECY = 5;
// Number of test messages main() publishes, one per subject `test.<i>`.
// Despite the name, the publish loop awaits each publish before sending the next.
const NUMBER_OF_CONCURRENT_MESSAGES_TO_PUBLISH = 10;
// Logging helper: writes `message` to stdout prefixed with the current
// ISO-8601 timestamp in square brackets.
const logger = (message: string): void => {
  const timestamp = new Date().toISOString();
  console.log("[" + timestamp + "] " + message);
};
/**
 * End-to-end JetStream demo against a NATS server on localhost:4223.
 *
 * Steps: recreate the test stream, register a durable consumer, start
 * consuming with a deliberately slow (3 s) handler, publish the test
 * messages, wait until every published message has been acknowledged, then
 * drain the connection and exit.
 *
 * The connection is always drained, even when a step fails; in that case the
 * error is rethrown to the caller after the drain.
 */
async function main(): Promise<void> {
  logger("Connecting to NATS...");
  const nc = await connect({
    servers: "localhost:4223",
  });

  try {
    const jsm: JetStreamManager = await jetstreamManager(nc);
    const js: JetStreamClient = jetstream(nc);
    const streamName = "TEST_STREAM";
    const subject = "test";

    // Best-effort cleanup so every run starts from an empty stream. A failure
    // here (e.g. the stream does not exist yet) is logged and ignored.
    try {
      await jsm.streams.delete(streamName);
    } catch (error) {
      logger(`Error deleting stream: ${error}`);
    }

    logger("Creating stream...");
    await jsm.streams.add({
      name: streamName,
      subjects: [`${subject}.>`],
      // nanos() takes milliseconds: keep messages for three days.
      max_age: nanos(3 * 24 * 60 * 60 * 1000),
      max_bytes: -1,
      max_msgs: -1,
    });

    // Registers the durable consumer used by the consume() call below.
    const createConsumer = async (): Promise<void> => {
      logger("Creating consumer...");
      await jsm.consumers.add(streamName, {
        name: "test-consumer",
        durable_name: "test-consumer",
        ack_policy: AckPolicy.Explicit,
        filter_subject: `${subject}.>`,
        max_deliver: -1,
        max_ack_pending: -1,
        max_batch: 10,
        // Uncomment to shorten the ack wait to 1 second (nanos() takes
        // milliseconds), so un-acked messages are redelivered quickly instead
        // of lingering as outstanding acks.
        // ack_wait: nanos(1000 * 1),
        max_waiting: 100_000,
      });
      logger("Consumer created");
    };

    // Publishes the test messages one at a time on `test.<i>`, payload `<i>`.
    const publishMessages = async (): Promise<void> => {
      logger("Publishing test messages...");
      for (let i = 0; i < NUMBER_OF_CONCURRENT_MESSAGES_TO_PUBLISH; i++) {
        await js.publish(`${subject}.${i}`, i.toString());
        logger(`Published message ${i}`);
      }
    };

    logger("Creating consumer instance...");
    await createConsumer();
    const consumer = await js.consumers.get(streamName, "test-consumer");

    // Completion tracking: `allAcked` resolves once as many messages have been
    // acknowledged as were published, so the connection is not drained while
    // handlers are still sleeping.
    let ackedCount = 0;
    let signalAllAcked: () => void = () => {};
    const allAcked = new Promise<void>((resolve) => {
      signalAllAcked = resolve;
    });

    // Simulates 3 seconds of work per message, then acknowledges it.
    const handleMessage = async (message: JsMsg): Promise<void> => {
      // string() decodes the payload as text; data.toString() on a
      // Uint8Array would print comma-separated byte values instead.
      const payload = message.string();
      logger(`Processing message: ${payload}`);
      await new Promise((resolve) => setTimeout(resolve, 3_000));
      logger(`Finished processing message: ${payload}`);
      message.ack();
      ackedCount += 1;
      if (ackedCount >= NUMBER_OF_CONCURRENT_MESSAGES_TO_PUBLISH) {
        signalAllAcked();
      }
    };

    logger("Initializing message consumption...");
    await consumer.consume({
      max_messages: MAX_CONCURRNECY,
      callback: (message) => {
        // The handler runs detached from this callback, so its rejection must
        // be handled here or it would surface as an unhandled rejection.
        void handleMessage(message).catch((error: unknown) => {
          logger(`Error processing message: ${error}`);
        });
      },
    });

    logger("Publishing messages...");
    await publishMessages();

    logger("Waiting for in-flight messages to be processed...");
    await allAcked;
  } finally {
    // Always release the connection, on success and on failure alike.
    await nc.drain();
    logger("Drained");
  }

  process.exit(0);
}
// Entry point: run the demo; on failure print the error and mark the process
// as failed so callers (shells, CI) do not see a success exit status.
main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});