From d66f125af8810bd210ff1e7ddaa3469d94fb6d96 Mon Sep 17 00:00:00 2001 From: Kyle McClellan Date: Wed, 15 Feb 2023 18:37:04 -0500 Subject: [PATCH] Inject builders (take 2) (#8) * FIX: Support handler overrides * Resolve builders within scope * Consolidate builder namespace --- .../Builders/ConsumerAdapter.cs | 67 ---------------- .../Builders/ProducerAdapter.cs | 61 --------------- .../{Builders => Clients}/ConfigWrapper.cs | 2 +- .../Clients/DIConsumerBuilder.cs | 76 +++++++++++++++++++ .../Clients/DIProducerBuilder.cs | 62 +++++++++++++++ .../Clients/ScopedConsumer.cs | 31 +++++++- .../Clients/ScopedProducer.cs | 28 ++++++- .../Clients/ServiceConsumer.cs | 10 +-- .../Clients/ServiceProducer.cs | 10 +-- .../KafkaFactory.cs | 14 ++-- .../ServiceCollectionExtensions.cs | 5 +- 11 files changed, 208 insertions(+), 158 deletions(-) delete mode 100644 Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs delete mode 100644 Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs rename Confluent.Kafka.DependencyInjection/{Builders => Clients}/ConfigWrapper.cs (87%) create mode 100644 Confluent.Kafka.DependencyInjection/Clients/DIConsumerBuilder.cs create mode 100644 Confluent.Kafka.DependencyInjection/Clients/DIProducerBuilder.cs diff --git a/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs b/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs deleted file mode 100644 index 1d50ad9..0000000 --- a/Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs +++ /dev/null @@ -1,67 +0,0 @@ -namespace Confluent.Kafka.DependencyInjection.Builders; - -using Confluent.Kafka.DependencyInjection.Clients; -using Confluent.Kafka.DependencyInjection.Handlers; -using Confluent.Kafka.SyncOverAsync; - -using Microsoft.Extensions.DependencyInjection; - -using System; -using System.Collections.Generic; -using System.Linq; - -sealed class ConsumerAdapter : ConsumerBuilder -{ - readonly IDisposable? scope; - - public ConsumerAdapter(IServiceScopeFactory scopes, ConfigWrapper config) - : this(config.Values, scopes.CreateScope(), dispose: true) - { - } - - internal ConsumerAdapter( - IEnumerable> config, - IServiceScope scope, - bool dispose) - : base(config) - { - ErrorHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnError); - - StatisticsHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnStatistics); - - LogHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnLog); - - PartitionsAssignedHandler = scope.ServiceProvider.GetServices() - .Aggregate( - default(Func, IEnumerable>), - (x, y) => x + y.OnPartitionsAssigned); - - PartitionsRevokedHandler = scope.ServiceProvider.GetServices() - .Aggregate( - default(Func, IEnumerable>), - (x, y) => x + y.OnPartitionsRevoked); - - OffsetsCommittedHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnOffsetsCommitted); - - KeyDeserializer = scope.ServiceProvider.GetService>() ?? - scope.ServiceProvider.GetService>()?.AsSyncOverAsync(); - - ValueDeserializer = scope.ServiceProvider.GetService>() ?? - scope.ServiceProvider.GetService>()?.AsSyncOverAsync(); - - if (dispose) - { - this.scope = scope; - } - } - - public override IConsumer Build() - { - var consumer = base.Build(); - return scope != null ? new ScopedConsumer(consumer, scope) : consumer; - } -} diff --git a/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs b/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs deleted file mode 100644 index 2078cb0..0000000 --- a/Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs +++ /dev/null @@ -1,61 +0,0 @@ -namespace Confluent.Kafka.DependencyInjection.Builders; - -using Confluent.Kafka.DependencyInjection.Clients; -using Confluent.Kafka.DependencyInjection.Handlers; - -using Microsoft.Extensions.DependencyInjection; - -using System; -using System.Collections.Generic; -using System.Linq; - -sealed class ProducerAdapter : ProducerBuilder -{ - readonly IDisposable? scope; - - public ProducerAdapter(IServiceScopeFactory scopes, ConfigWrapper config) - : this(config.Values, scopes.CreateScope(), dispose: true) - { - } - - internal ProducerAdapter( - IEnumerable> config, - IServiceScope scope, - bool dispose) - : base(config) - { - ErrorHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnError); - - StatisticsHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnStatistics); - - LogHandler = scope.ServiceProvider.GetServices() - .Aggregate(default(Action), (x, y) => x + y.OnLog); - - KeySerializer = scope.ServiceProvider.GetService>(); - ValueSerializer = scope.ServiceProvider.GetService>(); - - // Setting both types of serializers is an error. - if (KeySerializer == null) - { - AsyncKeySerializer = scope.ServiceProvider.GetService>(); - } - - if (ValueSerializer == null) - { - AsyncValueSerializer =scope.ServiceProvider.GetService>(); - } - - if (dispose) - { - this.scope = scope; - } - } - - public override IProducer Build() - { - var producer = base.Build(); - return scope != null ? new ScopedProducer(producer, scope) : producer; - } -} diff --git a/Confluent.Kafka.DependencyInjection/Builders/ConfigWrapper.cs b/Confluent.Kafka.DependencyInjection/Clients/ConfigWrapper.cs similarity index 87% rename from Confluent.Kafka.DependencyInjection/Builders/ConfigWrapper.cs rename to Confluent.Kafka.DependencyInjection/Clients/ConfigWrapper.cs index 4e7e24a..ad4ac93 100644 --- a/Confluent.Kafka.DependencyInjection/Builders/ConfigWrapper.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ConfigWrapper.cs @@ -1,4 +1,4 @@ -namespace Confluent.Kafka.DependencyInjection.Builders; +namespace Confluent.Kafka.DependencyInjection.Clients; using System.Collections.Generic; diff --git a/Confluent.Kafka.DependencyInjection/Clients/DIConsumerBuilder.cs b/Confluent.Kafka.DependencyInjection/Clients/DIConsumerBuilder.cs new file mode 100644 index 0000000..64001ea --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/DIConsumerBuilder.cs @@ -0,0 +1,76 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using Confluent.Kafka.DependencyInjection.Handlers; +using Confluent.Kafka.SyncOverAsync; + +using System; +using System.Collections.Generic; +using System.Linq; + +sealed class DIConsumerBuilder : ConsumerBuilder +{ + readonly IEnumerable errorHandlers; + readonly IEnumerable statisticsHandlers; + readonly IEnumerable logHandlers; + readonly IEnumerable assignHandlers; + readonly IEnumerable revokeHandlers; + readonly IEnumerable commitHandlers; + readonly IDeserializer? keyDeserializer; + readonly IDeserializer? valueDeserializer; + readonly IAsyncDeserializer? asyncKeyDeserializer; + readonly IAsyncDeserializer? asyncValueDeserializer; + + public DIConsumerBuilder( + ConfigWrapper config, + IEnumerable errorHandlers, + IEnumerable statisticsHandlers, + IEnumerable logHandlers, + IEnumerable assignHandlers, + IEnumerable revokeHandlers, + IEnumerable commitHandlers, + IDeserializer? keyDeserializer = null, + IDeserializer? valueDeserializer = null, + IAsyncDeserializer? asyncKeyDeserializer = null, + IAsyncDeserializer? asyncValueDeserializer = null) + : base(config.Values) + { + this.errorHandlers = errorHandlers; + this.statisticsHandlers = statisticsHandlers; + this.logHandlers = logHandlers; + this.assignHandlers = assignHandlers; + this.revokeHandlers = revokeHandlers; + this.commitHandlers = commitHandlers; + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + this.asyncKeyDeserializer = asyncKeyDeserializer; + this.asyncValueDeserializer = asyncValueDeserializer; + } + + public override IConsumer Build() + { + ErrorHandler ??= errorHandlers.Aggregate(default(Action), (x, y) => x + y.OnError); + + StatisticsHandler ??= statisticsHandlers.Aggregate( + default(Action), + (x, y) => x + y.OnStatistics); + + LogHandler ??= logHandlers.Aggregate(default(Action), (x, y) => x + y.OnLog); + + PartitionsAssignedHandler ??= assignHandlers.Aggregate( + default(Func, IEnumerable>), + (x, y) => x + y.OnPartitionsAssigned); + + PartitionsRevokedHandler ??= revokeHandlers.Aggregate( + default(Func, IEnumerable>), + (x, y) => x + y.OnPartitionsRevoked); + + OffsetsCommittedHandler ??= commitHandlers.Aggregate( + default(Action), + (x, y) => x + y.OnOffsetsCommitted); + + KeyDeserializer ??= keyDeserializer ?? asyncKeyDeserializer?.AsSyncOverAsync(); + ValueDeserializer ??= valueDeserializer ?? asyncValueDeserializer?.AsSyncOverAsync(); + + return base.Build(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/DIProducerBuilder.cs b/Confluent.Kafka.DependencyInjection/Clients/DIProducerBuilder.cs new file mode 100644 index 0000000..4cdc465 --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/DIProducerBuilder.cs @@ -0,0 +1,62 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using Confluent.Kafka.DependencyInjection.Handlers; + +using System; +using System.Collections.Generic; +using System.Linq; + +sealed class DIProducerBuilder : ProducerBuilder +{ + readonly IEnumerable errorHandlers; + readonly IEnumerable statisticsHandlers; + readonly IEnumerable logHandlers; + readonly ISerializer? keySerializer; + readonly ISerializer? valueSerializer; + readonly IAsyncSerializer? asyncKeySerializer; + readonly IAsyncSerializer? asyncValueSerializer; + + public DIProducerBuilder( + ConfigWrapper config, + IEnumerable errorHandlers, + IEnumerable statisticsHandlers, + IEnumerable logHandlers, + ISerializer? keySerializer = null, + ISerializer? valueSerializer = null, + IAsyncSerializer? asyncKeySerializer = null, + IAsyncSerializer? asyncValueSerializer = null) + : base(config.Values) + { + this.errorHandlers = errorHandlers; + this.statisticsHandlers = statisticsHandlers; + this.logHandlers = logHandlers; + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.asyncKeySerializer = asyncKeySerializer; + this.asyncValueSerializer = asyncValueSerializer; + } + + public override IProducer Build() + { + ErrorHandler ??= errorHandlers.Aggregate(default(Action), (x, y) => x + y.OnError); + + StatisticsHandler ??= statisticsHandlers.Aggregate( + default(Action), + (x, y) => x + y.OnStatistics); + + LogHandler ??= logHandlers.Aggregate(default(Action), (x, y) => x + y.OnLog); + + // Setting both types of serializers is an error. + if ((KeySerializer ??= keySerializer) == null) + { + AsyncKeySerializer ??= asyncKeySerializer; + } + + if ((ValueSerializer ??= valueSerializer) == null) + { + AsyncValueSerializer ??= asyncValueSerializer; + } + + return base.Build(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs b/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs index fb2037d..63800c9 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs @@ -1,5 +1,9 @@ namespace Confluent.Kafka.DependencyInjection.Clients; +using Confluent.Kafka.DependencyInjection.Handlers; + +using Microsoft.Extensions.DependencyInjection; + using System; using System.Collections.Generic; using System.Threading; @@ -9,10 +13,11 @@ class ScopedConsumer : IConsumer readonly IConsumer consumer; readonly IDisposable scope; - public ScopedConsumer(IConsumer consumer, IDisposable scope) + public ScopedConsumer(IServiceScopeFactory scopes, IEnumerable>? config = null) { - this.consumer = consumer; - this.scope = scope; + IServiceScope scope; + this.scope = scope = scopes.CreateScope(); + this.consumer = GetBuilder(scope.ServiceProvider, config).Build(); } public Handle Handle => consumer.Handle; @@ -27,6 +32,26 @@ public ScopedConsumer(IConsumer consumer, IDisposable scope) public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata; + static ConsumerBuilder GetBuilder( + IServiceProvider services, + IEnumerable>? config) + { + return config != null + ? new DIConsumerBuilder( + new(config), + services.GetServices(), + services.GetServices(), + services.GetServices(), + services.GetServices(), + services.GetServices(), + services.GetServices(), + services.GetService>(), + services.GetService>(), + services.GetService>(), + services.GetService>()) + : services.GetRequiredService>(); + } + public int AddBrokers(string brokers) { return consumer.AddBrokers(brokers); diff --git a/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs b/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs index 8097596..18263ce 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs @@ -1,5 +1,9 @@ namespace Confluent.Kafka.DependencyInjection.Clients; +using Confluent.Kafka.DependencyInjection.Handlers; + +using Microsoft.Extensions.DependencyInjection; + using System; using System.Collections.Generic; using System.Threading; @@ -10,16 +14,34 @@ class ScopedProducer : IProducer readonly IProducer producer; readonly IDisposable scope; - public ScopedProducer(IProducer producer, IDisposable scope) + public ScopedProducer(IServiceScopeFactory scopes, IEnumerable>? config = null) { - this.producer = producer; - this.scope = scope; + IServiceScope scope; + this.scope = scope = scopes.CreateScope(); + this.producer = GetBuilder(scope.ServiceProvider, config).Build(); } public Handle Handle => producer.Handle; public string Name => producer.Name; + static ProducerBuilder GetBuilder( + IServiceProvider services, + IEnumerable>? config) + { + return config != null + ? new DIProducerBuilder( + new(config), + services.GetServices(), + services.GetServices(), + services.GetServices(), + services.GetService>(), + services.GetService>(), + services.GetService>(), + services.GetService>()) + : services.GetRequiredService>(); + } + public int AddBrokers(string brokers) { return producer.AddBrokers(brokers); diff --git a/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs b/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs index b033228..dd3469a 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs @@ -1,7 +1,5 @@ namespace Confluent.Kafka.DependencyInjection.Clients; -using Confluent.Kafka.DependencyInjection.Builders; - using Microsoft.Extensions.DependencyInjection; using System.Collections.Generic; @@ -11,7 +9,7 @@ namespace Confluent.Kafka.DependencyInjection.Clients; sealed class ServiceConsumer : ServiceConsumer { public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config, ConfigWrapper? global = null) - : base(global?.Values.Concat(config.Values) ?? config.Values, scopes.CreateScope()) + : base(scopes, global?.Values.Concat(config.Values) ?? config.Values) { } } @@ -21,12 +19,12 @@ class ServiceConsumer : ScopedConsumer bool closed; public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config) - : this(config.Values, scopes.CreateScope()) + : base(scopes, config.Values) { } - protected ServiceConsumer(IEnumerable> config, IServiceScope scope) - : base(new ConsumerAdapter(config, scope, dispose: false).Build(), scope) + protected ServiceConsumer(IServiceScopeFactory scopes, IEnumerable> config) + : base(scopes, config) { } diff --git a/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs b/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs index cad1c26..166a568 100644 --- a/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs +++ b/Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs @@ -1,7 +1,5 @@ namespace Confluent.Kafka.DependencyInjection.Clients; -using Confluent.Kafka.DependencyInjection.Builders; - using Microsoft.Extensions.DependencyInjection; using System.Collections.Generic; @@ -11,7 +9,7 @@ namespace Confluent.Kafka.DependencyInjection.Clients; sealed class ServiceProducer : ServiceProducer { public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config, ConfigWrapper? global = null) - : base(global?.Values.Concat(config.Values) ?? config.Values, scopes.CreateScope()) + : base(scopes, global?.Values.Concat(config.Values) ?? config.Values) { } } @@ -19,12 +17,12 @@ public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper con class ServiceProducer : ScopedProducer { public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config) - : this(config.Values, scopes.CreateScope()) + : this(scopes, config.Values) { } - protected ServiceProducer(IEnumerable> config, IServiceScope scope) - : base(new ProducerAdapter(config, scope, dispose: false).Build(), scope) + protected ServiceProducer(IServiceScopeFactory scopes, IEnumerable> config) + : base(scopes, config) { } } diff --git a/Confluent.Kafka.DependencyInjection/KafkaFactory.cs b/Confluent.Kafka.DependencyInjection/KafkaFactory.cs index 91c4e69..6f795b2 100644 --- a/Confluent.Kafka.DependencyInjection/KafkaFactory.cs +++ b/Confluent.Kafka.DependencyInjection/KafkaFactory.cs @@ -1,11 +1,9 @@ namespace Confluent.Kafka.DependencyInjection; -using Confluent.Kafka.DependencyInjection.Builders; using Confluent.Kafka.DependencyInjection.Clients; using Microsoft.Extensions.DependencyInjection; -using System; using System.Collections.Generic; using System.Linq; @@ -24,19 +22,19 @@ public KafkaFactory(IServiceScopeFactory scopes, ConfigWrapper? config = null) public IProducer CreateProducer( IEnumerable>? configuration = null) { - return new ProducerAdapter(Merge(configuration), scopes.CreateScope(), dispose: true).Build(); + return new ScopedProducer(scopes, Merge(configuration)); } public IConsumer CreateConsumer( IEnumerable>? configuration = null) { - return new ConsumerAdapter(Merge(configuration), scopes.CreateScope(), dispose: true).Build(); + return new ScopedConsumer(scopes, Merge(configuration)); } - IEnumerable> Merge(IEnumerable>? overrides) + IEnumerable>? Merge(IEnumerable>? configuration) { - return overrides != null - ? config?.Values.Concat(overrides) ?? overrides - : config?.Values ?? throw new InvalidOperationException("Configuration is required."); + return configuration != null + ? this.config?.Values.Concat(configuration) ?? configuration + : null; } } diff --git a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs index e5bb3e4..0019ed8 100644 --- a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs +++ b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs @@ -1,6 +1,5 @@ namespace Confluent.Kafka.DependencyInjection; -using Confluent.Kafka.DependencyInjection.Builders; using Confluent.Kafka.DependencyInjection.Clients; using Confluent.Kafka.DependencyInjection.Handlers; using Confluent.Kafka.DependencyInjection.Handlers.Default; @@ -124,8 +123,8 @@ static IServiceCollection AddAdapters(this IServiceCollection services) services.TryAddSingleton(); // These must be transient to consume scoped handlers. - services.TryAddTransient(typeof(ProducerBuilder<,>), typeof(ProducerAdapter<,>)); - services.TryAddTransient(typeof(ConsumerBuilder<,>), typeof(ConsumerAdapter<,>)); + services.TryAddTransient(typeof(ProducerBuilder<,>), typeof(DIProducerBuilder<,>)); + services.TryAddTransient(typeof(ConsumerBuilder<,>), typeof(DIConsumerBuilder<,>)); return services; }