Skip to content

Commit

Permalink
Add ability to rewind stream to custom offset
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed May 22, 2023
1 parent 6603d24 commit 40b24c8
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public interface IKafkaTopicReceiveEndpointConfigurator :
/// </summary>
string GroupInstanceId { set; }

/// <summary>
/// Specifies starting consume offset for the topic. <inheritdoc cref="Confluent.Kafka.Offset"/>. This is can be useful when stream needs to be rewound
/// default: <see cref="Confluent.Kafka.Offset.Unset"/> (<inheritdoc cref="Confluent.Kafka.Offset.Unset"/>)
/// </summary>
long Offset { set; }

/// <summary>
/// Sets interval before checkpoint, low interval will decrease throughput (default: 1min)
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public int? QueueBufferingBackpressureThreshold

public TimeSpan? RetryBackoff
{
set => _producerConfig.RetryBackoffMs = value == null ? null : (int?)value.Value.TotalMilliseconds;
set => _producerConfig.RetryBackoffMs = (int?)value?.TotalMilliseconds;
}

public int? MessageSendMaxRetries
Expand All @@ -72,7 +72,7 @@ public int? MessageSendMaxRetries

public TimeSpan? Linger
{
set => _producerConfig.LingerMs = value == null ? null : (int?)value.Value.TotalMilliseconds;
set => _producerConfig.LingerMs = (int?)value?.TotalMilliseconds;
}

public int? QueueBufferingMaxKbytes
Expand All @@ -97,7 +97,7 @@ public bool? EnableIdempotence

public TimeSpan? TransactionTimeout
{
set => _producerConfig.TransactionTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds;
set => _producerConfig.TransactionTimeoutMs = (int?)value?.TotalMilliseconds;
}

public string TransactionalId
Expand All @@ -112,12 +112,12 @@ public Partitioner? Partitioner

public TimeSpan? MessageTimeout
{
set => _producerConfig.MessageTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds;
set => _producerConfig.MessageTimeoutMs = (int?)value?.TotalMilliseconds;
}

public TimeSpan? RequestTimeout
{
set => _producerConfig.RequestTimeoutMs = value == null ? null : (int?)value.Value.TotalMilliseconds;
set => _producerConfig.RequestTimeoutMs = (int?)value?.TotalMilliseconds;
}

public string DeliveryReportFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public KafkaTopicReceiveEndpointConfiguration(IKafkaHostConfiguration hostConfig

_consumerConfigurator = new PipeConfigurator<ConsumerContext>();

// https://github.com/confluentinc/confluent-kafka-dotnet/blob/0e6bc8be05988f0cafacfe2b71aa8950aabe8cb5/src/Confluent.Kafka/ConsumerBuilder.cs#L387
Offset = Confluent.Kafka.Offset.Unset;

PublishFaults = false;

this.DiscardFaultedMessages();
Expand All @@ -79,12 +82,12 @@ public PartitionAssignmentStrategy? PartitionAssignmentStrategy

public TimeSpan? SessionTimeout
{
set => _consumerConfig.SessionTimeoutMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds);
set => _consumerConfig.SessionTimeoutMs = (int?)value?.TotalMilliseconds;
}

public TimeSpan? HeartbeatInterval
{
set => _consumerConfig.HeartbeatIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds);
set => _consumerConfig.HeartbeatIntervalMs = (int?)value?.TotalMilliseconds;
}

public string GroupProtocolType
Expand All @@ -94,12 +97,12 @@ public string GroupProtocolType

public TimeSpan? CoordinatorQueryInterval
{
set => _consumerConfig.CoordinatorQueryIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds);
set => _consumerConfig.CoordinatorQueryIntervalMs = (int?)value?.TotalMilliseconds;
}

public TimeSpan? MaxPollInterval
{
set => _consumerConfig.MaxPollIntervalMs = value == null ? (int?)null : Convert.ToInt32(value.Value.TotalMilliseconds);
set => _consumerConfig.MaxPollIntervalMs = (int?)value?.TotalMilliseconds;
}

public bool? EnableAutoOffsetStore
Expand Down Expand Up @@ -181,6 +184,7 @@ public void CreateIfMissing(Action<KafkaTopicOptions> configure)

int ReceiveSettings.ConcurrentMessageLimit => Transport.GetConcurrentMessageLimit();

public long Offset { get; set; }
public string Topic { get; }
public ushort MessageLimit { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Task Faulted(ConsumeResult<byte[], byte[]> result, Exception exception)
return Task.CompletedTask;
}

public void OnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartition> partitions)
public IEnumerable<TopicPartitionOffset> OnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartition> partitions)
{
LogContext.SetCurrentIfNull(_context.LogContext);

Expand All @@ -85,11 +85,14 @@ public void OnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPart
if (!_data.TryAdd(partition, p => new PartitionCheckpointData(consumer, _receiveSettings, _pending)))
continue;

LogContext.Debug?.Log("Partition: {PartitionId} was assigned to: {MemberId}", partition, consumer.MemberId);
LogContext.Debug?.Log("Partition: {PartitionId} with {Offset} was assigned to: {MemberId}", partition, _receiveSettings.Offset,
consumer.MemberId);

yield return new TopicPartitionOffset(partition, _receiveSettings.Offset);
}
}

public void OnPartitionLost(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions)
public IEnumerable<TopicPartitionOffset> OnPartitionLost(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions)
{
LogContext.SetCurrentIfNull(_context.LogContext);

Expand All @@ -103,9 +106,10 @@ Task LostAndDelete(TopicPartitionOffset topicPartition)
}

TaskUtil.Await(Task.WhenAll(partitions.Select(partition => LostAndDelete(partition))));
return Array.Empty<TopicPartitionOffset>();
}

public void OnUnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions)
public IEnumerable<TopicPartitionOffset> OnUnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions)
{
LogContext.SetCurrentIfNull(_context.LogContext);

Expand All @@ -119,6 +123,7 @@ Task CloseAndDelete(TopicPartitionOffset topicPartition)
}

TaskUtil.Await(Task.WhenAll(partitions.Select(partition => CloseAndDelete(partition))));
return Array.Empty<TopicPartitionOffset>();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace MassTransit.KafkaIntegration

public interface KafkaConsumerBuilderContext
{
void OnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartition> partitions);
void OnUnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions);
void OnPartitionLost(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions);
IEnumerable<TopicPartitionOffset> OnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartition> partitions);
IEnumerable<TopicPartitionOffset> OnUnAssigned(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions);
IEnumerable<TopicPartitionOffset> OnPartitionLost(IConsumer<byte[], byte[]> consumer, IEnumerable<TopicPartitionOffset> partitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace MassTransit.KafkaIntegration

public interface ReceiveSettings
{
long Offset { get; }
string Topic { get; }
ushort MessageLimit { get; }
ushort CheckpointMessageCount { get; }
Expand Down
102 changes: 102 additions & 0 deletions tests/MassTransit.KafkaIntegration.Tests/TopicConnector_Specs.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace MassTransit.KafkaIntegration.Tests
{
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Confluent.Kafka;
using Internals;
Expand Down Expand Up @@ -101,4 +102,105 @@ public interface KafkaMessage
string Text { get; }
}
}


public class TopicConnector_With_Custom_Offset_Specs :
InMemoryTestFixture
{
const string Topic = "endpoint-connector-with-offset";

[Test]
public async Task Should_receive_on_connected_topic()
{
var counters = new ConcurrentDictionary<Guid, int>();
await using var provider = new ServiceCollection()
.AddSingleton(counters)
.ConfigureKafkaTestOptions(options =>
{
options.CreateTopicsIfNotExists = true;
options.TopicNames = new[] { Topic };
})
.AddMassTransitTestHarness(x =>
{
x.AddTaskCompletionSource<ConsumeContext<KafkaMessage>>();
x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(30));

x.AddRider(rider =>
{
rider.AddProducer<KafkaMessage>(Topic);
rider.AddConsumer<CounterConsumer>();

rider.UsingKafka((_, _) =>
{
});
});
}).BuildServiceProvider();

var harness = provider.GetTestHarness();
await harness.Start();

var kafka = provider.GetRequiredService<IKafkaRider>();

var id = NewId.NextGuid();
ITopicProducer<KafkaMessage> producer = harness.GetProducer<KafkaMessage>();

await producer.Produce(new { Id = id }, harness.CancellationToken);

var connected = kafka.ConnectTopicEndpoint<KafkaMessage>(Topic, nameof(TopicConnector_With_Custom_Offset_Specs), (context, configurator) =>
{
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
configurator.ConfigureConsumer<CounterConsumer>(context);
});

await connected.Ready.OrCanceled(harness.CancellationToken);

IReceivedMessage<KafkaMessage> received = await harness.Consumed.SelectAsync<KafkaMessage>(harness.CancellationToken).FirstOrDefault();

Assert.That(received, Is.Not.Null);

Assert.That(received.Context.TryGetPayload(out KafkaConsumeContext kafkaConsumeContext), Is.True);

await connected.StopAsync(harness.CancellationToken);

connected = kafka.ConnectTopicEndpoint<KafkaMessage>(Topic, nameof(TopicConnector_With_Custom_Offset_Specs), (context, configurator) =>
{
configurator.Offset = kafkaConsumeContext.Offset;
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;

configurator.ConfigureConsumer<CounterConsumer>(context);
});

await connected.Ready.OrCanceled(harness.CancellationToken);

while (counters.TryGetValue(id, out var count) && count < 2)
{
harness.CancellationToken.ThrowIfCancellationRequested();
await Task.Delay(20);
}
}


public interface KafkaMessage
{
Guid Id { get; }
}


class CounterConsumer :
IConsumer<KafkaMessage>
{
readonly ConcurrentDictionary<Guid, int> _result;

public CounterConsumer(ConcurrentDictionary<Guid, int> result)
{
_result = result;
}

public Task Consume(ConsumeContext<KafkaMessage> context)
{
_result.AddOrUpdate(context.Message.Id, _ => 1, (_, v) => v + 1);
return Task.CompletedTask;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ await Task.WhenAll(
{
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseMessageScope(context);
cfg.UseInMemoryOutbox(context);
cfg.UseInMemoryOutbox();

cfg.ConfigureEndpoints(context);
});
Expand Down

0 comments on commit 40b24c8

Please sign in to comment.