
Commit

refactor: make consumer the only public interface for live event listener (#91)
Kishan Sairam Adapa authored Dec 28, 2023
1 parent 7b78923 commit 12462bd
Showing 6 changed files with 143 additions and 96 deletions.
4 changes: 4 additions & 0 deletions kafka-event-listener/build.gradle.kts
@@ -3,6 +3,7 @@ plugins {
jacoco
id("org.hypertrace.publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
id("java-test-fixtures")
}

dependencies {
@@ -16,6 +17,9 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("com.github.ben-manes.caffeine:caffeine:3.1.8")

testFixturesApi(platform(project(":kafka-bom")))
testFixturesApi("org.apache.kafka:kafka-clients")
}

tasks.test {
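The java-test-fixtures plugin publishes the new KafkaMockConsumerTestUtil (added below) for reuse by other modules. A consuming build would declare something like the following (a sketch for a hypothetical downstream module):

dependencies {
  // pull in the published test fixtures of this module
  testImplementation(testFixtures(project(":kafka-event-listener")))
}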
54 changes: 54 additions & 0 deletions kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
@@ -0,0 +1,54 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaConsumerUtils {
public static final String TOPIC_NAME = "topic.name"; // required key in kafkaConfig
public static final String POLL_TIMEOUT = "poll.timeout"; // defaults to 30s if not provided

/**
* Returns a Kafka consumer for the provided config and key/value deserializers. Only one
* consumer instance should be needed per pod; callers should ensure it is used as a singleton.
*/
public static <K, V> Consumer<K, V> getKafkaConsumer(
Config kafkaConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
return new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),
keyDeserializer,
valueDeserializer);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Map.Entry<String, ConfigValue>> entries = configs.entrySet();
for (Map.Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);
}
}
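A minimal usage sketch of the new utility (the broker address, group id, and value types are illustrative, not part of the commit; deserializers are the standard ones from org.apache.kafka.common.serialization):

Config kafkaConfig =
    ConfigFactory.parseMap(
        Map.of(
            "bootstrap.servers", "localhost:9092", // illustrative broker address
            "group.id", "event-listener-group", // illustrative consumer group
            "topic.name", "event-update-topic")); // required TOPIC_NAME key
Consumer<String, Long> consumer =
    KafkaConsumerUtils.getKafkaConsumer(
        kafkaConfig, new StringDeserializer(), new LongDeserializer());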
kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java
@@ -9,7 +9,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

/**
* KafkaLiveEventListener consumes events produced to a single Kafka Topic from its initialisation
@@ -25,6 +24,10 @@
*
* <p>Typical usage of this listener is to back the remote caches to have lower latency of refresh
* by generating respective information on kafka topics.
*
* <p>Refer to
* org.hypertrace.core.kafka.event.listener.KafkaLiveEventListenerTest#testEventModificationCache()
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
private final Future<Void> kafkaLiveEventListenerCallableFuture;
@@ -77,19 +80,6 @@ public KafkaLiveEventListener<K, V> build(
cleanupExecutor);
}

public KafkaLiveEventListener<K, V> build(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
assertCallbacksPresent();
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(
consumerName, kafkaConfig, keyDeserializer, valueDeserializer, callbacks),
executorService,
cleanupExecutor);
}

private void assertCallbacksPresent() {
if (callbacks.isEmpty()) {
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
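With the deserializer-based overload removed, callers construct the Consumer themselves (e.g. via KafkaConsumerUtils above) and hand it to the remaining build variant. A sketch, assuming the nested Builder and a registerCallback method inferred from the surrounding context:

KafkaLiveEventListener<String, Long> listener =
    new KafkaLiveEventListener.Builder<String, Long>()
        .registerCallback((key, value) -> System.out.println(key + " -> " + value))
        .build("modification-event-consumer", kafkaConfig, consumer);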
kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java
@@ -1,59 +1,33 @@
package org.hypertrace.core.kafka.event.listener;

import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.POLL_TIMEOUT;
import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.TOPIC_NAME;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;

@Slf4j
class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count";
private static final String TOPIC_NAME = "topic.name";
private static final String POLL_TIMEOUT = "poll.timeout";
private final List<TopicPartition> topicPartitions;
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final List<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<BiConsumer<? super K, ? super V>> callbacks) {
this(
consumerName,
kafkaConfig,
new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),
keyDeserializer,
valueDeserializer),
callbacks);
}

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
@@ -107,27 +81,4 @@ public Void call() {

} while (true);
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Entry<String, ConfigValue>> entries = configs.entrySet();
for (Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;
}
}
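The body of call() is collapsed in this diff. Based on the fields and imports above, the listen loop is schematically as follows (an inferred sketch, not the hidden source):

kafkaConsumer.assign(topicPartitions);
kafkaConsumer.seekToEnd(topicPartitions); // only events after initialisation are consumed
do {
  try {
    ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeout);
    records.forEach(
        record -> callbacks.forEach(callback -> callback.accept(record.key(), record.value())));
  } catch (InterruptException interrupted) {
    throw interrupted; // propagate cancellation from the executor
  } catch (Exception e) {
    errorCounter.increment(); // count the error and keep polling
    log.error("consumer poll failed", e);
  }
} while (true);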
kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java
@@ -3,26 +3,19 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class KafkaLiveEventListenerTest {
@@ -50,45 +43,29 @@ void testThrowOnInvalidInputs() {
@Test
void testEventModificationCache() throws Exception {
// kafka consumer mock setup
MockConsumer<String, Long> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
String topic = "event-update-topic";
kafkaConsumer.updatePartitions(
topic,
List.of(
getPartitionInfo(topic, 0),
getPartitionInfo(topic, 1),
getPartitionInfo(topic, 2),
getPartitionInfo(topic, 3)));
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(topic, 0), 50L);
endOffsets.put(new TopicPartition(topic, 1), 50L);
endOffsets.put(new TopicPartition(topic, 2), 50L);
endOffsets.put(new TopicPartition(topic, 3), 50L);
kafkaConsumer.updateEndOffsets(endOffsets);
String topicName = "event-update-topic";
KafkaMockConsumerTestUtil<String, Long> mockConsumerTestUtil =
new KafkaMockConsumerTestUtil<>(topicName, 4);
// create instance of event modification cache consuming from this consumer
EventModificationCache eventModificationCache =
new EventModificationCache(
"modification-event-consumer",
ConfigFactory.parseMap(Map.of("topic.name", topic, "poll.timeout", "5ms")),
kafkaConsumer);
ConfigFactory.parseMap(Map.of("topic.name", topicName, "poll.timeout", "5ms")),
mockConsumerTestUtil.getMockConsumer());
Thread.sleep(10);
assertEquals(10L, eventModificationCache.get(10));
assertEquals(100L, eventModificationCache.get(100));
// a key that is absent won't be populated, but the callback should still be invoked
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 0, 100, "32", 89L));
mockConsumerTestUtil.addRecordToPartition(0, "32", 89L);
Thread.sleep(100);
assertFalse(eventModificationCache.hasKey(32));
// existing key will be modified based on entry
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 3, 200, "10", -3L));
mockConsumerTestUtil.addRecordToPartition(3, "10", -3L);
Thread.sleep(100);
assertEquals(-3L, eventModificationCache.get(10));
eventModificationCache.close();
}

private PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, mock(Node.class), new Node[0], new Node[0]);
}

static class EventModificationCache {
private final AsyncLoadingCache<Integer, Long> cache;
private final KafkaLiveEventListener<String, Long> eventListener;
71 changes: 71 additions & 0 deletions kafka-event-listener/src/testFixtures/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java
@@ -0,0 +1,71 @@
package org.hypertrace.core.kafka.event.listener;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class KafkaMockConsumerTestUtil<K, V> {
private final String topicName;
private final Map<TopicPartition, Long> currentOffsets;

private final MockConsumer<K, V> mockConsumer;

public KafkaMockConsumerTestUtil(String topicName, int numPartitions) {
this.topicName = topicName;
mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
List<PartitionInfo> partitionInfos =
IntStream.range(0, numPartitions)
.mapToObj(i -> getPartitionInfo(topicName, i))
.collect(Collectors.toUnmodifiableList());
mockConsumer.updatePartitions(topicName, partitionInfos);
currentOffsets =
IntStream.range(0, numPartitions)
.mapToObj(i -> getTopicPartition(topicName, i))
.collect(Collectors.toMap(Function.identity(), k -> 1L));
mockConsumer.updateEndOffsets(currentOffsets);
}

/** creates 1 partition by default */
public KafkaMockConsumerTestUtil(String topicName) {
this(topicName, 1);
}

public MockConsumer<K, V> getMockConsumer() {
return mockConsumer;
}

/** adds to partition 0 by default */
public void addRecord(K key, V value) {
addRecordToPartition(0, key, value);
}

public void addRecordToPartition(int partition, K key, V value) {
Long latestOffset =
currentOffsets.computeIfPresent(getTopicPartition(topicName, partition), (k, v) -> v + 1);
if (Objects.isNull(latestOffset)) {
throw new UnsupportedOperationException(
"cannot add to partition "
+ partition
+ ", total partitions is "
+ currentOffsets.size());
}
mockConsumer.addRecord(new ConsumerRecord<>(topicName, partition, latestOffset, key, value));
}

private static PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]);
}

private static TopicPartition getTopicPartition(String topic, int partition) {
return new TopicPartition(topic, partition);
}
}
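A short usage sketch of the fixture (types mirror the listener test above):

KafkaMockConsumerTestUtil<String, Long> util =
    new KafkaMockConsumerTestUtil<>("event-update-topic", 4);
Consumer<String, Long> consumer = util.getMockConsumer(); // hand this to the listener under test
util.addRecord("32", 89L); // goes to partition 0 at the next tracked offset
util.addRecordToPartition(3, "10", -3L); // explicit partition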
