Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make consumer the only public interface for live event listener #91

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaConsumerUtils {

Check warning on line 16 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L16

Added line #L16 was not covered by tests
public static final String TOPIC_NAME = "topic.name"; // required key in kafkaConfig
public static final String POLL_TIMEOUT = "poll.timeout"; // defaults to 30s if not provided

/**
* Returns a kafka consumer for provided config and key value deserializers. Only one instance of
* consumer should be required per pod, ensure singleton.
*/
public static <K, V> Consumer<K, V> getKafkaConsumer(
Config kafkaConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
return new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),

Check warning on line 27 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L26-L27

Added lines #L26 - L27 were not covered by tests
keyDeserializer,
valueDeserializer);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Map.Entry<String, ConfigValue>> entries = configs.entrySet();

Check warning on line 34 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L33-L34

Added lines #L33 - L34 were not covered by tests
for (Map.Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;

Check warning on line 41 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L36-L41

Added lines #L36 - L41 were not covered by tests
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);

Check warning on line 52 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L45-L52

Added lines #L45 - L52 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

/**
* KafkaLiveEventListener consumes events produced to a single Kafka Topic from its initialisation
Expand All @@ -25,6 +24,10 @@
*
* <p>Typical usage of this listener is to back the remote caches to have lower latency of refresh
* by generating respective information on kafka topics.
*
* <p>Refer to
* org.hypertrace.core.kafka.event.listener.KafkaLiveEventListenerTest#testEventModificationCache()
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
private final Future<Void> kafkaLiveEventListenerCallableFuture;
Expand Down Expand Up @@ -77,19 +80,6 @@ public KafkaLiveEventListener<K, V> build(
cleanupExecutor);
}

public KafkaLiveEventListener<K, V> build(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
assertCallbacksPresent();
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(
consumerName, kafkaConfig, keyDeserializer, valueDeserializer, callbacks),
executorService,
cleanupExecutor);
}

private void assertCallbacksPresent() {
if (callbacks.isEmpty()) {
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,33 @@
package org.hypertrace.core.kafka.event.listener;

import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.POLL_TIMEOUT;
import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.TOPIC_NAME;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;

@Slf4j
class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count";
private static final String TOPIC_NAME = "topic.name";
private static final String POLL_TIMEOUT = "poll.timeout";
private final List<TopicPartition> topicPartitions;
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final List<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<BiConsumer<? super K, ? super V>> callbacks) {
this(
consumerName,
kafkaConfig,
new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),
keyDeserializer,
valueDeserializer),
callbacks);
}

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Expand Down Expand Up @@ -107,27 +81,4 @@ public Void call() {

} while (true);
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Entry<String, ConfigValue>> entries = configs.entrySet();
for (Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.hypertrace.core.kafka.event.listener;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class KafkaMockConsumerTestUtil<K, V> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test utils can go in testFixtures so that production classes do not depend on them. Please refer to the following for a sample test fixture and its usage. https://github.com/Traceableai/activity-event-service/tree/main/policy-store-service-impl/src/testFixtures

Copy link
Author

@kishansairam9 kishansairam9 Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, didn't know about them. Updated to use testFixtures

private final String topicName;
private final Map<TopicPartition, Long> currentOffsets;
@Getter private final MockConsumer<K, V> mockConsumer;

public KafkaMockConsumerTestUtil(String topicName, int numPartitions) {
this.topicName = topicName;
mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
List<PartitionInfo> partitionInfos =
IntStream.range(0, numPartitions)
.mapToObj(i -> getPartitionInfo(topicName, i))
.collect(Collectors.toUnmodifiableList());
mockConsumer.updatePartitions(topicName, partitionInfos);
currentOffsets =
IntStream.range(0, numPartitions)
.mapToObj(i -> getTopicPartition(topicName, i))
.collect(Collectors.toMap(Function.identity(), k -> 1L));
mockConsumer.updateEndOffsets(currentOffsets);
}

public void addRecord(K key, V value, int partition) {
Long latestOffset =
currentOffsets.computeIfPresent(getTopicPartition(topicName, partition), (k, v) -> v + 1);
if (Objects.isNull(latestOffset)) {
throw new UnsupportedOperationException(

Check warning on line 41 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java#L41

Added line #L41 was not covered by tests
"cannot add to partition "
+ partition
+ ", total partitions is "
+ currentOffsets.size());

Check warning on line 45 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java#L45

Added line #L45 was not covered by tests
}
mockConsumer.addRecord(new ConsumerRecord<>(topicName, partition, latestOffset, key, value));
}

private static PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]);
}

private static TopicPartition getTopicPartition(String topic, int partition) {
return new TopicPartition(topic, partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,19 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class KafkaLiveEventListenerTest {
Expand Down Expand Up @@ -50,45 +43,29 @@ void testThrowOnInvalidInputs() {
@Test
void testEventModificationCache() throws Exception {
// kafka consumer mock setup
MockConsumer<String, Long> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
String topic = "event-update-topic";
kafkaConsumer.updatePartitions(
topic,
List.of(
getPartitionInfo(topic, 0),
getPartitionInfo(topic, 1),
getPartitionInfo(topic, 2),
getPartitionInfo(topic, 3)));
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(topic, 0), 50L);
endOffsets.put(new TopicPartition(topic, 1), 50L);
endOffsets.put(new TopicPartition(topic, 2), 50L);
endOffsets.put(new TopicPartition(topic, 3), 50L);
kafkaConsumer.updateEndOffsets(endOffsets);
String topicName = "event-update-topic";
KafkaMockConsumerTestUtil<String, Long> mockConsumerTestUtil =
new KafkaMockConsumerTestUtil<>(topicName, 4);
// create instance of event modification cache consuming from this consumer
EventModificationCache eventModificationCache =
new EventModificationCache(
"modification-event-consumer",
ConfigFactory.parseMap(Map.of("topic.name", topic, "poll.timeout", "5ms")),
kafkaConsumer);
ConfigFactory.parseMap(Map.of("topic.name", topicName, "poll.timeout", "5ms")),
mockConsumerTestUtil.getMockConsumer());
Thread.sleep(10);
assertEquals(10L, eventModificationCache.get(10));
assertEquals(100L, eventModificationCache.get(100));
// not present key won't trigger any population but callback function should be called
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 0, 100, "32", 89L));
mockConsumerTestUtil.addRecord("32", 89L, 0);
Thread.sleep(100);
assertFalse(eventModificationCache.hasKey(32));
// existing key will be modified based on entry
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 3, 200, "10", -3L));
mockConsumerTestUtil.addRecord("10", -3L, 3);
Thread.sleep(100);
assertEquals(-3L, eventModificationCache.get(10));
eventModificationCache.close();
}

private PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, mock(Node.class), new Node[0], new Node[0]);
}

static class EventModificationCache {
private final AsyncLoadingCache<Integer, Long> cache;
private final KafkaLiveEventListener<String, Long> eventListener;
Expand Down