From 40b24c88e3d955516e009f7d4b8d227c34ca990d Mon Sep 17 00:00:00 2001 From: "denys.kozhevnikov" Date: Fri, 12 May 2023 21:46:02 +0100 Subject: [PATCH] Add ability to rewind stream to custom offset --- .../IKafkaTopicReceiveEndpointConfigurator.cs | 6 ++ .../KafkaProducerSpecification.cs | 10 +- .../KafkaTopicReceiveEndpointConfiguration.cs | 12 ++- .../KafkaIntegration/ConsumerLockContext.cs | 13 ++- .../KafkaConsumerBuilderContext.cs | 6 +- .../KafkaIntegration/ReceiveSettings.cs | 1 + .../TopicConnector_Specs.cs | 102 ++++++++++++++++++ .../Concurrency_Specs.cs | 2 +- 8 files changed, 135 insertions(+), 17 deletions(-) diff --git a/src/Transports/MassTransit.KafkaIntegration/Configuration/IKafkaTopicReceiveEndpointConfigurator.cs b/src/Transports/MassTransit.KafkaIntegration/Configuration/IKafkaTopicReceiveEndpointConfigurator.cs index 4af99558dbf..3c0b38f2b52 100644 --- a/src/Transports/MassTransit.KafkaIntegration/Configuration/IKafkaTopicReceiveEndpointConfigurator.cs +++ b/src/Transports/MassTransit.KafkaIntegration/Configuration/IKafkaTopicReceiveEndpointConfigurator.cs @@ -27,6 +27,12 @@ public interface IKafkaTopicReceiveEndpointConfigurator : /// string GroupInstanceId { set; } + /// + /// Specifies starting consume offset for the topic. . This is can be useful when stream needs to be rewound + /// default: () + /// + long Offset { set; } + /// /// Sets interval before checkpoint, low interval will decrease throughput (default: 1min) /// diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs index f2fdd985b58..ae745d6b06e 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaProducerSpecification.cs @@ -62,7 +62,7 @@ public int? QueueBufferingBackpressureThreshold public TimeSpan? RetryBackoff { - set => _producerConfig.RetryBackoffMs = value == null ? null : (int?)value.Value.TotalMilliseconds; + set => _producerConfig.RetryBackoffMs = (int?)value?.TotalMilliseconds; } public int? MessageSendMaxRetries @@ -72,7 +72,7 @@ public int? MessageSendMaxRetries public TimeSpan? Linger { - set => _producerConfig.LingerMs = value == null ? null : (int?)value.Value.TotalMilliseconds; + set => _producerConfig.LingerMs = (int?)value?.TotalMilliseconds; } public int? QueueBufferingMaxKbytes @@ -97,7 +97,7 @@ public bool? EnableIdempotence public TimeSpan? TransactionTimeout { - set => _producerConfig.TransactionTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds; + set => _producerConfig.TransactionTimeoutMs = (int?)value?.TotalMilliseconds; } public string TransactionalId @@ -112,12 +112,12 @@ public Partitioner? Partitioner public TimeSpan? MessageTimeout { - set => _producerConfig.MessageTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds; + set => _producerConfig.MessageTimeoutMs = (int?)value?.TotalMilliseconds; } public TimeSpan? RequestTimeout { - set => _producerConfig.RequestTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds; + set => _producerConfig.RequestTimeoutMs = (int?)value?.TotalMilliseconds; } public string DeliveryReportFields diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs index c53f39da40a..8b33b2cb268 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/Configuration/KafkaTopicReceiveEndpointConfiguration.cs @@ -54,6 +54,9 @@ public KafkaTopicReceiveEndpointConfiguration(IKafkaHostConfiguration hostConfig _consumerConfigurator = new PipeConfigurator(); + // https://github.com/confluentinc/confluent-kafka-dotnet/blob/0e6bc8be05988f0cafacfe2b71aa8950aabe8cb5/src/Confluent.Kafka/ConsumerBuilder.cs#L387 + Offset = Confluent.Kafka.Offset.Unset; + PublishFaults = false; this.DiscardFaultedMessages(); @@ -79,12 +82,12 @@ public PartitionAssignmentStrategy? PartitionAssignmentStrategy public TimeSpan? SessionTimeout { - set => _consumerConfig.SessionTimeoutMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds); + set => _consumerConfig.SessionTimeoutMs = (int?)value?.TotalMilliseconds; } public TimeSpan? HeartbeatInterval { - set => _consumerConfig.HeartbeatIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds); + set => _consumerConfig.HeartbeatIntervalMs = (int?)value?.TotalMilliseconds; } public string GroupProtocolType @@ -94,12 +97,12 @@ public string GroupProtocolType public TimeSpan? CoordinatorQueryInterval { - set => _consumerConfig.CoordinatorQueryIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds); + set => _consumerConfig.CoordinatorQueryIntervalMs = (int?)value?.TotalMilliseconds; } public TimeSpan? MaxPollInterval { - set => _consumerConfig.MaxPollIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds); + set => _consumerConfig.MaxPollIntervalMs = (int?)value?.TotalMilliseconds; } public bool? EnableAutoOffsetStore @@ -181,6 +184,7 @@ public void CreateIfMissing(Action configure) int ReceiveSettings.ConcurrentMessageLimit => Transport.GetConcurrentMessageLimit(); + public long Offset { get; set; } public string Topic { get; } public ushort MessageLimit { get; set; } diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ConsumerLockContext.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ConsumerLockContext.cs index f5ad4bfcd43..b09ea6eeac5 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ConsumerLockContext.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ConsumerLockContext.cs @@ -76,7 +76,7 @@ public Task Faulted(ConsumeResult result, Exception exception) return Task.CompletedTask; } - public void OnAssigned(IConsumer consumer, IEnumerable partitions) + public IEnumerable OnAssigned(IConsumer consumer, IEnumerable partitions) { LogContext.SetCurrentIfNull(_context.LogContext); @@ -85,11 +85,14 @@ public void OnAssigned(IConsumer consumer, IEnumerable new PartitionCheckpointData(consumer, _receiveSettings, _pending))) continue; - LogContext.Debug?.Log("Partition: {PartitionId} was assigned to: {MemberId}", partition, consumer.MemberId); + LogContext.Debug?.Log("Partition: {PartitionId} with {Offset} was assigned to: {MemberId}", partition, _receiveSettings.Offset, + consumer.MemberId); + + yield return new TopicPartitionOffset(partition, _receiveSettings.Offset); } } - public void OnPartitionLost(IConsumer consumer, IEnumerable partitions) + public IEnumerable OnPartitionLost(IConsumer consumer, IEnumerable partitions) { LogContext.SetCurrentIfNull(_context.LogContext); @@ -103,9 +106,10 @@ Task LostAndDelete(TopicPartitionOffset topicPartition) } TaskUtil.Await(Task.WhenAll(partitions.Select(partition => LostAndDelete(partition)))); + return Array.Empty(); } - public void OnUnAssigned(IConsumer consumer, IEnumerable partitions) + public IEnumerable OnUnAssigned(IConsumer consumer, IEnumerable partitions) { LogContext.SetCurrentIfNull(_context.LogContext); @@ -119,6 +123,7 @@ Task CloseAndDelete(TopicPartitionOffset topicPartition) } TaskUtil.Await(Task.WhenAll(partitions.Select(partition => CloseAndDelete(partition)))); + return Array.Empty(); } diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaConsumerBuilderContext.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaConsumerBuilderContext.cs index e5ca595d15f..53884635894 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaConsumerBuilderContext.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaConsumerBuilderContext.cs @@ -6,8 +6,8 @@ namespace MassTransit.KafkaIntegration public interface KafkaConsumerBuilderContext { - void OnAssigned(IConsumer consumer, IEnumerable partitions); - void OnUnAssigned(IConsumer consumer, IEnumerable partitions); - void OnPartitionLost(IConsumer consumer, IEnumerable partitions); + IEnumerable OnAssigned(IConsumer consumer, IEnumerable partitions); + IEnumerable OnUnAssigned(IConsumer consumer, IEnumerable partitions); + IEnumerable OnPartitionLost(IConsumer consumer, IEnumerable partitions); } } diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ReceiveSettings.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ReceiveSettings.cs index f77df06c577..b9a94e65522 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ReceiveSettings.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ReceiveSettings.cs @@ -5,6 +5,7 @@ namespace MassTransit.KafkaIntegration public interface ReceiveSettings { + long Offset { get; } string Topic { get; } ushort MessageLimit { get; } ushort CheckpointMessageCount { get; } diff --git a/tests/MassTransit.KafkaIntegration.Tests/TopicConnector_Specs.cs b/tests/MassTransit.KafkaIntegration.Tests/TopicConnector_Specs.cs index 1c222f655db..0b2037cf8e5 100644 --- a/tests/MassTransit.KafkaIntegration.Tests/TopicConnector_Specs.cs +++ b/tests/MassTransit.KafkaIntegration.Tests/TopicConnector_Specs.cs @@ -1,6 +1,7 @@ namespace MassTransit.KafkaIntegration.Tests { using System; + using System.Collections.Concurrent; using System.Threading.Tasks; using Confluent.Kafka; using Internals; @@ -101,4 +102,105 @@ public interface KafkaMessage string Text { get; } } } + + + public class TopicConnector_With_Custom_Offset_Specs : + InMemoryTestFixture + { + const string Topic = "endpoint-connector-with-offset"; + + [Test] + public async Task Should_receive_on_connected_topic() + { + var counters = new ConcurrentDictionary(); + await using var provider = new ServiceCollection() + .AddSingleton(counters) + .ConfigureKafkaTestOptions(options => + { + options.CreateTopicsIfNotExists = true; + options.TopicNames = new[] { Topic }; + }) + .AddMassTransitTestHarness(x => + { + x.AddTaskCompletionSource>(); + x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(30)); + + x.AddRider(rider => + { + rider.AddProducer(Topic); + rider.AddConsumer(); + + rider.UsingKafka((_, _) => + { + }); + }); + }).BuildServiceProvider(); + + var harness = provider.GetTestHarness(); + await harness.Start(); + + var kafka = provider.GetRequiredService(); + + var id = NewId.NextGuid(); + ITopicProducer producer = harness.GetProducer(); + + await producer.Produce(new { Id = id }, harness.CancellationToken); + + var connected = kafka.ConnectTopicEndpoint(Topic, nameof(TopicConnector_With_Custom_Offset_Specs), (context, configurator) => + { + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; + configurator.ConfigureConsumer(context); + }); + + await connected.Ready.OrCanceled(harness.CancellationToken); + + IReceivedMessage received = await harness.Consumed.SelectAsync(harness.CancellationToken).FirstOrDefault(); + + Assert.That(received, Is.Not.Null); + + Assert.That(received.Context.TryGetPayload(out KafkaConsumeContext kafkaConsumeContext), Is.True); + + await connected.StopAsync(harness.CancellationToken); + + connected = kafka.ConnectTopicEndpoint(Topic, nameof(TopicConnector_With_Custom_Offset_Specs), (context, configurator) => + { + configurator.Offset = kafkaConsumeContext.Offset; + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; + + configurator.ConfigureConsumer(context); + }); + + await connected.Ready.OrCanceled(harness.CancellationToken); + + while (counters.TryGetValue(id, out var count) && count < 2) + { + harness.CancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(20); + } + } + + + public interface KafkaMessage + { + Guid Id { get; } + } + + + class CounterConsumer : + IConsumer + { + readonly ConcurrentDictionary _result; + + public CounterConsumer(ConcurrentDictionary result) + { + _result = result; + } + + public Task Consume(ConsumeContext context) + { + _result.AddOrUpdate(context.Message.Id, _ => 1, (_, v) => v + 1); + return Task.CompletedTask; + } + } + } } diff --git a/tests/MassTransit.MartenIntegration.Tests/Concurrency_Specs.cs b/tests/MassTransit.MartenIntegration.Tests/Concurrency_Specs.cs index 1acf288ea22..7e13ffc3619 100644 --- a/tests/MassTransit.MartenIntegration.Tests/Concurrency_Specs.cs +++ b/tests/MassTransit.MartenIntegration.Tests/Concurrency_Specs.cs @@ -50,7 +50,7 @@ await Task.WhenAll( { cfg.UseMessageRetry(r => r.Immediate(5)); cfg.UseMessageScope(context); - cfg.UseInMemoryOutbox(context); + cfg.UseInMemoryOutbox(); cfg.ConfigureEndpoints(context); });