Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansairam9 committed Mar 26, 2024
1 parent d972f20 commit 17f50bf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import java.util.Queue;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -30,26 +30,25 @@
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
Queue<BiConsumer<? super K, ? super V>> callbacks;
private final KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable;
private final Future<Void> kafkaLiveEventListenerCallableFuture;
private final ExecutorService executorService;
private final boolean cleanupExecutor;

private KafkaLiveEventListener(
KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable,
Queue<BiConsumer<? super K, ? super V>> callbacks,
ExecutorService executorService,
boolean cleanupExecutor) {
this.callbacks = callbacks;
this.executorService = executorService;
this.cleanupExecutor = cleanupExecutor;
this.kafkaLiveEventListenerCallable = kafkaLiveEventListenerCallable;
this.kafkaLiveEventListenerCallableFuture =
executorService.submit(kafkaLiveEventListenerCallable);
}

public KafkaLiveEventListener<K, V> registerCallback(
BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
kafkaLiveEventListenerCallable.addCallback(callbackFunction);
return this;
}

Expand All @@ -63,15 +62,16 @@ public void close() throws Exception {
}

public static final class Builder<K, V> {
Queue<BiConsumer<? super K, ? super V>> deprecatedCallbacksFlow = new ConcurrentLinkedQueue<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
boolean cleanupExecutor =
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
new ConcurrentLinkedQueue<>();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
private boolean cleanupExecutor =
true; // if builder creates executor shutdown executor while closing event listener

/** use registerCallback on the built instance instead */
@Deprecated(forRemoval = true)
public Builder<K, V> registerCallback(BiConsumer<? super K, ? super V> callbackFunction) {
deprecatedCallbacksFlow.add(callbackFunction);
callbacks.add(callbackFunction);
return this;
}

Expand All @@ -84,15 +84,8 @@ public Builder<K, V> withExecutorService(

public KafkaLiveEventListener<K, V> build(
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
Queue<BiConsumer<? super K, ? super V>> callbacks;
if (deprecatedCallbacksFlow.size() > 0) {
callbacks = deprecatedCallbacksFlow;
} else {
callbacks = new ConcurrentLinkedQueue<>();
}
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
callbacks,
executorService,
cleanupExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import com.typesafe.config.Config;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -27,14 +28,15 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final Queue<BiConsumer<? super K, ? super V>> callbacks;
private final ConcurrentLinkedQueue<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Consumer<K, V> kafkaConsumer,
Queue<BiConsumer<? super K, ? super V>> callbacks) {
this.callbacks = callbacks;
Collection<BiConsumer<? super K, ? super V>> callbackCollection) {
this.callbacks = new ConcurrentLinkedQueue<>();
callbackCollection.forEach(this::addCallback);
this.pollTimeout =
kafkaConfig.hasPath(POLL_TIMEOUT)
? kafkaConfig.getDuration(POLL_TIMEOUT)
Expand All @@ -54,6 +56,10 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
}

void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
}

@Override
public Void call() {
do {
Expand Down

0 comments on commit 17f50bf

Please sign in to comment.