Generally speaking, the Modern C++ Kafka API is quite similar to the Kafka Java API. We'd recommend users to cross-reference them, -- especially the examples.

Unlike Java's KafkaConsumer, here we introduce two derived classes, -- `KafkaAutoCommitConsumer` and `KafkaManualCommitConsumer`, -- depending on whether the user needs to call `commit` manually.
- Automatically commits previously polled offsets on each `poll` (and the final `close`) operation.
- Note: the internal offset commit is asynchronous, and is not guaranteed to succeed. It's supposed to be triggered (within each `poll` operation) periodically, thus an occasional failure doesn't quite matter.
```cpp
// Create configuration object
kafka::Properties props ({
    {"bootstrap.servers",  brokers},
    {"enable.auto.commit", "true"}
});

// Create a consumer instance
kafka::clients::KafkaConsumer consumer(props);

// Subscribe to topics
consumer.subscribe({topic});

// Read messages from the topic
std::cout << "% Reading messages from topic: " << topic << std::endl;
while (true) {
    auto records = consumer.poll(std::chrono::milliseconds(100));
    for (const auto& record: records) {
        // In this example, quit on empty message
        if (record.value().size() == 0) return 0;

        if (!record.error()) {
            std::cout << "% Got a new message..." << std::endl;
            std::cout << "    Topic    : " << record.topic() << std::endl;
            std::cout << "    Partition: " << record.partition() << std::endl;
            std::cout << "    Offset   : " << record.offset() << std::endl;
            std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
            std::cout << "    Headers  : " << kafka::toString(record.headers()) << std::endl;
            std::cout << "    Key   [" << record.key().toString() << "]" << std::endl;
            std::cout << "    Value [" << record.value().toString() << "]" << std::endl;
        } else {
            std::cerr << record.toString() << std::endl;
        }
    }
}

// consumer.close(); // No explicit close is needed, RAII will take care of it
```
- The `bootstrap.servers` property is mandatory for a Kafka client.
- `subscribe` could take a topic list. It's a blocking operation, -- it would wait for the consumer to get partitions assigned.
- `poll` must be called periodically, in order to trigger all kinds of callback handling internally. In practice, it could be put in a "while loop".
- At the end, we could `close` the consumer explicitly, or just leave it to the destructor.
- Unlike the `KafkaAutoCommitConsumer` above, users must commit the offsets for received records manually.
```cpp
// Create configuration object
kafka::Properties props ({
    {"bootstrap.servers", brokers},
});

// Create a consumer instance
kafka::clients::KafkaConsumer consumer(props);

// Subscribe to topics
consumer.subscribe({topic});

auto lastTimeCommitted = std::chrono::steady_clock::now();

// Read messages from the topic
std::cout << "% Reading messages from topic: " << topic << std::endl;
bool allCommitted = true;
bool running      = true;
while (running) {
    auto records = consumer.poll(std::chrono::milliseconds(100));
    for (const auto& record: records) {
        // In this example, quit on empty message
        if (record.value().size() == 0) {
            running = false;
            break;
        }

        if (!record.error()) {
            std::cout << "% Got a new message..." << std::endl;
            std::cout << "    Topic    : " << record.topic() << std::endl;
            std::cout << "    Partition: " << record.partition() << std::endl;
            std::cout << "    Offset   : " << record.offset() << std::endl;
            std::cout << "    Timestamp: " << record.timestamp().toString() << std::endl;
            std::cout << "    Headers  : " << kafka::toString(record.headers()) << std::endl;
            std::cout << "    Key   [" << record.key().toString() << "]" << std::endl;
            std::cout << "    Value [" << record.value().toString() << "]" << std::endl;
            allCommitted = false;
        } else {
            std::cerr << record.toString() << std::endl;
        }
    }

    if (!allCommitted) {
        auto now = std::chrono::steady_clock::now();
        if (now - lastTimeCommitted > std::chrono::seconds(1)) {
            // Commit offsets for messages polled
            std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
            consumer.commitSync(); // or commitAsync()
            lastTimeCommitted = now;
            allCommitted = true;
        }
    }
}

// consumer.close(); // No explicit close is needed, RAII will take care of it
```
- The example is quite similar to the `KafkaAutoCommitConsumer` one, with only one more line added for the manual commit.
- `commitSync` and `commitAsync` are both available for a `KafkaManualCommitConsumer`. Normally, use `commitSync` to guarantee the commit, or use `commitAsync` (with an `OffsetCommitCallback`) for better performance, -- see the sketch below.
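A minimal sketch of an asynchronous commit with a callback, reusing the `consumer` from the example above. The callback parameter types (`kafka::TopicPartitionOffsets`, `kafka::Error`) are assumptions based on the library's `OffsetCommitCallback` declaration, -- double-check them against your headers.

```cpp
// Assumption: `OffsetCommitCallback` receives the committed offsets and an `Error`
consumer.commitAsync([](const kafka::TopicPartitionOffsets& committedOffsets, const kafka::Error& error) {
    if (error) {
        // The commit failed; a later commit would cover these offsets again
        std::cerr << "% Offset commit failed: " << error.message() << std::endl;
    }
});
```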
When we construct a `KafkaConsumer` with `kafka::clients::KafkaClient::EventsPollingOption::Auto` (i.e. the default option), an internal thread would be created to handle the `OffsetCommit` callbacks.

This might not be what you want, since you'd then have to use 2 different threads to process the messages and handle the `OffsetCommit` responses.

Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollingOption::Manual`; then the `OffsetCommit` callbacks would be called within the member function `pollEvents()`.
```cpp
KafkaConsumer consumer(props, kafka::clients::KafkaClient::EventsPollingOption::Manual);

consumer.subscribe({"topic1", "topic2"});

while (true) {
    auto records = consumer.poll(std::chrono::milliseconds(100));
    for (auto& record: records) {
        // Process the message...
        process(record);

        // Here we commit the offset manually
        consumer.commitSync(record);
    }

    // Here we call the `OffsetCommit` callbacks
    // Note: we can only do this while the consumer was constructed with `EventsPollingOption::Manual`.
    consumer.pollEvents();
}
```
No exception would be thrown from a consumer's `poll` operation. Instead, once an error occurs, the `Error` would be embedded in the `Consumer::ConsumerRecord`.

Regarding the `Error`'s `value()`, there are 2 cases (a sketch of handling both follows the list):
- Success
    - `RD_KAFKA_RESP_ERR__NO_ERROR` (`0`), -- got a message successfully
    - `RD_KAFKA_RESP_ERR__PARTITION_EOF`, -- reached the end of a partition (no message fetched)
- Failure
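For example, a polling loop could branch on these cases as follows. This is a minimal sketch, assuming `Error::value()` exposes the underlying librdkafka error code (as the list above suggests), with the `RD_KAFKA_RESP_ERR__*` constants coming from the librdkafka headers.

```cpp
auto records = consumer.poll(std::chrono::milliseconds(100));
for (const auto& record: records) {
    const auto error = record.error();
    if (!error) {
        // RD_KAFKA_RESP_ERR__NO_ERROR (0): got a message successfully
        std::cout << "Got message: " << record.value().toString() << std::endl;
    } else if (error.value() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Reached the end of a partition: no message fetched, but not a failure
        std::cout << "Reached end of partition " << record.partition() << std::endl;
    } else {
        // A real failure: the record carries the error details
        std::cerr << record.toString() << std::endl;
    }
}
```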
- What're the available configurations?
- How to enhance the polling performance?

    `ConsumerConfig::QUEUED_MIN_MESSAGES` determines how frequently the consumer would send FetchRequests towards the brokers. The default value (i.e. 100000) might not be good enough for small messages (less than 1KB each); we'd suggest using a larger value (e.g. 1000000) in that case, as sketched below.
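    For instance, a sketch of raising the threshold, -- assuming `ConsumerConfig::QUEUED_MIN_MESSAGES` simply maps to the `queued.min.messages` property name:

    ```cpp
    // Assumption: the constant equals the property key "queued.min.messages"
    kafka::Properties props ({
        {"bootstrap.servers",   brokers},
        {"queued.min.messages", "1000000"}  // default: 100000; a larger value helps with small messages
    });
    kafka::clients::KafkaConsumer consumer(props);
    ```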
- How many threads would be created by a KafkaConsumer?

    Excluding the user's main thread, if `enable.auto.commit` is `false`, the `KafkaConsumer` would start another (N + 2) threads in the background; otherwise, it would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS.)

    - Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards the Kafka cluster.
    - Another 2 threads handle internal operations, consumer group operations, all kinds of timers, etc.
    - With auto commit enabled, one more thread would be created, which keeps polling/processing the offset-commit callback events.

    E.g. if a KafkaConsumer was created with `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890` (and `enable.auto.commit=false`), it would take 6 threads in total (including the main thread).
- Which one of these threads will handle the callbacks?

    There are 2 kinds of callbacks for a KafkaConsumer:

    - `RebalanceCallback` will be triggered internally by the user's thread, -- within the `poll` function; see the sketch below.
    - If `enable.auto.commit=true`, the `OffsetCommitCallback` will be triggered by the user's `poll` thread; otherwise, it would be triggered by a background thread.
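    For the first case, a rebalance callback could be installed at subscription time. A minimal sketch, -- the `subscribe` overload, the `RebalanceEventType` values, and the callback signature here are assumptions based on the library's consumer headers:

    ```cpp
    // Sketch: this callback runs on the user's thread, within `poll`
    consumer.subscribe({topic},
                       [](kafka::clients::consumer::RebalanceEventType eventType,  // assumed enum
                          const kafka::TopicPartitions& partitions) {
                           if (eventType == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) {
                               std::cout << "% Partitions assigned" << std::endl;
                           } else {
                               std::cout << "% Partitions revoked" << std::endl;
                           }
                       });
    ```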