Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make consumer the only public interface for live event listener #91

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka-event-listener/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ plugins {
jacoco
id("org.hypertrace.publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
id("java-test-fixtures")
}

dependencies {
Expand All @@ -16,6 +17,9 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("com.github.ben-manes.caffeine:caffeine:3.1.8")

testFixturesApi(platform(project(":kafka-bom")))
testFixturesApi("org.apache.kafka:kafka-clients")
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaConsumerUtils {

Check warning on line 16 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L16

Added line #L16 was not covered by tests
public static final String TOPIC_NAME = "topic.name"; // required key in kafkaConfig
public static final String POLL_TIMEOUT = "poll.timeout"; // defaults to 30s if not provided

/**
* Returns a kafka consumer for provided config and key value deserializers. Only one instance of
* consumer should be required per pod, ensure singleton.
*/
public static <K, V> Consumer<K, V> getKafkaConsumer(
Config kafkaConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
return new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),

Check warning on line 27 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L26-L27

Added lines #L26 - L27 were not covered by tests
keyDeserializer,
valueDeserializer);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Map.Entry<String, ConfigValue>> entries = configs.entrySet();

Check warning on line 34 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L33-L34

Added lines #L33 - L34 were not covered by tests
for (Map.Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;

Check warning on line 41 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L36-L41

Added lines #L36 - L41 were not covered by tests
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);

Check warning on line 52 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java

View check run for this annotation

Codecov / codecov/patch

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java#L45-L52

Added lines #L45 - L52 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

/**
* KafkaLiveEventListener consumes events produced to a single Kafka Topic from its initialisation
Expand All @@ -25,6 +24,10 @@
*
* <p>Typical usage of this listener is to back the remote caches to have lower latency of refresh
* by generating respective information on kafka topics.
*
* <p>Refer to
* org.hypertrace.core.kafka.event.listener.KafkaLiveEventListenerTest#testEventModificationCache()
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
private final Future<Void> kafkaLiveEventListenerCallableFuture;
Expand Down Expand Up @@ -77,19 +80,6 @@ public KafkaLiveEventListener<K, V> build(
cleanupExecutor);
}

public KafkaLiveEventListener<K, V> build(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
assertCallbacksPresent();
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(
consumerName, kafkaConfig, keyDeserializer, valueDeserializer, callbacks),
executorService,
cleanupExecutor);
}

private void assertCallbacksPresent() {
if (callbacks.isEmpty()) {
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,33 @@
package org.hypertrace.core.kafka.event.listener;

import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.POLL_TIMEOUT;
import static org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils.TOPIC_NAME;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;

@Slf4j
class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count";
private static final String TOPIC_NAME = "topic.name";
private static final String POLL_TIMEOUT = "poll.timeout";
private final List<TopicPartition> topicPartitions;
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final List<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<BiConsumer<? super K, ? super V>> callbacks) {
this(
consumerName,
kafkaConfig,
new KafkaConsumer<>(
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),
keyDeserializer,
valueDeserializer),
callbacks);
}

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Expand Down Expand Up @@ -107,27 +81,4 @@ public Void call() {

} while (true);
}

private static Config getDefaultKafkaConsumerConfigs() {
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);
}

private static Properties getKafkaConsumerConfigs(Config configs) {
Map<String, String> configMap = new HashMap<>();
Set<Entry<String, ConfigValue>> entries = configs.entrySet();
for (Entry<String, ConfigValue> entry : entries) {
String key = entry.getKey();
configMap.put(key, configs.getString(key));
}
Properties props = new Properties();
props.putAll(configMap);
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,19 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class KafkaLiveEventListenerTest {
Expand Down Expand Up @@ -50,45 +43,29 @@ void testThrowOnInvalidInputs() {
@Test
void testEventModificationCache() throws Exception {
// kafka consumer mock setup
MockConsumer<String, Long> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
String topic = "event-update-topic";
kafkaConsumer.updatePartitions(
topic,
List.of(
getPartitionInfo(topic, 0),
getPartitionInfo(topic, 1),
getPartitionInfo(topic, 2),
getPartitionInfo(topic, 3)));
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(topic, 0), 50L);
endOffsets.put(new TopicPartition(topic, 1), 50L);
endOffsets.put(new TopicPartition(topic, 2), 50L);
endOffsets.put(new TopicPartition(topic, 3), 50L);
kafkaConsumer.updateEndOffsets(endOffsets);
String topicName = "event-update-topic";
KafkaMockConsumerTestUtil<String, Long> mockConsumerTestUtil =
new KafkaMockConsumerTestUtil<>(topicName, 4);
// create instance of event modification cache consuming from this consumer
EventModificationCache eventModificationCache =
new EventModificationCache(
"modification-event-consumer",
ConfigFactory.parseMap(Map.of("topic.name", topic, "poll.timeout", "5ms")),
kafkaConsumer);
ConfigFactory.parseMap(Map.of("topic.name", topicName, "poll.timeout", "5ms")),
mockConsumerTestUtil.getMockConsumer());
Thread.sleep(10);
assertEquals(10L, eventModificationCache.get(10));
assertEquals(100L, eventModificationCache.get(100));
// not present key won't trigger any population but callback function should be called
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 0, 100, "32", 89L));
mockConsumerTestUtil.addRecordToPartition(0, "32", 89L);
Thread.sleep(100);
assertFalse(eventModificationCache.hasKey(32));
// existing key will be modified based on entry
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 3, 200, "10", -3L));
mockConsumerTestUtil.addRecordToPartition(3, "10", -3L);
Thread.sleep(100);
assertEquals(-3L, eventModificationCache.get(10));
eventModificationCache.close();
}

private PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, mock(Node.class), new Node[0], new Node[0]);
}

static class EventModificationCache {
private final AsyncLoadingCache<Integer, Long> cache;
private final KafkaLiveEventListener<String, Long> eventListener;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.hypertrace.core.kafka.event.listener;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class KafkaMockConsumerTestUtil<K, V> {
private final String topicName;
private final Map<TopicPartition, Long> currentOffsets;

private final MockConsumer<K, V> mockConsumer;

public KafkaMockConsumerTestUtil(String topicName, int numPartitions) {
this.topicName = topicName;
mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
List<PartitionInfo> partitionInfos =
IntStream.range(0, numPartitions)
.mapToObj(i -> getPartitionInfo(topicName, i))
.collect(Collectors.toUnmodifiableList());
mockConsumer.updatePartitions(topicName, partitionInfos);
currentOffsets =
IntStream.range(0, numPartitions)
.mapToObj(i -> getTopicPartition(topicName, i))
.collect(Collectors.toMap(Function.identity(), k -> 1L));
mockConsumer.updateEndOffsets(currentOffsets);
}

/** creates 1 partition by default */
public KafkaMockConsumerTestUtil(String topicName) {
this(topicName, 1);
}

public MockConsumer<K, V> getMockConsumer() {
return mockConsumer;
}

/** adds to 0th partition by default */
public void addRecord(K key, V value) {
addRecordToPartition(0, key, value);
}

public void addRecordToPartition(int partition, K key, V value) {
Long latestOffset =
currentOffsets.computeIfPresent(getTopicPartition(topicName, partition), (k, v) -> v + 1);
if (Objects.isNull(latestOffset)) {
throw new UnsupportedOperationException(
"cannot add to partition "
+ partition
+ ", total partitions is "
+ currentOffsets.size());
}
mockConsumer.addRecord(new ConsumerRecord<>(topicName, partition, latestOffset, key, value));
}

private static PartitionInfo getPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]);
}

private static TopicPartition getTopicPartition(String topic, int partition) {
return new TopicPartition(topic, partition);
}
}