From d942b67634f9aa3a313668f51030890eb9200d03 Mon Sep 17 00:00:00 2001 From: "denys.kozhevnikov" Date: Fri, 21 Apr 2023 15:20:33 +0100 Subject: [PATCH] Regenerate configs upon consumer/producer creation --- .../Configuration/KafkaProducerSpecification.cs | 4 ++-- .../Configuration/KafkaTopicReceiveEndpointConfiguration.cs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs index 39d3572151b..f2fdd985b58 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs @@ -159,10 +159,10 @@ public void SetHeadersSerializer(IHeadersSerializer serializer) public KafkaSendTransportContext CreateSendTransportContext(IBusInstance busInstance, Action onStop = null) { - var producerConfig = _hostConfiguration.GetProducerConfig(_producerConfig); - ProducerBuilder CreateProducerBuilder() { + var producerConfig = _hostConfiguration.GetProducerConfig(_producerConfig); + ProducerBuilder producerBuilder = new ProducerBuilder(producerConfig) .SetKeySerializer(Serializers.ByteArray) .SetValueSerializer(Serializers.ByteArray); diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs index 56af5fd46d3..7e031355914 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs @@ -205,10 +205,10 @@ public override ReceiveEndpointContext CreateReceiveEndpointContext() KafkaReceiveEndpointContext CreateReceiveKafkaEndpointContext() { - var consumerConfig = _hostConfiguration.GetConsumerConfig(_consumerConfig); - ConsumerBuilder CreateConsumerBuilder() { + var consumerConfig = _hostConfiguration.GetConsumerConfig(_consumerConfig); + ConsumerBuilder consumerBuilder = new ConsumerBuilder(consumerConfig) .SetLogHandler((c, message) => _busInstance.HostConfiguration.ReceiveLogContext?.Debug?.Log(message.Message)); @@ -222,7 +222,7 @@ ConsumerBuilder CreateConsumerBuilder() return consumerBuilder; } - var builder = new KafkaReceiveEndpointBuilder(_busInstance, _hostConfiguration, consumerConfig.GroupId, this, + var builder = new KafkaReceiveEndpointBuilder(_busInstance, _hostConfiguration, _consumerConfig.GroupId, this, this, _headersDeserializer, _keyDeserializer, _valueDeserializer, CreateConsumerBuilder); ApplySpecifications(builder);