From d0e409fa4d5c88a90f8d4eac98b1fd775fbd2a35 Mon Sep 17 00:00:00 2001 From: Kishan Sairam Adapa Date: Thu, 22 Feb 2024 20:25:36 +0530 Subject: [PATCH] update to latest kafka streams framework (#456) * update to latest kafka streams framework * vuln fix --- gradle/libs.versions.toml | 4 ++-- .../enrichment/clients/DefaultClientRegistry.java | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1e5861b4..c62bf234 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,8 +4,8 @@ hypertrace-attribute-service = "0.14.38" hypertrace-config-service = "0.1.60" hypertrace-grpc-utils = "0.12.6" hypertrace-serviceFramework = "0.1.68" -hypertrace-kafkaStreams = "0.4.4" -hypertrace-view-generator = "0.4.21" +hypertrace-kafkaStreams = "0.4.7" +hypertrace-view-generator = "0.4.24" grpc = "1.57.2" [libraries] diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java index 39be7796..df70ec04 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/clients/DefaultClientRegistry.java @@ -21,6 +21,7 @@ import org.hypertrace.core.grpcutils.client.GrpcChannelConfig; import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry; import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory; +import org.hypertrace.core.kafka.event.listener.KafkaConsumerUtils; import org.hypertrace.core.kafka.event.listener.KafkaLiveEventListener; import org.hypertrace.entity.change.event.v1.EntityChangeEventKey; import org.hypertrace.entity.change.event.v1.EntityChangeEventValue; @@ -99,7 +100,7 @@ public DefaultClientRegistry( EntityServiceClientConfig.from(config).getCacheConfig(), cacheLoaderExecutor); this.entityChangeEventListener = - getEntityChangeEventConsumer(config, edsCacheClient::updateBasedOnChangeEvent); + getEntityChangeEventListener(config, edsCacheClient::updateBasedOnChangeEvent); this.entityDataClient = EntityDataClient.builder(this.entityServiceChannel).build(); this.entityCache = new EntityCache(this.edsCacheClient, cacheLoaderExecutor); this.entityAccessor = @@ -197,7 +198,7 @@ protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChan } private static Optional> - getEntityChangeEventConsumer( + getEntityChangeEventListener( Config clientsConfig, BiConsumer callback) { if (clientsConfig.hasPath(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY) && clientsConfig.getBoolean(ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY)) { @@ -206,14 +207,17 @@ protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChan Collections.singletonMap( "schema.registry.url", clientsConfig.getString(ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY)); + Config kafkaConfig = clientsConfig.getConfig(ENTITY_CHANGE_EVENTS_CONFIG_KEY); return Optional.of( new KafkaLiveEventListener.Builder() .registerCallback(callback) .build( consumerName, - clientsConfig.getConfig(ENTITY_CHANGE_EVENTS_CONFIG_KEY), - getEntityChangeEventKeyDeser(deserConfig), - getEntityChangeEventValueDeser(deserConfig))); + kafkaConfig, + KafkaConsumerUtils.getKafkaConsumer( + kafkaConfig, + getEntityChangeEventKeyDeser(deserConfig), + getEntityChangeEventValueDeser(deserConfig)))); } return Optional.empty(); }