diff --git a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java index 04ef672..1fd8fa5 100644 --- a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java +++ b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java @@ -16,18 +16,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; @Slf4j class KafkaLiveEventListenerCallable implements Callable { + private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count"; - private final List topicPartitions; private final Consumer kafkaConsumer; private final Duration pollTimeout; private final Counter errorCounter; + private final String topic; private final ConcurrentLinkedQueue> callbacks; KafkaLiveEventListenerCallable( @@ -41,16 +41,9 @@ class KafkaLiveEventListenerCallable implements Callable { kafkaConfig.hasPath(POLL_TIMEOUT) ? kafkaConfig.getDuration(POLL_TIMEOUT) : Duration.ofSeconds(30); - String topic = kafkaConfig.getString(TOPIC_NAME); + this.topic = kafkaConfig.getString(TOPIC_NAME); this.kafkaConsumer = kafkaConsumer; - // fetch partitions and seek to end of partitions to consume live events - List partitions = kafkaConsumer.partitionsFor(topic); - topicPartitions = - partitions.stream() - .map(p -> new TopicPartition(p.topic(), p.partition())) - .collect(Collectors.toList()); - kafkaConsumer.assign(topicPartitions); - kafkaConsumer.seekToEnd(topicPartitions); + this.errorCounter = PlatformMetricsRegistry.registerCounter( consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap()); @@ -60,8 +53,20 @@ void addCallback(BiConsumer callbackFunction) { callbacks.add(callbackFunction); } + private List initializePartitions() { + // fetch partitions and seek to end of partitions to consume live events + List topicPartitions = + kafkaConsumer.partitionsFor(this.topic).stream() + .map(p -> new TopicPartition(p.topic(), p.partition())) + .collect(Collectors.toUnmodifiableList()); + kafkaConsumer.assign(topicPartitions); + kafkaConsumer.seekToEnd(topicPartitions); + return topicPartitions; + } + @Override public Void call() { + List topicPartitions = this.initializePartitions(); do { try { ConsumerRecords records = kafkaConsumer.poll(pollTimeout);