Skip to content

Commit

Permalink
Use scopes for config overriding (#9)
Browse files Browse the repository at this point in the history
* Use scopes for config overriding
* Example: Typed clients
  • Loading branch information
kmcclellan authored Feb 16, 2023
1 parent d66f125 commit 723c1ba
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 186 deletions.
18 changes: 0 additions & 18 deletions Confluent.Kafka.DependencyInjection/Clients/ConfigWrapper.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Confluent.Kafka.DependencyInjection.Clients;
using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class DIConsumerBuilder<TKey, TValue> : ConsumerBuilder<TKey, TValue>
{
readonly IEnumerable<IErrorHandler> errorHandlers;
Expand All @@ -21,7 +22,7 @@ sealed class DIConsumerBuilder<TKey, TValue> : ConsumerBuilder<TKey, TValue>
readonly IAsyncDeserializer<TValue>? asyncValueDeserializer;

public DIConsumerBuilder(
ConfigWrapper config,
ConsumerConfig config,
IEnumerable<IErrorHandler> errorHandlers,
IEnumerable<IStatisticsHandler> statisticsHandlers,
IEnumerable<ILogHandler> logHandlers,
Expand All @@ -32,7 +33,7 @@ public DIConsumerBuilder(
IDeserializer<TValue>? valueDeserializer = null,
IAsyncDeserializer<TKey>? asyncKeyDeserializer = null,
IAsyncDeserializer<TValue>? asyncValueDeserializer = null)
: base(config.Values)
: base(config)
{
this.errorHandlers = errorHandlers;
this.statisticsHandlers = statisticsHandlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Confluent.Kafka.DependencyInjection.Clients;
using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class DIProducerBuilder<TKey, TValue> : ProducerBuilder<TKey, TValue>
{
readonly IEnumerable<IErrorHandler> errorHandlers;
Expand All @@ -17,15 +18,15 @@ sealed class DIProducerBuilder<TKey, TValue> : ProducerBuilder<TKey, TValue>
readonly IAsyncSerializer<TValue>? asyncValueSerializer;

public DIProducerBuilder(
ConfigWrapper config,
ProducerConfig config,
IEnumerable<IErrorHandler> errorHandlers,
IEnumerable<IStatisticsHandler> statisticsHandlers,
IEnumerable<ILogHandler> logHandlers,
ISerializer<TKey>? keySerializer = null,
ISerializer<TValue>? valueSerializer = null,
IAsyncSerializer<TKey>? asyncKeySerializer = null,
IAsyncSerializer<TValue>? asyncValueSerializer = null)
: base(config.Values)
: base(config)
{
this.errorHandlers = errorHandlers;
this.statisticsHandlers = statisticsHandlers;
Expand Down
37 changes: 13 additions & 24 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using Microsoft.Extensions.DependencyInjection;

using System;
Expand All @@ -13,11 +11,22 @@ class ScopedConsumer<TKey, TValue> : IConsumer<TKey, TValue>
readonly IConsumer<TKey, TValue> consumer;
readonly IDisposable scope;

public ScopedConsumer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config = null)
public ScopedConsumer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config)
{
IServiceScope scope;
this.scope = scope = scopes.CreateScope();
this.consumer = GetBuilder(scope.ServiceProvider, config).Build();

if (config != null)
{
var merged = scope.ServiceProvider.GetRequiredService<ConsumerConfig>();

foreach (var kvp in config)
{
merged.Set(kvp.Key, kvp.Value);
}
}

this.consumer = scope.ServiceProvider.GetRequiredService<ConsumerBuilder<TKey, TValue>>().Build();
}

public Handle Handle => consumer.Handle;
Expand All @@ -32,26 +41,6 @@ public ScopedConsumer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<stri

public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata;

static ConsumerBuilder<TKey, TValue> GetBuilder(
IServiceProvider services,
IEnumerable<KeyValuePair<string, string>>? config)
{
return config != null
? new DIConsumerBuilder<TKey, TValue>(
new(config),
services.GetServices<IErrorHandler>(),
services.GetServices<IStatisticsHandler>(),
services.GetServices<ILogHandler>(),
services.GetServices<IPartitionsAssignedHandler>(),
services.GetServices<IPartitionsRevokedHandler>(),
services.GetServices<IOffsetsCommittedHandler>(),
services.GetService<IDeserializer<TKey>>(),
services.GetService<IDeserializer<TValue>>(),
services.GetService<IAsyncDeserializer<TKey>>(),
services.GetService<IAsyncDeserializer<TValue>>())
: services.GetRequiredService<ConsumerBuilder<TKey, TValue>>();
}

public int AddBrokers(string brokers)
{
return consumer.AddBrokers(brokers);
Expand Down
34 changes: 13 additions & 21 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using Microsoft.Extensions.DependencyInjection;

using System;
Expand All @@ -14,34 +12,28 @@ class ScopedProducer<TKey, TValue> : IProducer<TKey, TValue>
readonly IProducer<TKey, TValue> producer;
readonly IDisposable scope;

public ScopedProducer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config = null)
public ScopedProducer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config)
{
IServiceScope scope;
this.scope = scope = scopes.CreateScope();
this.producer = GetBuilder(scope.ServiceProvider, config).Build();

if (config != null)
{
var merged = scope.ServiceProvider.GetRequiredService<ProducerConfig>();

foreach (var kvp in config)
{
merged.Set(kvp.Key, kvp.Value);
}
}

this.producer = scope.ServiceProvider.GetRequiredService<ProducerBuilder<TKey, TValue>>().Build();
}

public Handle Handle => producer.Handle;

public string Name => producer.Name;

static ProducerBuilder<TKey, TValue> GetBuilder(
IServiceProvider services,
IEnumerable<KeyValuePair<string, string>>? config)
{
return config != null
? new DIProducerBuilder<TKey, TValue>(
new(config),
services.GetServices<IErrorHandler>(),
services.GetServices<IStatisticsHandler>(),
services.GetServices<ILogHandler>(),
services.GetService<ISerializer<TKey>>(),
services.GetService<ISerializer<TValue>>(),
services.GetService<IAsyncSerializer<TKey>>(),
services.GetService<IAsyncSerializer<TValue>>())
: services.GetRequiredService<ProducerBuilder<TKey, TValue>>();
}

public int AddBrokers(string brokers)
{
return producer.AddBrokers(brokers);
Expand Down
20 changes: 2 additions & 18 deletions Confluent.Kafka.DependencyInjection/Clients/ServiceConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@ namespace Confluent.Kafka.DependencyInjection.Clients;

using Microsoft.Extensions.DependencyInjection;

using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class ServiceConsumer<TReceiver, TKey, TValue> : ServiceConsumer<TKey, TValue>
{
public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper<TReceiver> config, ConfigWrapper? global = null)
: base(scopes, global?.Values.Concat(config.Values) ?? config.Values)
{
}
}

class ServiceConsumer<TKey, TValue> : ScopedConsumer<TKey, TValue>
sealed class ServiceConsumer<TKey, TValue> : ScopedConsumer<TKey, TValue>
{
bool closed;

public ServiceConsumer(IServiceScopeFactory scopes, ConfigWrapper config)
: base(scopes, config.Values)
{
}

protected ServiceConsumer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>> config)
public ServiceConsumer(IServiceScopeFactory scopes, ConsumerConfig config)
: base(scopes, config)
{
}
Expand Down
20 changes: 2 additions & 18 deletions Confluent.Kafka.DependencyInjection/Clients/ServiceProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,10 @@ namespace Confluent.Kafka.DependencyInjection.Clients;

using Microsoft.Extensions.DependencyInjection;

using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class ServiceProducer<TReceiver, TKey, TValue> : ServiceProducer<TKey, TValue>
sealed class ServiceProducer<TKey, TValue> : ScopedProducer<TKey, TValue>
{
public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper<TReceiver> config, ConfigWrapper? global = null)
: base(scopes, global?.Values.Concat(config.Values) ?? config.Values)
{
}
}

class ServiceProducer<TKey, TValue> : ScopedProducer<TKey, TValue>
{
public ServiceProducer(IServiceScopeFactory scopes, ConfigWrapper config)
: this(scopes, config.Values)
{
}

protected ServiceProducer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>> config)
public ServiceProducer(IServiceScopeFactory scopes, ProducerConfig config)
: base(scopes, config)
{
}
Expand Down
33 changes: 0 additions & 33 deletions Confluent.Kafka.DependencyInjection/GenericServiceMapper.cs

This file was deleted.

16 changes: 3 additions & 13 deletions Confluent.Kafka.DependencyInjection/KafkaFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,26 @@ namespace Confluent.Kafka.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class KafkaFactory : IKafkaFactory
{
readonly IServiceScopeFactory scopes;
readonly ConfigWrapper? config;

public KafkaFactory(IServiceScopeFactory scopes, ConfigWrapper? config = null)
public KafkaFactory(IServiceScopeFactory scopes)
{
this.scopes = scopes;
this.config = config;
}

public IProducer<TKey, TValue> CreateProducer<TKey, TValue>(
IEnumerable<KeyValuePair<string, string>>? configuration = null)
{
return new ScopedProducer<TKey, TValue>(scopes, Merge(configuration));
return new ScopedProducer<TKey, TValue>(scopes, configuration);
}

public IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(
IEnumerable<KeyValuePair<string, string>>? configuration = null)
{
return new ScopedConsumer<TKey, TValue>(scopes, Merge(configuration));
}

IEnumerable<KeyValuePair<string, string>>? Merge(IEnumerable<KeyValuePair<string, string>>? configuration)
{
return configuration != null
? this.config?.Values.Concat(configuration) ?? configuration
: null;
return new ScopedConsumer<TKey, TValue>(scopes, configuration);
}
}
Loading

0 comments on commit 723c1ba

Please sign in to comment.