-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: make consumer the only public interface for live event listener #91
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package org.hypertrace.core.kafka.event.listener; | ||
|
||
import com.typesafe.config.Config; | ||
import com.typesafe.config.ConfigFactory; | ||
import com.typesafe.config.ConfigValue; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
import java.util.Set; | ||
import org.apache.kafka.clients.consumer.Consumer; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.common.serialization.ByteArrayDeserializer; | ||
import org.apache.kafka.common.serialization.Deserializer; | ||
|
||
public class KafkaConsumerUtils { | ||
Check warning on line 16 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
|
||
public static final String TOPIC_NAME = "topic.name"; // required key in kafkaConfig | ||
public static final String POLL_TIMEOUT = "poll.timeout"; // defaults to 30s if not provided | ||
|
||
/** | ||
* Returns a kafka consumer for provided config and key value deserializers. Only one instance of | ||
* consumer should be required per pod, ensure singleton. | ||
*/ | ||
public static <K, V> Consumer<K, V> getKafkaConsumer( | ||
Config kafkaConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { | ||
return new KafkaConsumer<>( | ||
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())), | ||
Check warning on line 27 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
|
||
keyDeserializer, | ||
valueDeserializer); | ||
} | ||
|
||
private static Properties getKafkaConsumerConfigs(Config configs) { | ||
Map<String, String> configMap = new HashMap<>(); | ||
Set<Map.Entry<String, ConfigValue>> entries = configs.entrySet(); | ||
Check warning on line 34 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
|
||
for (Map.Entry<String, ConfigValue> entry : entries) { | ||
String key = entry.getKey(); | ||
configMap.put(key, configs.getString(key)); | ||
} | ||
Properties props = new Properties(); | ||
props.putAll(configMap); | ||
return props; | ||
Check warning on line 41 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
|
||
} | ||
|
||
private static Config getDefaultKafkaConsumerConfigs() { | ||
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>(); | ||
defaultKafkaConsumerConfigMap.put( | ||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); | ||
defaultKafkaConsumerConfigMap.put( | ||
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); | ||
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); | ||
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); | ||
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap); | ||
Check warning on line 52 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaConsumerUtils.java
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package org.hypertrace.core.kafka.event.listener; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
import lombok.Getter; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.MockConsumer; | ||
import org.apache.kafka.clients.consumer.OffsetResetStrategy; | ||
import org.apache.kafka.common.Node; | ||
import org.apache.kafka.common.PartitionInfo; | ||
import org.apache.kafka.common.TopicPartition; | ||
|
||
public class KafkaMockConsumerTestUtil<K, V> { | ||
private final String topicName; | ||
private final Map<TopicPartition, Long> currentOffsets; | ||
@Getter private final MockConsumer<K, V> mockConsumer; | ||
|
||
public KafkaMockConsumerTestUtil(String topicName, int numPartitions) { | ||
this.topicName = topicName; | ||
mockConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); | ||
List<PartitionInfo> partitionInfos = | ||
IntStream.range(0, numPartitions) | ||
.mapToObj(i -> getPartitionInfo(topicName, i)) | ||
.collect(Collectors.toUnmodifiableList()); | ||
mockConsumer.updatePartitions(topicName, partitionInfos); | ||
currentOffsets = | ||
IntStream.range(0, numPartitions) | ||
.mapToObj(i -> getTopicPartition(topicName, i)) | ||
.collect(Collectors.toMap(Function.identity(), k -> 1L)); | ||
mockConsumer.updateEndOffsets(currentOffsets); | ||
} | ||
|
||
public void addRecord(K key, V value, int partition) { | ||
Long latestOffset = | ||
currentOffsets.computeIfPresent(getTopicPartition(topicName, partition), (k, v) -> v + 1); | ||
if (Objects.isNull(latestOffset)) { | ||
throw new UnsupportedOperationException( | ||
Check warning on line 41 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java
|
||
"cannot add to partition " | ||
+ partition | ||
+ ", total partitions is " | ||
+ currentOffsets.size()); | ||
Check warning on line 45 in kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaMockConsumerTestUtil.java
|
||
} | ||
mockConsumer.addRecord(new ConsumerRecord<>(topicName, partition, latestOffset, key, value)); | ||
} | ||
|
||
private static PartitionInfo getPartitionInfo(String topic, int partition) { | ||
return new PartitionInfo(topic, partition, Node.noNode(), new Node[0], new Node[0]); | ||
} | ||
|
||
private static TopicPartition getTopicPartition(String topic, int partition) { | ||
return new TopicPartition(topic, partition); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test utils can go in testFixtures so that production classes do not depend on them. Please refer to the following for a sample test fixture and its usage. https://github.com/Traceableai/activity-event-service/tree/main/policy-store-service-impl/src/testFixtures
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, didn't know about them. Updated to use testFixtures