Skip to content

Commit

Permalink
Inject builders (take 2) (#8)
Browse files Browse the repository at this point in the history
* FIX: Support handler overrides
* Resolve builders within scope
* Consolidate builder namespace
  • Loading branch information
kmcclellan authored Feb 15, 2023
1 parent a94e46e commit d66f125
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 158 deletions.
67 changes: 0 additions & 67 deletions Confluent.Kafka.DependencyInjection/Builders/ConsumerAdapter.cs

This file was deleted.

61 changes: 0 additions & 61 deletions Confluent.Kafka.DependencyInjection/Builders/ProducerAdapter.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Confluent.Kafka.DependencyInjection.Builders;
namespace Confluent.Kafka.DependencyInjection.Clients;

using System.Collections.Generic;

Expand Down
76 changes: 76 additions & 0 deletions Confluent.Kafka.DependencyInjection/Clients/DIConsumerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;
using Confluent.Kafka.SyncOverAsync;

using System;
using System.Collections.Generic;
using System.Linq;

sealed class DIConsumerBuilder<TKey, TValue> : ConsumerBuilder<TKey, TValue>
{
readonly IEnumerable<IErrorHandler> errorHandlers;
readonly IEnumerable<IStatisticsHandler> statisticsHandlers;
readonly IEnumerable<ILogHandler> logHandlers;
readonly IEnumerable<IPartitionsAssignedHandler> assignHandlers;
readonly IEnumerable<IPartitionsRevokedHandler> revokeHandlers;
readonly IEnumerable<IOffsetsCommittedHandler> commitHandlers;
readonly IDeserializer<TKey>? keyDeserializer;
readonly IDeserializer<TValue>? valueDeserializer;
readonly IAsyncDeserializer<TKey>? asyncKeyDeserializer;
readonly IAsyncDeserializer<TValue>? asyncValueDeserializer;

public DIConsumerBuilder(
ConfigWrapper config,
IEnumerable<IErrorHandler> errorHandlers,
IEnumerable<IStatisticsHandler> statisticsHandlers,
IEnumerable<ILogHandler> logHandlers,
IEnumerable<IPartitionsAssignedHandler> assignHandlers,
IEnumerable<IPartitionsRevokedHandler> revokeHandlers,
IEnumerable<IOffsetsCommittedHandler> commitHandlers,
IDeserializer<TKey>? keyDeserializer = null,
IDeserializer<TValue>? valueDeserializer = null,
IAsyncDeserializer<TKey>? asyncKeyDeserializer = null,
IAsyncDeserializer<TValue>? asyncValueDeserializer = null)
: base(config.Values)
{
this.errorHandlers = errorHandlers;
this.statisticsHandlers = statisticsHandlers;
this.logHandlers = logHandlers;
this.assignHandlers = assignHandlers;
this.revokeHandlers = revokeHandlers;
this.commitHandlers = commitHandlers;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.asyncKeyDeserializer = asyncKeyDeserializer;
this.asyncValueDeserializer = asyncValueDeserializer;
}

public override IConsumer<TKey, TValue> Build()
{
ErrorHandler ??= errorHandlers.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError);

StatisticsHandler ??= statisticsHandlers.Aggregate(
default(Action<IClient, string>),
(x, y) => x + y.OnStatistics);

LogHandler ??= logHandlers.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog);

PartitionsAssignedHandler ??= assignHandlers.Aggregate(
default(Func<IClient, IEnumerable<TopicPartition>, IEnumerable<TopicPartitionOffset>>),
(x, y) => x + y.OnPartitionsAssigned);

PartitionsRevokedHandler ??= revokeHandlers.Aggregate(
default(Func<IClient, IEnumerable<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>),
(x, y) => x + y.OnPartitionsRevoked);

OffsetsCommittedHandler ??= commitHandlers.Aggregate(
default(Action<IClient, CommittedOffsets>),
(x, y) => x + y.OnOffsetsCommitted);

KeyDeserializer ??= keyDeserializer ?? asyncKeyDeserializer?.AsSyncOverAsync();
ValueDeserializer ??= valueDeserializer ?? asyncValueDeserializer?.AsSyncOverAsync();

return base.Build();
}
}
62 changes: 62 additions & 0 deletions Confluent.Kafka.DependencyInjection/Clients/DIProducerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using System;
using System.Collections.Generic;
using System.Linq;

sealed class DIProducerBuilder<TKey, TValue> : ProducerBuilder<TKey, TValue>
{
readonly IEnumerable<IErrorHandler> errorHandlers;
readonly IEnumerable<IStatisticsHandler> statisticsHandlers;
readonly IEnumerable<ILogHandler> logHandlers;
readonly ISerializer<TKey>? keySerializer;
readonly ISerializer<TValue>? valueSerializer;
readonly IAsyncSerializer<TKey>? asyncKeySerializer;
readonly IAsyncSerializer<TValue>? asyncValueSerializer;

public DIProducerBuilder(
ConfigWrapper config,
IEnumerable<IErrorHandler> errorHandlers,
IEnumerable<IStatisticsHandler> statisticsHandlers,
IEnumerable<ILogHandler> logHandlers,
ISerializer<TKey>? keySerializer = null,
ISerializer<TValue>? valueSerializer = null,
IAsyncSerializer<TKey>? asyncKeySerializer = null,
IAsyncSerializer<TValue>? asyncValueSerializer = null)
: base(config.Values)
{
this.errorHandlers = errorHandlers;
this.statisticsHandlers = statisticsHandlers;
this.logHandlers = logHandlers;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.asyncKeySerializer = asyncKeySerializer;
this.asyncValueSerializer = asyncValueSerializer;
}

public override IProducer<TKey, TValue> Build()
{
ErrorHandler ??= errorHandlers.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError);

StatisticsHandler ??= statisticsHandlers.Aggregate(
default(Action<IClient, string>),
(x, y) => x + y.OnStatistics);

LogHandler ??= logHandlers.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog);

// Setting both types of serializers is an error.
if ((KeySerializer ??= keySerializer) == null)
{
AsyncKeySerializer ??= asyncKeySerializer;
}

if ((ValueSerializer ??= valueSerializer) == null)
{
AsyncValueSerializer ??= asyncValueSerializer;
}

return base.Build();
}
}
31 changes: 28 additions & 3 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using Microsoft.Extensions.DependencyInjection;

using System;
using System.Collections.Generic;
using System.Threading;
Expand All @@ -9,10 +13,11 @@ class ScopedConsumer<TKey, TValue> : IConsumer<TKey, TValue>
readonly IConsumer<TKey, TValue> consumer;
readonly IDisposable scope;

public ScopedConsumer(IConsumer<TKey, TValue> consumer, IDisposable scope)
public ScopedConsumer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config = null)
{
this.consumer = consumer;
this.scope = scope;
IServiceScope scope;
this.scope = scope = scopes.CreateScope();
this.consumer = GetBuilder(scope.ServiceProvider, config).Build();
}

public Handle Handle => consumer.Handle;
Expand All @@ -27,6 +32,26 @@ public ScopedConsumer(IConsumer<TKey, TValue> consumer, IDisposable scope)

public IConsumerGroupMetadata ConsumerGroupMetadata => consumer.ConsumerGroupMetadata;

static ConsumerBuilder<TKey, TValue> GetBuilder(
IServiceProvider services,
IEnumerable<KeyValuePair<string, string>>? config)
{
return config != null
? new DIConsumerBuilder<TKey, TValue>(
new(config),
services.GetServices<IErrorHandler>(),
services.GetServices<IStatisticsHandler>(),
services.GetServices<ILogHandler>(),
services.GetServices<IPartitionsAssignedHandler>(),
services.GetServices<IPartitionsRevokedHandler>(),
services.GetServices<IOffsetsCommittedHandler>(),
services.GetService<IDeserializer<TKey>>(),
services.GetService<IDeserializer<TValue>>(),
services.GetService<IAsyncDeserializer<TKey>>(),
services.GetService<IAsyncDeserializer<TValue>>())
: services.GetRequiredService<ConsumerBuilder<TKey, TValue>>();
}

public int AddBrokers(string brokers)
{
return consumer.AddBrokers(brokers);
Expand Down
28 changes: 25 additions & 3 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedProducer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using Microsoft.Extensions.DependencyInjection;

using System;
using System.Collections.Generic;
using System.Threading;
Expand All @@ -10,16 +14,34 @@ class ScopedProducer<TKey, TValue> : IProducer<TKey, TValue>
readonly IProducer<TKey, TValue> producer;
readonly IDisposable scope;

public ScopedProducer(IProducer<TKey, TValue> producer, IDisposable scope)
public ScopedProducer(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config = null)
{
this.producer = producer;
this.scope = scope;
IServiceScope scope;
this.scope = scope = scopes.CreateScope();
this.producer = GetBuilder(scope.ServiceProvider, config).Build();
}

public Handle Handle => producer.Handle;

public string Name => producer.Name;

static ProducerBuilder<TKey, TValue> GetBuilder(
IServiceProvider services,
IEnumerable<KeyValuePair<string, string>>? config)
{
return config != null
? new DIProducerBuilder<TKey, TValue>(
new(config),
services.GetServices<IErrorHandler>(),
services.GetServices<IStatisticsHandler>(),
services.GetServices<ILogHandler>(),
services.GetService<ISerializer<TKey>>(),
services.GetService<ISerializer<TValue>>(),
services.GetService<IAsyncSerializer<TKey>>(),
services.GetService<IAsyncSerializer<TValue>>())
: services.GetRequiredService<ProducerBuilder<TKey, TValue>>();
}

public int AddBrokers(string brokers)
{
return producer.AddBrokers(brokers);
Expand Down
Loading

0 comments on commit d66f125

Please sign in to comment.