-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support configuring/resolving
IAdminClient
(#10)
* Example: Resolve admin client
- Loading branch information
1 parent
723c1ba
commit e1b8b2a
Showing
5 changed files
with
233 additions
and
1 deletion.
There are no files selected for viewing
40 changes: 40 additions & 0 deletions
40
Confluent.Kafka.DependencyInjection/Clients/DIAdminClientBuilder.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
namespace Confluent.Kafka.DependencyInjection.Clients; | ||
|
||
using Confluent.Kafka.DependencyInjection.Handlers; | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
|
||
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] | ||
sealed class DIAdminClientBuilder : AdminClientBuilder | ||
{ | ||
readonly IEnumerable<IErrorHandler> errorHandlers; | ||
readonly IEnumerable<IStatisticsHandler> statisticsHandlers; | ||
readonly IEnumerable<ILogHandler> logHandlers; | ||
|
||
public DIAdminClientBuilder( | ||
AdminClientConfig config, | ||
IEnumerable<IErrorHandler> errorHandlers, | ||
IEnumerable<IStatisticsHandler> statisticsHandlers, | ||
IEnumerable<ILogHandler> logHandlers) | ||
: base(config) | ||
{ | ||
this.errorHandlers = errorHandlers; | ||
this.statisticsHandlers = statisticsHandlers; | ||
this.logHandlers = logHandlers; | ||
} | ||
|
||
public override IAdminClient Build() | ||
{ | ||
ErrorHandler ??= errorHandlers.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError); | ||
|
||
StatisticsHandler ??= statisticsHandlers.Aggregate( | ||
default(Action<IClient, string>), | ||
(x, y) => x + y.OnStatistics); | ||
|
||
LogHandler ??= logHandlers.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog); | ||
|
||
return base.Build(); | ||
} | ||
} |
164 changes: 164 additions & 0 deletions
164
Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
namespace Confluent.Kafka.DependencyInjection.Clients; | ||
|
||
using Confluent.Kafka.Admin; | ||
|
||
using Microsoft.Extensions.DependencyInjection; | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
|
||
class ScopedAdminClient : IAdminClient | ||
{ | ||
readonly IAdminClient client; | ||
readonly IDisposable scope; | ||
|
||
public ScopedAdminClient(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config) | ||
{ | ||
IServiceScope scope; | ||
this.scope = scope = scopes.CreateScope(); | ||
|
||
if (config != null) | ||
{ | ||
var merged = scope.ServiceProvider.GetRequiredService<AdminClientConfig>(); | ||
|
||
foreach (var kvp in config) | ||
{ | ||
merged.Set(kvp.Key, kvp.Value); | ||
} | ||
} | ||
|
||
this.client = scope.ServiceProvider.GetRequiredService<AdminClientBuilder>().Build(); | ||
} | ||
|
||
public Handle Handle => client.Handle; | ||
|
||
public string Name => client.Name; | ||
|
||
public int AddBrokers(string brokers) | ||
{ | ||
return client.AddBrokers(brokers); | ||
} | ||
|
||
public Metadata GetMetadata(TimeSpan timeout) | ||
{ | ||
return client.GetMetadata(timeout); | ||
} | ||
|
||
public Metadata GetMetadata(string topic, TimeSpan timeout) | ||
{ | ||
return client.GetMetadata(topic, timeout); | ||
} | ||
|
||
public Task CreateTopicsAsync(IEnumerable<TopicSpecification> topics, CreateTopicsOptions? options = null) | ||
{ | ||
return client.CreateTopicsAsync(topics, options); | ||
} | ||
|
||
public Task DeleteTopicsAsync(IEnumerable<string> topics, DeleteTopicsOptions? options = null) | ||
{ | ||
return client.DeleteTopicsAsync(topics, options); | ||
} | ||
|
||
public Task CreatePartitionsAsync( | ||
IEnumerable<PartitionsSpecification> partitionsSpecifications, | ||
CreatePartitionsOptions? options = null) | ||
{ | ||
return client.CreatePartitionsAsync(partitionsSpecifications, options); | ||
} | ||
|
||
public Task<List<DeleteRecordsResult>> DeleteRecordsAsync( | ||
IEnumerable<TopicPartitionOffset> topicPartitionOffsets, | ||
DeleteRecordsOptions? options = null) | ||
{ | ||
return client.DeleteRecordsAsync(topicPartitionOffsets, options); | ||
} | ||
|
||
public GroupInfo ListGroup(string group, TimeSpan timeout) | ||
{ | ||
return client.ListGroup(group, timeout); | ||
} | ||
|
||
public List<GroupInfo> ListGroups(TimeSpan timeout) | ||
{ | ||
return client.ListGroups(timeout); | ||
} | ||
|
||
public Task<ListConsumerGroupsResult> ListConsumerGroupsAsync(ListConsumerGroupsOptions? options = null) | ||
{ | ||
return client.ListConsumerGroupsAsync(options); | ||
} | ||
|
||
public Task<DescribeConsumerGroupsResult> DescribeConsumerGroupsAsync( | ||
IEnumerable<string> groups, | ||
DescribeConsumerGroupsOptions? options = null) | ||
{ | ||
return client.DescribeConsumerGroupsAsync(groups, options); | ||
} | ||
|
||
public Task DeleteGroupsAsync(IList<string> groups, DeleteGroupsOptions? options = null) | ||
{ | ||
return client.DeleteGroupsAsync(groups, options); | ||
} | ||
|
||
public Task<List<ListConsumerGroupOffsetsResult>> ListConsumerGroupOffsetsAsync( | ||
IEnumerable<ConsumerGroupTopicPartitions> groupPartitions, | ||
ListConsumerGroupOffsetsOptions? options = null) | ||
{ | ||
return client.ListConsumerGroupOffsetsAsync(groupPartitions, options); | ||
} | ||
|
||
public Task<List<AlterConsumerGroupOffsetsResult>> AlterConsumerGroupOffsetsAsync( | ||
IEnumerable<ConsumerGroupTopicPartitionOffsets> groupPartitions, | ||
AlterConsumerGroupOffsetsOptions? options = null) | ||
{ | ||
return client.AlterConsumerGroupOffsetsAsync(groupPartitions, options); | ||
} | ||
|
||
public Task<DeleteConsumerGroupOffsetsResult> DeleteConsumerGroupOffsetsAsync( | ||
string group, | ||
IEnumerable<TopicPartition> partitions, | ||
DeleteConsumerGroupOffsetsOptions? options = null) | ||
{ | ||
return client.DeleteConsumerGroupOffsetsAsync(group, partitions, options); | ||
} | ||
|
||
public Task<List<DescribeConfigsResult>> DescribeConfigsAsync( | ||
IEnumerable<ConfigResource> resources, | ||
DescribeConfigsOptions? options = null) | ||
{ | ||
return client.DescribeConfigsAsync(resources, options); | ||
} | ||
|
||
public Task AlterConfigsAsync( | ||
Dictionary<ConfigResource, List<ConfigEntry>> configs, | ||
AlterConfigsOptions? options = null) | ||
{ | ||
return client.AlterConfigsAsync(configs, options); | ||
} | ||
|
||
public Task<DescribeAclsResult> DescribeAclsAsync( | ||
AclBindingFilter aclBindingFilter, | ||
DescribeAclsOptions? options = null) | ||
{ | ||
return client.DescribeAclsAsync(aclBindingFilter, options); | ||
} | ||
|
||
public Task CreateAclsAsync(IEnumerable<AclBinding> aclBindings, CreateAclsOptions? options = null) | ||
{ | ||
return client.CreateAclsAsync(aclBindings, options); | ||
} | ||
|
||
public Task<List<DeleteAclsResult>> DeleteAclsAsync( | ||
IEnumerable<AclBindingFilter> aclBindingFilters, | ||
DeleteAclsOptions? options = null) | ||
{ | ||
return client.DeleteAclsAsync(aclBindingFilters, options); | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
client.Dispose(); | ||
scope.Dispose(); | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
12
Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
namespace Confluent.Kafka.DependencyInjection.Clients; | ||
|
||
using Microsoft.Extensions.DependencyInjection; | ||
|
||
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] | ||
sealed class ServiceAdminClient : ScopedAdminClient | ||
{ | ||
public ServiceAdminClient(IServiceScopeFactory scopes, AdminClientConfig config) | ||
: base(scopes, config) | ||
{ | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters