Skip to content

Commit

Permalink
move register callback to non builder inference (#94)
Browse files Browse the repository at this point in the history
* move register callback to non builder inference

* add comment

* address comments
  • Loading branch information
Kishan Sairam Adapa authored Mar 26, 2024
1 parent df4f796 commit f0f565e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -30,6 +30,7 @@
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
private final KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable;
private final Future<Void> kafkaLiveEventListenerCallableFuture;
private final ExecutorService executorService;
private final boolean cleanupExecutor;
Expand All @@ -40,10 +41,17 @@ private KafkaLiveEventListener(
boolean cleanupExecutor) {
this.executorService = executorService;
this.cleanupExecutor = cleanupExecutor;
this.kafkaLiveEventListenerCallable = kafkaLiveEventListenerCallable;
this.kafkaLiveEventListenerCallableFuture =
executorService.submit(kafkaLiveEventListenerCallable);
}

public KafkaLiveEventListener<K, V> registerCallback(
BiConsumer<? super K, ? super V> callbackFunction) {
kafkaLiveEventListenerCallable.addCallback(callbackFunction);
return this;
}

@Override
public void close() throws Exception {
kafkaLiveEventListenerCallableFuture.cancel(true);
Expand All @@ -54,11 +62,14 @@ public void close() throws Exception {
}

public static final class Builder<K, V> {
List<BiConsumer<? super K, ? super V>> callbacks = new ArrayList<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
boolean cleanupExecutor =
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
new ConcurrentLinkedQueue<>();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
private boolean cleanupExecutor =
true; // if builder creates executor shutdown executor while closing event listener

/** use registerCallback on the built instance instead */
@Deprecated(forRemoval = true)
public Builder<K, V> registerCallback(BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
return this;
Expand All @@ -73,17 +84,10 @@ public Builder<K, V> withExecutorService(

public KafkaLiveEventListener<K, V> build(
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
assertCallbacksPresent();
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
executorService,
cleanupExecutor);
}

private void assertCallbacksPresent() {
if (callbacks.isEmpty()) {
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.typesafe.config.Config;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,14 +28,15 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final List<BiConsumer<? super K, ? super V>> callbacks;
private final ConcurrentLinkedQueue<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Consumer<K, V> kafkaConsumer,
List<BiConsumer<? super K, ? super V>> callbacks) {
this.callbacks = callbacks;
Collection<BiConsumer<? super K, ? super V>> callbackCollection) {
this.callbacks = new ConcurrentLinkedQueue<>();
callbackCollection.forEach(this::addCallback);
this.pollTimeout =
kafkaConfig.hasPath(POLL_TIMEOUT)
? kafkaConfig.getDuration(POLL_TIMEOUT)
Expand All @@ -53,6 +56,10 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
}

void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
}

@Override
public Void call() {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,13 @@ class KafkaLiveEventListenerTest {

@Test
void testThrowOnInvalidInputs() {
// no callback
assertThrows(
IllegalArgumentException.class,
() ->
new KafkaLiveEventListener.Builder<String, Long>()
.build(
"",
ConfigFactory.parseMap(Map.of("topic.name", "")),
new MockConsumer<>(OffsetResetStrategy.LATEST)));
// no topic name
assertThrows(
ConfigException.class,
() ->
new KafkaLiveEventListener.Builder<String, Long>()
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v))
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST)));
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST))
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v)));
}

@Test
Expand Down Expand Up @@ -80,9 +71,9 @@ static class EventModificationCache {
.buildAsync(this::load);
eventListener =
new KafkaLiveEventListener.Builder<String, Long>()
.build(consumerName, kafkaConfig, consumer)
.registerCallback(this::actOnEvent)
.registerCallback(this::log)
.build(consumerName, kafkaConfig, consumer);
.registerCallback(this::log);
}

public void close() throws Exception {
Expand Down

0 comments on commit f0f565e

Please sign in to comment.