From f0f565ecdf80be2c171ab8ecea8a5b79cf59f514 Mon Sep 17 00:00:00 2001 From: Kishan Sairam Adapa Date: Tue, 26 Mar 2024 12:59:12 +0530 Subject: [PATCH] move register callback to non builder inference (#94) * move register callback to non builder inference * add comment * address comments --- .../listener/KafkaLiveEventListener.java | 28 +++++++++++-------- .../KafkaLiveEventListenerCallable.java | 13 +++++++-- .../listener/KafkaLiveEventListenerTest.java | 17 +++-------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java index f162c10..e92a66e 100644 --- a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java +++ b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java @@ -1,8 +1,8 @@ package org.hypertrace.core.kafka.event.listener; import com.typesafe.config.Config; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -30,6 +30,7 @@ * for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between */ public class KafkaLiveEventListener implements AutoCloseable { + private final KafkaLiveEventListenerCallable kafkaLiveEventListenerCallable; private final Future kafkaLiveEventListenerCallableFuture; private final ExecutorService executorService; private final boolean cleanupExecutor; @@ -40,10 +41,17 @@ private KafkaLiveEventListener( boolean cleanupExecutor) { this.executorService = executorService; this.cleanupExecutor = cleanupExecutor; + this.kafkaLiveEventListenerCallable = kafkaLiveEventListenerCallable; this.kafkaLiveEventListenerCallableFuture = executorService.submit(kafkaLiveEventListenerCallable); } + public KafkaLiveEventListener registerCallback( + BiConsumer callbackFunction) { + kafkaLiveEventListenerCallable.addCallback(callbackFunction); + return this; + } + @Override public void close() throws Exception { kafkaLiveEventListenerCallableFuture.cancel(true); @@ -54,11 +62,14 @@ public void close() throws Exception { } public static final class Builder { - List> callbacks = new ArrayList<>(); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - boolean cleanupExecutor = + private final Collection> callbacks = + new ConcurrentLinkedQueue<>(); + private ExecutorService executorService = Executors.newSingleThreadExecutor(); + private boolean cleanupExecutor = true; // if builder creates executor shutdown executor while closing event listener + /** use registerCallback on the built instance instead */ + @Deprecated(forRemoval = true) public Builder registerCallback(BiConsumer callbackFunction) { callbacks.add(callbackFunction); return this; @@ -73,17 +84,10 @@ public Builder withExecutorService( public KafkaLiveEventListener build( String consumerName, Config kafkaConfig, Consumer kafkaConsumer) { - assertCallbacksPresent(); return new KafkaLiveEventListener<>( new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks), executorService, cleanupExecutor); } - - private void assertCallbacksPresent() { - if (callbacks.isEmpty()) { - throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener"); - } - } } } diff --git a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java index 45951c7..04ef672 100644 --- a/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java +++ b/kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java @@ -6,9 +6,11 @@ import com.typesafe.config.Config; import io.micrometer.core.instrument.Counter; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.BiConsumer; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -26,14 +28,15 @@ class KafkaLiveEventListenerCallable implements Callable { private final Consumer kafkaConsumer; private final Duration pollTimeout; private final Counter errorCounter; - private final List> callbacks; + private final ConcurrentLinkedQueue> callbacks; KafkaLiveEventListenerCallable( String consumerName, Config kafkaConfig, Consumer kafkaConsumer, - List> callbacks) { - this.callbacks = callbacks; + Collection> callbackCollection) { + this.callbacks = new ConcurrentLinkedQueue<>(); + callbackCollection.forEach(this::addCallback); this.pollTimeout = kafkaConfig.hasPath(POLL_TIMEOUT) ? kafkaConfig.getDuration(POLL_TIMEOUT) @@ -53,6 +56,10 @@ class KafkaLiveEventListenerCallable implements Callable { consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap()); } + void addCallback(BiConsumer callbackFunction) { + callbacks.add(callbackFunction); + } + @Override public Void call() { do { diff --git a/kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java b/kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java index c7554f4..9384af0 100644 --- a/kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java +++ b/kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java @@ -22,22 +22,13 @@ class KafkaLiveEventListenerTest { @Test void testThrowOnInvalidInputs() { - // no callback - assertThrows( - IllegalArgumentException.class, - () -> - new KafkaLiveEventListener.Builder() - .build( - "", - ConfigFactory.parseMap(Map.of("topic.name", "")), - new MockConsumer<>(OffsetResetStrategy.LATEST))); // no topic name assertThrows( ConfigException.class, () -> new KafkaLiveEventListener.Builder() - .registerCallback((String k, Long v) -> System.out.println(k + ":" + v)) - .build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST))); + .build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST)) + .registerCallback((String k, Long v) -> System.out.println(k + ":" + v))); } @Test @@ -80,9 +71,9 @@ static class EventModificationCache { .buildAsync(this::load); eventListener = new KafkaLiveEventListener.Builder() + .build(consumerName, kafkaConfig, consumer) .registerCallback(this::actOnEvent) - .registerCallback(this::log) - .build(consumerName, kafkaConfig, consumer); + .registerCallback(this::log); } public void close() throws Exception {