From a94e46eabc757c0c76b3f428eac20c00042c8d07 Mon Sep 17 00:00:00 2001 From: Kyle McClellan Date: Mon, 13 Feb 2023 17:10:18 -0500 Subject: [PATCH] Inject client builders (#7) * Consolidate config overrides * Register builders * Remove handler helper * Scoped clients * Example: Resolve builders/factory --- .../Builders/ConsumerAdapter.cs | 83 +++++--- .../Builders/ProducerAdapter.cs | 69 ++++--- .../Clients/ScopedConsumer.cs | 187 ++++++++++++++++++ .../Clients/ScopedProducer.cs | 118 +++++++++++ .../Clients/ServiceConsumer.cs | 152 ++------------ .../Clients/ServiceProducer.cs | 83 +------- .../Handlers/HandlerHelper.cs | 32 --- .../KafkaFactory.cs | 18 +- .../ServiceCollectionExtensions.cs | 5 +- Example/Program.cs | 17 +- 10 files changed, 458 insertions(+), 306 deletions(-) create mode 100644 Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs create mode 100644 Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs delete mode 100644 Confluent.Kafka.DependencyInjection/Handlers/HandlerHelper.cs diff --git a/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs b/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs index 997dbb5..1d50ad9 100644 --- a/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs +++ b/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs @@ -1,46 +1,67 @@ namespace Confluent.Kafka.DependencyInjection.Builders; +using Confluent.Kafka.DependencyInjection.Clients; using Confluent.Kafka.DependencyInjection.Handlers; using Confluent.Kafka.SyncOverAsync; +using Microsoft.Extensions.DependencyInjection; + +using System; using System.Collections.Generic; +using System.Linq; -[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] sealed class ConsumerAdapter : ConsumerBuilder { - public IDictionary ClientConfig { get; } = new Dictionary(); - - public ConsumerAdapter( - HandlerHelper errorHelper, - HandlerHelper statisticsHelper, - HandlerHelper logHelper, - HandlerHelper assignHelper, - HandlerHelper revokeHelper, - HandlerHelper commitHelper, - ConfigWrapper? config = null, - IDeserializer? keyDeserializer = null, - IDeserializer? valueDeserializer = null, - IAsyncDeserializer? asyncKeyDeserializer = null, - IAsyncDeserializer? asyncValueDeserializer = null) - : base(config?.Values) + readonly IDisposable? scope; + + public ConsumerAdapter(IServiceScopeFactory scopes, ConfigWrapper config) + : this(config.Values, scopes.CreateScope(), dispose: true) + { + } + + internal ConsumerAdapter( + IEnumerable> config, + IServiceScope scope, + bool dispose) + : base(config) { - ErrorHandler = errorHelper.Resolve(x => x.OnError, ErrorHandler); - StatisticsHandler = statisticsHelper.Resolve(x => x.OnStatistics, StatisticsHandler); - LogHandler = logHelper.Resolve(x => x.OnLog, LogHandler); - PartitionsAssignedHandler = assignHelper.Resolve(x => x.OnPartitionsAssigned, PartitionsAssignedHandler); - PartitionsRevokedHandler = revokeHelper.Resolve(x => x.OnPartitionsRevoked, PartitionsRevokedHandler); - OffsetsCommittedHandler = commitHelper.Resolve(x => x.OnOffsetsCommitted, OffsetsCommittedHandler); - KeyDeserializer = keyDeserializer ?? asyncKeyDeserializer?.AsSyncOverAsync(); - ValueDeserializer = valueDeserializer ?? asyncValueDeserializer?.AsSyncOverAsync(); - - if (Config != null) + ErrorHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnError); + + StatisticsHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnStatistics); + + LogHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnLog); + + PartitionsAssignedHandler = scope.ServiceProvider.GetServices() + .Aggregate( + default(Func, IEnumerable>), + (x, y) => x + y.OnPartitionsAssigned); + + PartitionsRevokedHandler = scope.ServiceProvider.GetServices() + .Aggregate( + default(Func, IEnumerable>), + (x, y) => x + y.OnPartitionsRevoked); + + OffsetsCommittedHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnOffsetsCommitted); + + KeyDeserializer = scope.ServiceProvider.GetService>() ?? + scope.ServiceProvider.GetService>()?.AsSyncOverAsync(); + + ValueDeserializer = scope.ServiceProvider.GetService>() ?? + scope.ServiceProvider.GetService>()?.AsSyncOverAsync(); + + if (dispose) { - foreach (var kvp in Config) - { - ClientConfig[kvp.Key] = kvp.Value; - } + this.scope = scope; } + } - Config = ClientConfig; + public override IConsumer Build() + { + var consumer = base.Build(); + return scope != null ? new ScopedConsumer(consumer, scope) : consumer; } } diff --git a/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs b/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs index 92b4f75..2078cb0 100644 --- a/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs +++ b/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs @@ -1,44 +1,61 @@ namespace Confluent.Kafka.DependencyInjection.Builders; +using Confluent.Kafka.DependencyInjection.Clients; using Confluent.Kafka.DependencyInjection.Handlers; +using Microsoft.Extensions.DependencyInjection; + +using System; using System.Collections.Generic; +using System.Linq; -[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] sealed class ProducerAdapter : ProducerBuilder { - public IDictionary ClientConfig { get; } = new Dictionary(); - - public ProducerAdapter( - HandlerHelper errorHelper, - HandlerHelper statisticsHelper, - HandlerHelper logHelper, - ConfigWrapper? config = null, - ISerializer? keySerializer = null, - ISerializer? valueSerializer = null, - IAsyncSerializer? asyncKeySerializer = null, - IAsyncSerializer? asyncValueSerializer = null) - : base(config?.Values) + readonly IDisposable? scope; + + public ProducerAdapter(IServiceScopeFactory scopes, ConfigWrapper config) + : this(config.Values, scopes.CreateScope(), dispose: true) { - ErrorHandler = errorHelper.Resolve(x => x.OnError, ErrorHandler); - StatisticsHandler = statisticsHelper.Resolve(x => x.OnStatistics, StatisticsHandler); - LogHandler = logHelper.Resolve(x => x.OnLog, LogHandler); + } + + internal ProducerAdapter( + IEnumerable> config, + IServiceScope scope, + bool dispose) + : base(config) + { + ErrorHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnError); + + StatisticsHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnStatistics); + + LogHandler = scope.ServiceProvider.GetServices() + .Aggregate(default(Action), (x, y) => x + y.OnLog); - KeySerializer = keySerializer; - ValueSerializer = valueSerializer; + KeySerializer = scope.ServiceProvider.GetService>(); + ValueSerializer = scope.ServiceProvider.GetService>(); // Setting both types of serializers is an error. - if (keySerializer == null) AsyncKeySerializer = asyncKeySerializer; - if (valueSerializer == null) AsyncValueSerializer = asyncValueSerializer; + if (KeySerializer == null) + { + AsyncKeySerializer = scope.ServiceProvider.GetService>(); + } - if (Config != null) + if (ValueSerializer == null) { - foreach (var kvp in Config) - { - ClientConfig[kvp.Key] = kvp.Value; - } + AsyncValueSerializer =scope.ServiceProvider.GetService>(); } - Config = ClientConfig; + if (dispose) + { + this.scope = scope; + } + } + + public override IProducer Build() + { + var producer = base.Build(); + return scope != null ? new ScopedProducer(producer, scope) : producer; } } diff --git a/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs b/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs new file mode 100644 index 0000000..fb2037d --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs @@ -0,0 +1,187 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using System; +using System.Collections.Generic; +using System.Threading; + +class ScopedConsumer : IConsumer +{ + readonly IConsumer consumer; + readonly IDisposable scope; + + public ScopedConsumer(IConsumer consumer, IDisposable scope) + { + this.consumer = consumer; + this.scope = scope; + } + + public Handle Handle => consumer.Handle; + + public string Name => consumer.Name; + + public List Subscription => consumer.Subscription; + + public List Assignment => consumer.Assignment; + + public string MemberId => consumer.MemberId; + + public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata; + + public int AddBrokers(string brokers) + { + return consumer.AddBrokers(brokers); + } + + public void Subscribe(string topic) + { + consumer.Subscribe(topic); + } + + public void Subscribe(IEnumerable topics) + { + consumer.Subscribe(topics); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Assign(TopicPartition partition) + { + consumer.Assign(partition); + } + + public void Assign(TopicPartitionOffset partition) + { + consumer.Assign(partition); + } + + public void Assign(IEnumerable partitions) + { + consumer.Assign(partitions); + } + + public void Assign(IEnumerable partitions) + { + consumer.Assign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + consumer.IncrementalAssign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + consumer.IncrementalAssign(partitions); + } + + public void Unassign() + { + consumer.Unassign(); + } + + public void IncrementalUnassign(IEnumerable partitions) + { + consumer.IncrementalUnassign(partitions); + } + + public void Seek(TopicPartitionOffset tpo) + { + consumer.Seek(tpo); + } + + public ConsumeResult Consume(int millisecondsTimeout) + { + return consumer.Consume(millisecondsTimeout); + } + + public ConsumeResult Consume(CancellationToken cancellationToken = default) + { + return consumer.Consume(cancellationToken); + } + + public ConsumeResult Consume(TimeSpan timeout) + { + return consumer.Consume(timeout); + } + + public void Pause(IEnumerable partitions) + { + consumer.Pause(partitions); + } + + public void Resume(IEnumerable partitions) + { + consumer.Resume(partitions); + } + + public List Commit() + { + return consumer.Commit(); + } + + public void Commit(ConsumeResult result) + { + consumer.Commit(result); + } + + public void Commit(IEnumerable offsets) + { + consumer.Commit(offsets); + } + + public void StoreOffset(ConsumeResult result) + { + consumer.StoreOffset(result); + } + + public void StoreOffset(TopicPartitionOffset offset) + { + consumer.StoreOffset(offset); + } + + public Offset Position(TopicPartition partition) + { + return consumer.Position(partition); + } + + public List Committed(TimeSpan timeout) + { + return consumer.Committed(timeout); + } + + public List Committed(IEnumerable partitions, TimeSpan timeout) + { + return consumer.Committed(partitions, timeout); + } + + public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) + { + return consumer.GetWatermarkOffsets(topicPartition); + } + + public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) + { + return consumer.QueryWatermarkOffsets(topicPartition, timeout); + } + + public List OffsetsForTimes( + IEnumerable timestampsToSearch, + TimeSpan timeout) + { + return consumer.OffsetsForTimes(timestampsToSearch, timeout); + } + + public virtual void Close() + { + consumer.Close(); + } + + public virtual void Dispose() + { + consumer.Dispose(); + scope.Dispose(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs b/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs new file mode 100644 index 0000000..8097596 --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs @@ -0,0 +1,118 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +class ScopedProducer : IProducer +{ + readonly IProducer producer; + readonly IDisposable scope; + + public ScopedProducer(IProducer producer, IDisposable scope) + { + this.producer = producer; + this.scope = scope; + } + + public Handle Handle => producer.Handle; + + public string Name => producer.Name; + + public int AddBrokers(string brokers) + { + return producer.AddBrokers(brokers); + } + + public void Produce( + string topic, + Message message, + Action>? deliveryHandler = null) + { + producer.Produce(topic, message, deliveryHandler); + } + + public void Produce( + TopicPartition topicPartition, + Message message, + Action>? deliveryHandler = null) + { + producer.Produce(topicPartition, message, deliveryHandler); + } + + public Task> ProduceAsync( + string topic, + Message message, + CancellationToken cancellationToken = default) + { + return producer.ProduceAsync(topic, message, cancellationToken); + } + + public Task> ProduceAsync( + TopicPartition topicPartition, + Message message, + CancellationToken cancellationToken = default) + { + return producer.ProduceAsync(topicPartition, message, cancellationToken); + } + + public void BeginTransaction() + { + producer.BeginTransaction(); + } + + public void CommitTransaction() + { + this.producer.CommitTransaction(); + } + + public void CommitTransaction(TimeSpan timeout) + { + producer.CommitTransaction(timeout); + } + + public void AbortTransaction() + { + this.producer.AbortTransaction(); + } + + public void AbortTransaction(TimeSpan timeout) + { + producer.AbortTransaction(timeout); + } + + public void InitTransactions(TimeSpan timeout) + { + producer.InitTransactions(timeout); + } + + public void SendOffsetsToTransaction( + IEnumerable offsets, + IConsumerGroupMetadata groupMetadata, + TimeSpan timeout) + { + producer.SendOffsetsToTransaction(offsets, groupMetadata, timeout); + } + + public int Poll(TimeSpan timeout) + { + return producer.Poll(timeout); + } + + public void Flush(CancellationToken cancellationToken = default) + { + producer.Flush(cancellationToken); + } + + public int Flush(TimeSpan timeout) + { + return producer.Flush(timeout); + } + + public void Dispose() + { + producer.Dispose(); + scope.Dispose(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs b/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs index 02f3719..b033228 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs @@ -4,164 +4,46 @@ namespace Confluent.Kafka.DependencyInjection.Clients; using Microsoft.Extensions.DependencyInjection; -using System; using System.Collections.Generic; -using System.Threading; +using System.Linq; [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] sealed class ServiceConsumer : ServiceConsumer { - public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config) - : base(scopes, config.Values) + public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config, ConfigWrapper? global = null) + : base(global?.Values.Concat(config.Values) ?? config.Values, scopes.CreateScope()) { } } -[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] -class ServiceConsumer : IConsumer +class ServiceConsumer : ScopedConsumer { - readonly IConsumer consumer; - readonly IServiceScope scope; - readonly bool closeOnDispose; - bool closed; - public ServiceConsumer( - IServiceScopeFactory scopes, - IEnumerable>? config = null, - bool closeOnDispose = true) + public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config) + : this(config.Values, scopes.CreateScope()) { - scope = scopes.CreateScope(); - var adapter = scope.ServiceProvider.GetRequiredService>(); - - if (config != null) - { - foreach (var kvp in config) - { - adapter.ClientConfig[kvp.Key] = kvp.Value; - } - } - - consumer = adapter.Build(); - - this.closeOnDispose = closeOnDispose; } - public Handle Handle => consumer.Handle; - - public string Name => consumer.Name; - - public int AddBrokers(string brokers) => - consumer.AddBrokers(brokers); - - public List Subscription => consumer.Subscription; - - public List Assignment => consumer.Assignment; - - public string MemberId => consumer.MemberId; - - public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata; - - public void Subscribe(string topic) => - consumer.Subscribe(topic); - - public void Subscribe(IEnumerable topics) => - consumer.Subscribe(topics); - - public void Unsubscribe() => - consumer.Unsubscribe(); - - public void Assign(TopicPartition partition) => - consumer.Assign(partition); - - public void Assign(TopicPartitionOffset partition) => - consumer.Assign(partition); - - public void Assign(IEnumerable partitions) => - consumer.Assign(partitions); - - public void Assign(IEnumerable partitions) => - consumer.Assign(partitions); - - public void IncrementalAssign(IEnumerable partitions) => - this.consumer.IncrementalAssign(partitions); - - public void IncrementalAssign(IEnumerable partitions) => - this.consumer.IncrementalAssign(partitions); - - public void Unassign() => - consumer.Unassign(); - - public void IncrementalUnassign(IEnumerable partitions) => - this.consumer.IncrementalUnassign(partitions); - - public void Seek(TopicPartitionOffset tpo) => - consumer.Seek(tpo); - - public ConsumeResult Consume(int millisecondsTimeout) => - consumer.Consume(millisecondsTimeout); - - public ConsumeResult Consume(CancellationToken cancellationToken = default) => - consumer.Consume(cancellationToken); - - public ConsumeResult Consume(TimeSpan timeout) => - consumer.Consume(timeout); - - public void Pause(IEnumerable partitions) => - consumer.Pause(partitions); - - public void Resume(IEnumerable partitions) => - consumer.Resume(partitions); - - - public List Commit() => - consumer.Commit(); - - public void Commit(ConsumeResult result) => - consumer.Commit(result); - - public void Commit(IEnumerable offsets) => - consumer.Commit(offsets); - - public void StoreOffset(ConsumeResult result) => - consumer.StoreOffset(result); - - public void StoreOffset(TopicPartitionOffset offset) => - consumer.StoreOffset(offset); - - public Offset Position(TopicPartition partition) => - consumer.Position(partition); - - public List Committed(TimeSpan timeout) => - consumer.Committed(timeout); - - public List Committed(IEnumerable partitions, TimeSpan timeout) => - consumer.Committed(partitions, timeout); - - public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) => - consumer.GetWatermarkOffsets(topicPartition); - - public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) => - consumer.QueryWatermarkOffsets(topicPartition, timeout); - - public List OffsetsForTimes(IEnumerable timestampsToSearch, TimeSpan timeout) => - consumer.OffsetsForTimes(timestampsToSearch, timeout); + protected ServiceConsumer(IEnumerable> config, IServiceScope scope) + : base(new ConsumerAdapter(config, scope, dispose: false).Build(), scope) + { + } - public void Close() + public override void Close() { - consumer.Close(); - closed = true; + base.Close(); + this.closed = true; } - public void Dispose() + public override void Dispose() { - if (closeOnDispose && !closed) + if (!closed) { // Close when disposed by ServiceProvider. - consumer.Close(); + this.Close(); } - consumer.Dispose(); - scope.Dispose(); + base.Dispose(); } } diff --git a/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs b/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs index 29e5142..cad1c26 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs @@ -4,94 +4,27 @@ namespace Confluent.Kafka.DependencyInjection.Clients; using Microsoft.Extensions.DependencyInjection; -using System; using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; +using System.Linq; [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] sealed class ServiceProducer : ServiceProducer { - public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config) - : base(scopes, config.Values) + public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config, ConfigWrapper? global = null) + : base(global?.Values.Concat(config.Values) ?? config.Values, scopes.CreateScope()) { } } -[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] -class ServiceProducer : IProducer +class ServiceProducer : ScopedProducer { - readonly IProducer producer; - readonly IServiceScope scope; - - public ServiceProducer(IServiceScopeFactory scopes, IEnumerable>? config = null) + public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config) + : this(config.Values, scopes.CreateScope()) { - scope = scopes.CreateScope(); - var adapter = scope.ServiceProvider.GetRequiredService>(); - - if (config != null) - { - foreach (var kvp in config) - { - adapter.ClientConfig[kvp.Key] = kvp.Value; - } - } - - producer = adapter.Build(); } - public Handle Handle => producer.Handle; - - public string Name => producer.Name; - - public int AddBrokers(string brokers) => - producer.AddBrokers(brokers); - - public void Produce(string topic, Message message, Action>? deliveryHandler = null) => - producer.Produce(topic, message, deliveryHandler); - - public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) => - producer.Produce(topicPartition, message, deliveryHandler); - - public Task> ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) => - producer.ProduceAsync(topic, message, cancellationToken); - - public Task> ProduceAsync(TopicPartition topicPartition, Message message, CancellationToken cancellationToken = default) => - producer.ProduceAsync(topicPartition, message, cancellationToken); - - public void BeginTransaction() => - producer.BeginTransaction(); - - public void CommitTransaction() => - this.producer.CommitTransaction(); - - public void CommitTransaction(TimeSpan timeout) => - producer.CommitTransaction(timeout); - - public void AbortTransaction() => - this.producer.AbortTransaction(); - - public void AbortTransaction(TimeSpan timeout) => - producer.AbortTransaction(timeout); - - public void InitTransactions(TimeSpan timeout) => - producer.InitTransactions(timeout); - - public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) => - producer.SendOffsetsToTransaction(offsets, groupMetadata, timeout); - - public int Poll(TimeSpan timeout) => - producer.Poll(timeout); - - public void Flush(CancellationToken cancellationToken = default) => - producer.Flush(cancellationToken); - - public int Flush(TimeSpan timeout) => - producer.Flush(timeout); - - public void Dispose() + protected ServiceProducer(IEnumerable> config, IServiceScope scope) + : base(new ProducerAdapter(config, scope, dispose: false).Build(), scope) { - producer.Dispose(); - scope.Dispose(); } } diff --git a/Confluent.Kafka.DependencyInjection/Handlers/HandlerHelper.cs b/Confluent.Kafka.DependencyInjection/Handlers/HandlerHelper.cs deleted file mode 100644 index 739aa43..0000000 --- a/Confluent.Kafka.DependencyInjection/Handlers/HandlerHelper.cs +++ /dev/null @@ -1,32 +0,0 @@ -namespace Confluent.Kafka.DependencyInjection.Handlers; - -using System; -using System.Collections.Generic; -using System.Linq; - -[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] -sealed class HandlerHelper -{ - readonly IEnumerable handlers; - - public HandlerHelper(IEnumerable handlers) - { - this.handlers = handlers; - } - - public Action? Resolve( - Func> selector, - Action? _) => - Resolve(selector, (x, y) => x + y); - - public Func? Resolve( - Func> selector, - Func? _) => - Resolve(selector, (x, y) => x + y); - - TReturn? Resolve( - Func selector, - Func combine) - where TReturn : Delegate => - handlers.Select(selector).Aggregate(default, combine); -} diff --git a/Confluent.Kafka.DependencyInjection/KafkaFactory.cs b/Confluent.Kafka.DependencyInjection/KafkaFactory.cs index 2abe097..91c4e69 100644 --- a/Confluent.Kafka.DependencyInjection/KafkaFactory.cs +++ b/Confluent.Kafka.DependencyInjection/KafkaFactory.cs @@ -1,30 +1,42 @@ namespace Confluent.Kafka.DependencyInjection; +using Confluent.Kafka.DependencyInjection.Builders; using Confluent.Kafka.DependencyInjection.Clients; using Microsoft.Extensions.DependencyInjection; +using System; using System.Collections.Generic; +using System.Linq; [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] sealed class KafkaFactory : IKafkaFactory { readonly IServiceScopeFactory scopes; + readonly ConfigWrapper? config; - public KafkaFactory(IServiceScopeFactory scopes) + public KafkaFactory(IServiceScopeFactory scopes, ConfigWrapper? config = null) { this.scopes = scopes; + this.config = config; } public IProducer CreateProducer( IEnumerable>? configuration = null) { - return new ServiceProducer(scopes, configuration); + return new ProducerAdapter(Merge(configuration), scopes.CreateScope(), dispose: true).Build(); } public IConsumer CreateConsumer( IEnumerable>? configuration = null) { - return new ServiceConsumer(scopes, configuration, closeOnDispose: false); + return new ConsumerAdapter(Merge(configuration), scopes.CreateScope(), dispose: true).Build(); + } + + IEnumerable> Merge(IEnumerable>? overrides) + { + return overrides != null + ? config?.Values.Concat(overrides) ?? overrides + : config?.Values ?? throw new InvalidOperationException("Configuration is required."); } } diff --git a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs index 5c10a2b..e5bb3e4 100644 --- a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs +++ b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs @@ -124,9 +124,8 @@ static IServiceCollection AddAdapters(this IServiceCollection services) services.TryAddSingleton(); // These must be transient to consume scoped handlers. - services.TryAddTransient(typeof(HandlerHelper<>)); - services.TryAddTransient(typeof(ProducerAdapter<,>)); - services.TryAddTransient(typeof(ConsumerAdapter<,>)); + services.TryAddTransient(typeof(ProducerBuilder<,>), typeof(ProducerAdapter<,>)); + services.TryAddTransient(typeof(ConsumerBuilder<,>), typeof(ConsumerAdapter<,>)); return services; } diff --git a/Example/Program.cs b/Example/Program.cs index 34078a1..4a2def0 100644 --- a/Example/Program.cs +++ b/Example/Program.cs @@ -21,4 +21,19 @@ var producer = provider.GetRequiredService>(); var consumer = provider.GetRequiredService>(); -Console.WriteLine($"Resolved clients: {producer.Name}, {consumer.Name}"); +using var producer2 = provider.GetRequiredService>().Build(); +using var consumer2 = provider.GetRequiredService>().Build(); + +using var producer3 = provider.GetRequiredService() + .CreateProducer(); + +using var consumer3 = provider.GetRequiredService() + .CreateConsumer(); + +foreach (var client in new IClient[] { producer, producer2, producer3, consumer, consumer2, consumer3 }) +{ + Console.WriteLine($"Resolved client: {client.Name}"); +} + +consumer2.Close(); +consumer3.Close();