Skip to content

Commit

Permalink
Inject client builders (#7)
Browse files Browse the repository at this point in the history
* Consolidate config overrides
* Register builders
* Remove handler helper
* Scoped clients
* Example: Resolve builders/factory
  • Loading branch information
kmcclellan authored Feb 13, 2023
1 parent ce27501 commit a94e46e
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 306 deletions.
83 changes: 52 additions & 31 deletions Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,46 +1,67 @@
namespace Confluent.Kafka.DependencyInjection.Builders;

using Confluent.Kafka.DependencyInjection.Clients;
using Confluent.Kafka.DependencyInjection.Handlers;
using Confluent.Kafka.SyncOverAsync;

using Microsoft.Extensions.DependencyInjection;

using System;
using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class ConsumerAdapter<TKey, TValue> : ConsumerBuilder<TKey, TValue>
{
public IDictionary<string, string> ClientConfig { get; } = new Dictionary<string, string>();

public ConsumerAdapter(
HandlerHelper<IErrorHandler> errorHelper,
HandlerHelper<IStatisticsHandler> statisticsHelper,
HandlerHelper<ILogHandler> logHelper,
HandlerHelper<IPartitionsAssignedHandler> assignHelper,
HandlerHelper<IPartitionsRevokedHandler> revokeHelper,
HandlerHelper<IOffsetsCommittedHandler> commitHelper,
ConfigWrapper? config = null,
IDeserializer<TKey>? keyDeserializer = null,
IDeserializer<TValue>? valueDeserializer = null,
IAsyncDeserializer<TKey>? asyncKeyDeserializer = null,
IAsyncDeserializer<TValue>? asyncValueDeserializer = null)
: base(config?.Values)
readonly IDisposable? scope;

public ConsumerAdapter(IServiceScopeFactory scopes, ConfigWrapper config)
: this(config.Values, scopes.CreateScope(), dispose: true)
{
}

internal ConsumerAdapter(
IEnumerable<KeyValuePair<string, string>> config,
IServiceScope scope,
bool dispose)
: base(config)
{
ErrorHandler = errorHelper.Resolve(x => x.OnError, ErrorHandler);
StatisticsHandler = statisticsHelper.Resolve(x => x.OnStatistics, StatisticsHandler);
LogHandler = logHelper.Resolve(x => x.OnLog, LogHandler);
PartitionsAssignedHandler = assignHelper.Resolve(x => x.OnPartitionsAssigned, PartitionsAssignedHandler);
PartitionsRevokedHandler = revokeHelper.Resolve(x => x.OnPartitionsRevoked, PartitionsRevokedHandler);
OffsetsCommittedHandler = commitHelper.Resolve(x => x.OnOffsetsCommitted, OffsetsCommittedHandler);
KeyDeserializer = keyDeserializer ?? asyncKeyDeserializer?.AsSyncOverAsync();
ValueDeserializer = valueDeserializer ?? asyncValueDeserializer?.AsSyncOverAsync();

if (Config != null)
ErrorHandler = scope.ServiceProvider.GetServices<IErrorHandler>()
.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError);

StatisticsHandler = scope.ServiceProvider.GetServices<IStatisticsHandler>()
.Aggregate(default(Action<IClient, string>), (x, y) => x + y.OnStatistics);

LogHandler = scope.ServiceProvider.GetServices<ILogHandler>()
.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog);

PartitionsAssignedHandler = scope.ServiceProvider.GetServices<IPartitionsAssignedHandler>()
.Aggregate(
default(Func<IClient, IEnumerable<TopicPartition>, IEnumerable<TopicPartitionOffset>>),
(x, y) => x + y.OnPartitionsAssigned);

PartitionsRevokedHandler = scope.ServiceProvider.GetServices<IPartitionsRevokedHandler>()
.Aggregate(
default(Func<IClient, IEnumerable<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>),
(x, y) => x + y.OnPartitionsRevoked);

OffsetsCommittedHandler = scope.ServiceProvider.GetServices<IOffsetsCommittedHandler>()
.Aggregate(default(Action<IClient, CommittedOffsets>), (x, y) => x + y.OnOffsetsCommitted);

KeyDeserializer = scope.ServiceProvider.GetService<IDeserializer<TKey>>() ??
scope.ServiceProvider.GetService<IAsyncDeserializer<TKey>>()?.AsSyncOverAsync();

ValueDeserializer = scope.ServiceProvider.GetService<IDeserializer<TValue>>() ??
scope.ServiceProvider.GetService<IAsyncDeserializer<TValue>>()?.AsSyncOverAsync();

if (dispose)
{
foreach (var kvp in Config)
{
ClientConfig[kvp.Key] = kvp.Value;
}
this.scope = scope;
}
}

Config = ClientConfig;
public override IConsumer<TKey, TValue> Build()
{
var consumer = base.Build();
return scope != null ? new ScopedConsumer<TKey, TValue>(consumer, scope) : consumer;
}
}
69 changes: 43 additions & 26 deletions Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,44 +1,61 @@
namespace Confluent.Kafka.DependencyInjection.Builders;

using Confluent.Kafka.DependencyInjection.Clients;
using Confluent.Kafka.DependencyInjection.Handlers;

using Microsoft.Extensions.DependencyInjection;

using System;
using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class ProducerAdapter<TKey, TValue> : ProducerBuilder<TKey, TValue>
{
public IDictionary<string, string> ClientConfig { get; } = new Dictionary<string, string>();

public ProducerAdapter(
HandlerHelper<IErrorHandler> errorHelper,
HandlerHelper<IStatisticsHandler> statisticsHelper,
HandlerHelper<ILogHandler> logHelper,
ConfigWrapper? config = null,
ISerializer<TKey>? keySerializer = null,
ISerializer<TValue>? valueSerializer = null,
IAsyncSerializer<TKey>? asyncKeySerializer = null,
IAsyncSerializer<TValue>? asyncValueSerializer = null)
: base(config?.Values)
readonly IDisposable? scope;

public ProducerAdapter(IServiceScopeFactory scopes, ConfigWrapper config)
: this(config.Values, scopes.CreateScope(), dispose: true)
{
ErrorHandler = errorHelper.Resolve(x => x.OnError, ErrorHandler);
StatisticsHandler = statisticsHelper.Resolve(x => x.OnStatistics, StatisticsHandler);
LogHandler = logHelper.Resolve(x => x.OnLog, LogHandler);
}

internal ProducerAdapter(
IEnumerable<KeyValuePair<string, string>> config,
IServiceScope scope,
bool dispose)
: base(config)
{
ErrorHandler = scope.ServiceProvider.GetServices<IErrorHandler>()
.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError);

StatisticsHandler = scope.ServiceProvider.GetServices<IStatisticsHandler>()
.Aggregate(default(Action<IClient, string>), (x, y) => x + y.OnStatistics);

LogHandler = scope.ServiceProvider.GetServices<ILogHandler>()
.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog);

KeySerializer = keySerializer;
ValueSerializer = valueSerializer;
KeySerializer = scope.ServiceProvider.GetService<ISerializer<TKey>>();
ValueSerializer = scope.ServiceProvider.GetService<ISerializer<TValue>>();

// Setting both types of serializers is an error.
if (keySerializer == null) AsyncKeySerializer = asyncKeySerializer;
if (valueSerializer == null) AsyncValueSerializer = asyncValueSerializer;
if (KeySerializer == null)
{
AsyncKeySerializer = scope.ServiceProvider.GetService<IAsyncSerializer<TKey>>();
}

if (Config != null)
if (ValueSerializer == null)
{
foreach (var kvp in Config)
{
ClientConfig[kvp.Key] = kvp.Value;
}
AsyncValueSerializer =scope.ServiceProvider.GetService<IAsyncSerializer<TValue>>();
}

Config = ClientConfig;
if (dispose)
{
this.scope = scope;
}
}

public override IProducer<TKey, TValue> Build()
{
var producer = base.Build();
return scope != null ? new ScopedProducer<TKey, TValue>(producer, scope) : producer;
}
}
187 changes: 187 additions & 0 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using System;
using System.Collections.Generic;
using System.Threading;

class ScopedConsumer<TKey, TValue> : IConsumer<TKey, TValue>
{
readonly IConsumer<TKey, TValue> consumer;
readonly IDisposable scope;

public ScopedConsumer(IConsumer<TKey, TValue> consumer, IDisposable scope)
{
this.consumer = consumer;
this.scope = scope;
}

public Handle Handle => consumer.Handle;

public string Name => consumer.Name;

public List<string> Subscription => consumer.Subscription;

public List<TopicPartition> Assignment => consumer.Assignment;

public string MemberId => consumer.MemberId;

public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata;

public int AddBrokers(string brokers)
{
return consumer.AddBrokers(brokers);
}

public void Subscribe(string topic)
{
consumer.Subscribe(topic);
}

public void Subscribe(IEnumerable<string> topics)
{
consumer.Subscribe(topics);
}

public void Unsubscribe()
{
consumer.Unsubscribe();
}

public void Assign(TopicPartition partition)
{
consumer.Assign(partition);
}

public void Assign(TopicPartitionOffset partition)
{
consumer.Assign(partition);
}

public void Assign(IEnumerable<TopicPartitionOffset> partitions)
{
consumer.Assign(partitions);
}

public void Assign(IEnumerable<TopicPartition> partitions)
{
consumer.Assign(partitions);
}

public void IncrementalAssign(IEnumerable<TopicPartitionOffset> partitions)
{
consumer.IncrementalAssign(partitions);
}

public void IncrementalAssign(IEnumerable<TopicPartition> partitions)
{
consumer.IncrementalAssign(partitions);
}

public void Unassign()
{
consumer.Unassign();
}

public void IncrementalUnassign(IEnumerable<TopicPartition> partitions)
{
consumer.IncrementalUnassign(partitions);
}

public void Seek(TopicPartitionOffset tpo)
{
consumer.Seek(tpo);
}

public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
{
return consumer.Consume(millisecondsTimeout);
}

public ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken = default)
{
return consumer.Consume(cancellationToken);
}

public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
{
return consumer.Consume(timeout);
}

public void Pause(IEnumerable<TopicPartition> partitions)
{
consumer.Pause(partitions);
}

public void Resume(IEnumerable<TopicPartition> partitions)
{
consumer.Resume(partitions);
}

public List<TopicPartitionOffset> Commit()
{
return consumer.Commit();
}

public void Commit(ConsumeResult<TKey, TValue> result)
{
consumer.Commit(result);
}

public void Commit(IEnumerable<TopicPartitionOffset> offsets)
{
consumer.Commit(offsets);
}

public void StoreOffset(ConsumeResult<TKey, TValue> result)
{
consumer.StoreOffset(result);
}

public void StoreOffset(TopicPartitionOffset offset)
{
consumer.StoreOffset(offset);
}

public Offset Position(TopicPartition partition)
{
return consumer.Position(partition);
}

public List<TopicPartitionOffset> Committed(TimeSpan timeout)
{
return consumer.Committed(timeout);
}

public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
{
return consumer.Committed(partitions, timeout);
}

public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
{
return consumer.GetWatermarkOffsets(topicPartition);
}

public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
{
return consumer.QueryWatermarkOffsets(topicPartition, timeout);
}

public List<TopicPartitionOffset> OffsetsForTimes(
IEnumerable<TopicPartitionTimestamp> timestampsToSearch,
TimeSpan timeout)
{
return consumer.OffsetsForTimes(timestampsToSearch, timeout);
}

public virtual void Close()
{
consumer.Close();
}

public virtual void Dispose()
{
consumer.Dispose();
scope.Dispose();
}
}
Loading

0 comments on commit a94e46e

Please sign in to comment.