diff --git a/build/version.props b/build/version.props index b80db86fc..4a408b33b 100644 --- a/build/version.props +++ b/build/version.props @@ -1,8 +1,8 @@ 8 - 1 - 3 + 2 + 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md index 6b407eaf4..9794c82f4 100644 --- a/docs/content/user-guide/en/cap/configuration.md +++ b/docs/content/user-guide/en/cap/configuration.md @@ -24,6 +24,52 @@ services.AddCap(capOptions => For specific transport and storage configuration, you can take a look at the configuration options provided by the specific components in the [Transports](../transport/general.md) section and the [Persistent](../storage/general.md) section. +## Configuration in Subscribers + +Subscribers use the `[CapSubscribe]` attribute to mark themselves as subscribers. They can be located in an ASP.NET Core Controller or Service. + +When you declare `[CapSubscribe]`, you can change the behavior of the subscriber by specifying the following parameters. + +## [CapSubscribe] Name + +> string, required + +Subscribe to messages by specifying the `Name` parameter, which corresponds to the name specified when publishing the message through _cap.Publish("Name"). + +This name corresponds to different items in different Brokers: + +- In RabbitMQ, it corresponds to the Routing Key. +- In Kafka, it corresponds to the Topic. +- In AzureServiceBus, it corresponds to the Subject. +- In NATS, it corresponds to the Subject. +- In RedisStreams, it corresponds to the Stream. + +## [CapSubscribe] Group + +> string, optional + +Specify the `Group` parameter to place subscribers within a separate consumer group, a concept similar to consumer groups in Kafka. If this parameter is not specified, the current assembly name (`DefaultGroupName`) is used as the default. + +Subscribers with the same `Name` but set to **different** groups will all receive messages. Conversely, if subscribers with the same `Name` are set to the **same** group, only one will receive the message. + +It also makes sense for subscribers with different `Names` to be set to **different** groups; they can have independent threads for execution. Conversely, if subscribers with different `Names` are set to the **same** group, they will share consumption threads. + +Group corresponds to different items in different Brokers: + +- In RabbitMQ, it corresponds to Queue. +- In Kafka, it corresponds to Consumer Group. +- In AzureServiceBus, it corresponds to Subscription Name. +- In NATS, it corresponds to Queue Group. +- In RedisStreams, it corresponds to Consumer Group. + +## [CapSubscribe] GroupConcurrent + +> byte, optional + +Set the parallelism of concurrent execution for subscribers by specifying the value of the `GroupConcurrent` parameter. Concurrent execution means that it needs to be on an independent thread, so if you do not specify the `Group` parameter, CAP will automatically create a Group using the value of `Name`. + +Note: If you have multiple subscribers set to the same Group and also set the `GroupConcurrent` value for these subscribers, only the value set by the first subscriber will take effect. + ## Custom configuration The `CapOptions` is used to store configuration information. By default they have default values, sometimes you may need to customize them. @@ -135,10 +181,12 @@ The expiration time (in seconds) of the success message. When the message is sen The expiration time (in seconds) of the failed message. 
When the message is sent or consumed failed, it will be removed from database storage when the time reaches `FailedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value. -#### UseDispatchingPerGroup +#### [Removed] UseDispatchingPerGroup > Default: false +> Removed in version 8.2.0; this is now the default behavior + If `true`, all consumers within the same group push received messages to their own dispatching pipeline channel. Each channel's thread count is set to the `ConsumerThreadCount` value. #### [Obsolete] EnableConsumerPrefetch diff --git a/docs/content/user-guide/zh/cap/configuration.md b/docs/content/user-guide/zh/cap/configuration.md index d80378c1a..651dca295 100644 --- a/docs/content/user-guide/zh/cap/configuration.md +++ b/docs/content/user-guide/zh/cap/configuration.md @@ -24,7 +24,53 @@ services.AddCap(config => 有关具体的传输器配置和存储配置，你可以查看 Transports 章节和 Persistent 章节中具体组件提供的配置项。 -## CAP 中的自定义配置 +## 订阅者中的配置 + +订阅者使用 `[CapSubscribe]` 这个Attribute来标记成为一个订阅者，订阅者可以位于 ASP.NET Core 的 Controller 或 Service 中。 + +当你在声明 `[CapSubscribe]` 时候，可以通过指定以下参数来改变订阅者的行为。 + +## [CapSubscribe] Name + +> string, 必须项 + +通过指定 `Name` 参数来订阅消息，其对应发布消息时通过 Publish("Name") 指定的名称。 + +该名称在不同的 Broker 有不同的对应项。 + +- 在 RabbitMQ 中对应 Routing Key。 +- 在 Kafka 中对应 Topic。 +- 在 AzureServiceBus 中对应 Subject。 +- 在 NATS 中对应 Subject。 +- 在 RedisStreams 中对应 Stream。 + +## [CapSubscribe] Group + +> string, 可选项 + +通过指定 `Group` 参数来使订阅者位于单独的消费者组中，消费者组的概念类似于 Kafka 中的消费者组。如果不指定此参数将使用当前程序集名称（`DefaultGroupName`）作为默认值。 + +相同 `Name` 的订阅者设置为**不同的**组时，他们都会收到消息。相反如果相同 `Name` 的订阅者设置**相同的**组时，只有一个会收到消息。 + +不同 `Name` 的订阅者设置为**不同的**组时，也是有意义的，他们可以拥有独立的线程来执行。相反如果不同 `Name` 的订阅者设置**相同的**组时，他们将共享消费线程。 + +Group 在不同的 Broker 有不同的对应项。 + +- 在 RabbitMQ 中对应 Queue。 +- 在 Kafka 中对应 Consumer Group。 +- 在 AzureServiceBus 中对应 Subscription Name。 +- 在 NATS 中对应 Queue Group。 +- 在 RedisStreams 中对应 Consumer Group。
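To make the `Name` / `Group` / `GroupConcurrent` settings described in the "Configuration in Subscribers" section above concrete, here is a small illustrative subscriber; the topic, group, and type names are invented for the example and are not part of this change set.

```csharp
using System;
using DotNetCore.CAP;

public record OrderCreated(int Id);

public class OrderSubscriber : ICapSubscribe
{
    // Name  -> Routing Key (RabbitMQ) / Topic (Kafka) / Subject (Azure Service Bus, NATS) / Stream (Redis Streams).
    // Group -> Queue / Consumer Group / Subscription Name / Queue Group, depending on the broker.
    // GroupConcurrent = 5 -> at most five messages of this group are executed concurrently;
    //                        if Group were omitted, CAP would derive a group from the Name.
    [CapSubscribe("order.created", Group = "order.service", GroupConcurrent = 5)]
    public void HandleOrderCreated(OrderCreated message)
    {
        Console.WriteLine($"Received order {message.Id}");
    }
}
```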
+ +## [CapSubscribe] GroupConcurrent + +> byte, 可选项 + +通过指定 `GroupConcurrent` 参数的值来设置订阅者并行执行的并行度。并行执行意味着其需要位于独立线程中,因此如果你没有指定 `Group` 参数,则 CAP 将会以 `Name` 的值自动创建一个 Group。 + +注意: 如果你有多个订阅者都设置为了相同的 Group,并且也给订阅者都设置了 `GroupConcurrent` 的值,则只会有(第)一个设置的值生效。 + +## 自定义配置项 在 `AddCap` 中 `CapOptions` 对象是用来存储配置相关信息,默认情况下它们都具有一些默认值,有些时候你可能需要自定义。 @@ -136,10 +182,12 @@ services.AddCap(config => 失败消息的过期时间(秒)。 当消息发送或者消费失败时候,在时间达到 `FailedMessageExpiredAfter` 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。 -#### UseDispatchingPerGroup +#### [已移除] UseDispatchingPerGroup > 默认值: false +> 版本 8.2.0 中移除,已是默认行为。 + 默认情况下,CAP会将所有消费者组的消息都先放置到内存同一个Channel中,然后线性处理。 如果设置为 true,则每个消费者组都会根据 `ConsumerThreadCount` 设置的值创建单独的线程进行处理。 diff --git a/samples/Sample.Kafka.PostgreSql/Startup.cs b/samples/Sample.Kafka.PostgreSql/Startup.cs index af6e202d0..d2ea1703c 100644 --- a/samples/Sample.Kafka.PostgreSql/Startup.cs +++ b/samples/Sample.Kafka.PostgreSql/Startup.cs @@ -5,7 +5,7 @@ namespace Sample.Kafka.PostgreSql { public class Startup { - public const string DbConnectionString = "User ID=postgres;Password=mysecretpassword;Host=localhost;Port=5432;Database=postgres;"; + public const string DbConnectionString = "User ID=postgres;Password=mysecretpassword;Host=127.0.0.1;Port=5432;Database=postgres;"; public void ConfigureServices(IServiceCollection services) { diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs index 3c136e225..c5dad3f41 100644 --- a/samples/Sample.RabbitMQ.MySql/Startup.cs +++ b/samples/Sample.RabbitMQ.MySql/Startup.cs @@ -16,9 +16,6 @@ public void ConfigureServices(IServiceCollection services) x.UseEntityFramework(); x.UseRabbitMQ("localhost"); x.UseDashboard(); - //x.EnableConsumerPrefetch = true; - x.UseDispatchingPerGroup = true; - x.EnableSubscriberParallelExecute = true; x.FailedThresholdCallback = failed => { var logger = failed.ServiceProvider.GetService>(); diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs index 716fd6cec..ae777ce7a 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -21,19 +21,22 @@ namespace DotNetCore.CAP.AmazonSQS; internal sealed class AmazonSQSConsumerClient : IConsumerClient { - private static readonly SemaphoreSlim ConnectionLock = new(1, 1); + private static readonly object ConnectionLock = new(); private readonly AmazonSQSOptions _amazonSQSOptions; - + private readonly SemaphoreSlim _semaphore; private readonly string _groupId; + private readonly byte _groupConcurrent; private string _queueUrl = string.Empty; private IAmazonSimpleNotificationService? _snsClient; private IAmazonSQS? _sqsClient; - public AmazonSQSConsumerClient(string groupId, IOptions options) + public AmazonSQSConsumerClient(string groupId, byte groupConcurrent, IOptions options) { _groupId = groupId; + _groupConcurrent = groupConcurrent; _amazonSQSOptions = options.Value; + _semaphore = new SemaphoreSlim(groupConcurrent); } public Func? 
OnMessageCallback { get; set; } @@ -89,16 +92,29 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) if (response.Messages.Count == 1) { - var messageObj = JsonSerializer.Deserialize(response.Messages[0].Body); + if (_groupConcurrent > 0) + { + _semaphore.Wait(cancellationToken); + Task.Run(() => Consume(), cancellationToken).ConfigureAwait(false); + } + else + { + Consume().GetAwaiter().GetResult(); + } - var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); - var body = messageObj.Message; + Task Consume() + { + var messageObj = JsonSerializer.Deserialize(response.Messages[0].Body); + + var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); + var body = messageObj.Message; - var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); + var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); - message.Headers.Add(Headers.Group, _groupId); + message.Headers.Add(Headers.Group, _groupId); - OnMessageCallback!(message, response.Messages[0].ReceiptHandle); + return OnMessageCallback!(message, response.Messages[0].ReceiptHandle); + } } else { @@ -113,6 +129,7 @@ public void Commit(object? sender) try { _ = _sqsClient!.DeleteMessageAsync(_queueUrl, (string)sender!).GetAwaiter().GetResult(); + _semaphore.Release(); } catch (ReceiptHandleIsInvalidException ex) { @@ -126,6 +143,7 @@ public void Reject(object? sender) { // Visible again in 3 seconds _ = _sqsClient!.ChangeMessageVisibilityAsync(_queueUrl, (string)sender!, 3).GetAwaiter().GetResult(); + _semaphore.Release(); } catch (MessageNotInflightException ex) { @@ -145,9 +163,7 @@ public void Connect(bool initSNS = true, bool initSQS = true) if (_snsClient == null && initSNS) { - ConnectionLock.Wait(); - - try + lock (ConnectionLock) { if (string.IsNullOrWhiteSpace(_amazonSQSOptions.SNSServiceUrl)) _snsClient = _amazonSQSOptions.Credentials != null @@ -159,19 +175,13 @@ public void Connect(bool initSNS = true, bool initSQS = true) ? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials, new AmazonSimpleNotificationServiceConfig { ServiceURL = _amazonSQSOptions.SNSServiceUrl }) : new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig - { ServiceURL = _amazonSQSOptions.SNSServiceUrl }); - } - finally - { - ConnectionLock.Release(); + { ServiceURL = _amazonSQSOptions.SNSServiceUrl }); } } if (_sqsClient == null && initSQS) { - ConnectionLock.Wait(); - - try + lock (ConnectionLock) { if (string.IsNullOrWhiteSpace(_amazonSQSOptions.SQSServiceUrl)) _sqsClient = _amazonSQSOptions.Credentials != null @@ -188,10 +198,6 @@ public void Connect(bool initSNS = true, bool initSQS = true) // the existing queue. 
_queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl; } - finally - { - ConnectionLock.Release(); - } } } @@ -252,11 +258,11 @@ private async Task SubscribeToTopics(IEnumerable topics) foreach (var topicArn in topics) { await _snsClient!.SubscribeAsync(new SubscribeRequest - { - TopicArn = topicArn, - Protocol = "sqs", - Endpoint = sqsQueueArn - }) + { + TopicArn = topicArn, + Protocol = "sqs", + Endpoint = sqsQueueArn + }) .ConfigureAwait(false); } } diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs index 1d1558030..1e52ff664 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs @@ -16,11 +16,11 @@ public AmazonSQSConsumerClientFactory(IOptions amazonSQSOption _amazonSQSOptions = amazonSQSOptions; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { try { - var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions); + var client = new AmazonSQSConsumerClient(groupName, groupConcurrent, _amazonSQSOptions); return client; } catch (Exception e) diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index 0e99d99bd..400b72ea6 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -19,10 +19,11 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient { private readonly AzureServiceBusOptions _asbOptions; private readonly SemaphoreSlim _connectionLock = new(1, 1); - private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; private readonly string _subscriptionName; + private readonly byte _groupConcurrent; + private readonly SemaphoreSlim _semaphore; private ServiceBusAdministrationClient? _administrationClient; private ServiceBusClient? _serviceBusClient; @@ -31,11 +32,14 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient public AzureServiceBusConsumerClient( ILogger logger, string subscriptionName, + byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) { _logger = logger; _subscriptionName = subscriptionName; + _groupConcurrent = groupConcurrent; + _semaphore = new SemaphoreSlim(groupConcurrent); _serviceProvider = serviceProvider; _asbOptions = options.Value ?? 
throw new ArgumentNullException(nameof(options)); } @@ -46,7 +50,6 @@ public AzureServiceBusConsumerClient( public BrokerAddress BrokerAddress => new("AzureServiceBus", _asbOptions.ConnectionString); - public void Subscribe(IEnumerable topics) { if (topics == null) throw new ArgumentNullException(nameof(topics)); @@ -161,7 +164,15 @@ private async Task _serviceBusProcessor_ProcessMessageAsync(ProcessMessageEventA { var context = ConvertMessage(arg.Message); - await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg)); + if (_groupConcurrent > 0) + { + await _semaphore.WaitAsync(); + _ = Task.Run(() => OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg))).ConfigureAwait(false); + } + else + { + await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg)); + } } private async Task _serviceBusProcessor_ProcessSessionMessageAsync(ProcessSessionMessageEventArgs arg) diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs index f61832786..333cc7359 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs @@ -24,12 +24,12 @@ public AzureServiceBusConsumerClientFactory( _serviceProvider = serviceProvider; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { try { var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient)); - var client = new AzureServiceBusConsumerClient(logger, groupId, _asbOptions, _serviceProvider); + var client = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider); client.ConnectAsync().GetAwaiter().GetResult(); return client; } diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index a7bf87c44..ad1cb7f0f 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -19,16 +19,20 @@ namespace DotNetCore.CAP.Kafka; public class KafkaConsumerClient : IConsumerClient { - private static readonly SemaphoreSlim ConnectionLock = new(1, 1); - + private static readonly object Lock = new(); private readonly string _groupId; + private readonly byte _groupConcurrent; + private readonly SemaphoreSlim _semaphore; private readonly KafkaOptions _kafkaOptions; private readonly IServiceProvider _serviceProvider; private IConsumer? _consumerClient; - public KafkaConsumerClient(string groupId, IOptions options, IServiceProvider serviceProvider) + public KafkaConsumerClient(string groupId, byte groupConcurrent, + IOptions options, IServiceProvider serviceProvider) { _groupId = groupId; + _groupConcurrent = groupConcurrent; + _semaphore = new SemaphoreSlim(groupConcurrent); _serviceProvider = serviceProvider; _kafkaOptions = options.Value ?? 
throw new ArgumentNullException(nameof(options)); } @@ -94,7 +98,9 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) try { consumerResult = _consumerClient!.Consume(timeout); + if (consumerResult == null) continue; + if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; } catch (ConsumeException e) when (_kafkaOptions.RetriableErrorCodes.Contains(e.Error.Code)) { @@ -108,29 +114,15 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) continue; } - if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; - - var headers = new Dictionary(consumerResult.Message.Headers.Count); - foreach (var header in consumerResult.Message.Headers) + if (_groupConcurrent > 0) { - var val = header.GetValueBytes(); - headers.Add(header.Key, val != null ? Encoding.UTF8.GetString(val) : null); + _semaphore.Wait(cancellationToken); + Task.Run(() => Consume(consumerResult), cancellationToken).ConfigureAwait(false); } - - headers.Add(Headers.Group, _groupId); - - if (_kafkaOptions.CustomHeadersBuilder != null) + else { - var customHeaders = _kafkaOptions.CustomHeadersBuilder(consumerResult, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + Consume(consumerResult).GetAwaiter().GetResult(); } - - var message = new TransportMessage(headers, consumerResult.Message.Value); - - OnMessageCallback!(message, consumerResult).GetAwaiter().GetResult(); } // ReSharper disable once FunctionNeverReturns } @@ -138,11 +130,13 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Commit(object? sender) { _consumerClient!.Commit((ConsumeResult)sender!); + _semaphore.Release(); } public void Reject(object? sender) { _consumerClient!.Assign(_consumerClient.Assignment); + _semaphore.Release(); } public void Dispose() @@ -154,9 +148,7 @@ public void Connect() { if (_consumerClient != null) return; - ConnectionLock.Wait(); - - try + lock (Lock) { if (_consumerClient == null) { @@ -171,10 +163,31 @@ public void Connect() _consumerClient = BuildConsumer(config); } } - finally + } + + private async Task Consume(ConsumeResult consumerResult) + { + var headers = new Dictionary(consumerResult.Message.Headers.Count); + foreach (var header in consumerResult.Message.Headers) + { + var val = header.GetValueBytes(); + headers.Add(header.Key, val != null ? 
Encoding.UTF8.GetString(val) : null); + } + + headers.Add(Headers.Group, _groupId); + + if (_kafkaOptions.CustomHeadersBuilder != null) { - ConnectionLock.Release(); + var customHeaders = _kafkaOptions.CustomHeadersBuilder(consumerResult, _serviceProvider); + foreach (var customHeader in customHeaders) + { + headers[customHeader.Key] = customHeader.Value; + } } + + var message = new TransportMessage(headers, consumerResult.Message.Value); + + await OnMessageCallback!(message, consumerResult); } protected virtual IConsumer BuildConsumer(ConsumerConfig config) diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs index d8ba27e42..d3d66b0ca 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs @@ -18,11 +18,11 @@ public KafkaConsumerClientFactory(IOptions kafkaOptions, IServiceP _serviceProvider = serviceProvider; } - public virtual IConsumerClient Create(string groupId) + public virtual IConsumerClient Create(string groupName, byte groupConcurrent) { try { - return new KafkaConsumerClient(groupId, _kafkaOptions, _serviceProvider); + return new KafkaConsumerClient(groupName, groupConcurrent, _kafkaOptions, _serviceProvider); } catch (Exception e) { diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs index 46132da1a..025dd5844 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs @@ -17,18 +17,21 @@ namespace DotNetCore.CAP.NATS { internal sealed class NATSConsumerClient : IConsumerClient { - private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private static readonly object ConnectionLock = new(); - private readonly string _groupId; + private readonly string _groupName; + private readonly byte _groupConcurrent; private readonly IServiceProvider _serviceProvider; private readonly NATSOptions _natsOptions; - + private readonly SemaphoreSlim _semaphore; private IConnection? _consumerClient; - public NATSConsumerClient(string groupId, IOptions options, IServiceProvider serviceProvider) + public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions options, IServiceProvider serviceProvider) { - _groupId = groupId; + _groupName = groupName; + _groupConcurrent = groupConcurrent; _serviceProvider = serviceProvider; + _semaphore = new SemaphoreSlim(groupConcurrent); _natsOptions = options.Value ?? 
throw new ArgumentNullException(nameof(options)); } @@ -90,37 +93,38 @@ public void Subscribe(IEnumerable topics) var js = _consumerClient!.CreateJetStreamContext(); var streamGroup = topics.GroupBy(x => _natsOptions.NormalizeStreamName(x)); - ConnectionLock.Wait(); - foreach (var subjectStream in streamGroup) + lock (ConnectionLock) { - var groupName = Helper.Normalized(_groupId); - - foreach (var subject in subjectStream) + foreach (var subjectStream in streamGroup) { - try - { - var consumerConfig = ConsumerConfiguration.Builder() - .WithDurable(Helper.Normalized(groupName + "-" + subject)) - .WithDeliverPolicy(DeliverPolicy.New) - .WithAckWait(30000) - .WithAckPolicy(AckPolicy.Explicit); - - _natsOptions.ConsumerOptions?.Invoke(consumerConfig); - - var pso = PushSubscribeOptions.Builder() - .WithStream(subjectStream.Key) - .WithConfiguration(consumerConfig.Build()) - .Build(); - - js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); - } - catch (Exception e) + var groupName = Helper.Normalized(_groupName); + + foreach (var subject in subjectStream) { - Console.WriteLine(e); + try + { + var consumerConfig = ConsumerConfiguration.Builder() + .WithDurable(Helper.Normalized(groupName + "-" + subject)) + .WithDeliverPolicy(DeliverPolicy.New) + .WithAckWait(30000) + .WithAckPolicy(AckPolicy.Explicit); + + _natsOptions.ConsumerOptions?.Invoke(consumerConfig); + + var pso = PushSubscribeOptions.Builder() + .WithStream(subjectStream.Key) + .WithConfiguration(consumerConfig.Build()) + .Build(); + + js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); + } + catch (Exception e) + { + Console.WriteLine(e); + } } } } - ConnectionLock.Release(); } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) @@ -135,25 +139,38 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e) { - var headers = new Dictionary(); - - foreach (string h in e.Message.Header.Keys) + if (_groupConcurrent > 0) { - headers.Add(h, e.Message.Header[h]); + _semaphore.Wait(); + Task.Run(() => Consume()).ConfigureAwait(false); + } + else + { + Consume().GetAwaiter().GetResult(); } - headers.Add(Headers.Group, _groupId); - - if (_natsOptions.CustomHeadersBuilder != null) + Task Consume() { - var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) + var headers = new Dictionary(); + + foreach (string h in e.Message.Header.Keys) { - headers[customHeader.Key] = customHeader.Value; + headers.Add(h, e.Message.Header[h]); } - } - OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); + headers.Add(Headers.Group, _groupName); + + if (_natsOptions.CustomHeadersBuilder != null) + { + var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) + { + headers[customHeader.Key] = customHeader.Value; + } + } + + return OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message); + } } public void Commit(object? sender) @@ -162,6 +179,7 @@ public void Commit(object? sender) { msg.Ack(); } + _semaphore.Release(); } public void Reject(object? sender) @@ -170,6 +188,7 @@ public void Reject(object? 
sender) { msg.Nak(); } + _semaphore.Release(); } public void Dispose() @@ -184,9 +203,7 @@ public void Connect() return; } - ConnectionLock.Wait(); - - try + lock (ConnectionLock) { if (_consumerClient == null) { @@ -201,10 +218,6 @@ public void Connect() _consumerClient = new ConnectionFactory().CreateConnection(opts); } } - finally - { - ConnectionLock.Release(); - } } private void DisconnectedEventHandler(object? sender, ConnEventArgs e) diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs index 46344ee5a..2e1386664 100644 --- a/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs @@ -18,11 +18,11 @@ public NATSConsumerClientFactory(IOptions natsOptions, IServiceProv _serviceProvider = serviceProvider; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { try { - var client = new NATSConsumerClient(groupId, _natsOptions, _serviceProvider); + var client = new NATSConsumerClient(groupName, groupConcurrent, _natsOptions, _serviceProvider); client.Connect(); return client; } diff --git a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs index 362d95abc..be12f1f1e 100644 --- a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs +++ b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs @@ -18,13 +18,17 @@ internal sealed class PulsarConsumerClient : IConsumerClient { private readonly PulsarClient _client; private readonly string _groupId; + private readonly byte _groupConcurrent; + private readonly SemaphoreSlim _semaphore; private readonly PulsarOptions _pulsarOptions; private IConsumer? _consumerClient; - public PulsarConsumerClient(PulsarClient client, string groupId, IOptions options) + public PulsarConsumerClient(IOptions options, PulsarClient client, string groupName, byte groupConcurrent) { _client = client; - _groupId = groupId; + _groupId = groupName; + _groupConcurrent = groupConcurrent; + _semaphore = new SemaphoreSlim(groupConcurrent); _pulsarOptions = options.Value; } @@ -56,17 +60,30 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { var consumerResult = _consumerClient!.ReceiveAsync(cancellationToken).GetAwaiter().GetResult(); - var headers = new Dictionary(consumerResult.Properties.Count); - foreach (var header in consumerResult.Properties) + if (_groupConcurrent > 0) { - headers.Add(header.Key, header.Value); + _semaphore.Wait(cancellationToken); + Task.Run(() => Consume(), cancellationToken).ConfigureAwait(false); + } + else + { + Consume().GetAwaiter().GetResult(); } - headers.Add(Headers.Group, _groupId); + Task Consume() + { + var headers = new Dictionary(consumerResult.Properties.Count); + foreach (var header in consumerResult.Properties) + { + headers.Add(header.Key, header.Value); + } - var message = new TransportMessage(headers, consumerResult.Data); + headers.Add(Headers.Group, _groupId); - OnMessageCallback!(message, consumerResult.MessageId).GetAwaiter().GetResult(); + var message = new TransportMessage(headers, consumerResult.Data); + + return OnMessageCallback!(message, consumerResult.MessageId); + } } catch (Exception e) { @@ -82,11 +99,13 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Commit(object? sender) { _consumerClient!.AcknowledgeAsync((MessageId)sender!); + _semaphore.Release(); } public void Reject(object? 
sender) { if (sender is MessageId id) _consumerClient!.NegativeAcknowledge(id); + _semaphore.Release(); } public void Dispose() diff --git a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs index 9e8770686..274cac16a 100644 --- a/src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs @@ -23,12 +23,12 @@ public PulsarConsumerClientFactory(IConnectionFactory connection, ILoggerFactory if (_pulsarOptions.Value.EnableClientLog) PulsarClient.Logger = loggerFactory.CreateLogger(); } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { try { var client = _connection.RentClient(); - var consumerClient = new PulsarConsumerClient(client, groupId, _pulsarOptions); + var consumerClient = new PulsarConsumerClient(_pulsarOptions, client, groupName, groupConcurrent); return consumerClient; } catch (Exception e) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs new file mode 100644 index 000000000..39c74a9a9 --- /dev/null +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -0,0 +1,156 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace DotNetCore.CAP.RabbitMQ +{ + public class RabbitMQBasicConsumer : AsyncDefaultBasicConsumer + { + private readonly SemaphoreSlim _semaphore; + private readonly string _groupName; + private readonly bool _usingTaskRun; + private readonly Func _msgCallback; + private readonly Action _logCallback; + private readonly Func>>? _customHeadersBuilder; + private readonly IServiceProvider _serviceProvider; + + public RabbitMQBasicConsumer(IModel? model, + byte concurrent, string groupName, + Func msgCallback, + Action logCallback, + Func>>? 
customHeadersBuilder, + IServiceProvider serviceProvider) + : base(model) + { + _semaphore = new SemaphoreSlim(concurrent); + _groupName = groupName; + _usingTaskRun = concurrent > 0; + _msgCallback = msgCallback; + _logCallback = logCallback; + _customHeadersBuilder = customHeadersBuilder; + _serviceProvider = serviceProvider; + } + + public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IBasicProperties properties, ReadOnlyMemory body) + { + if (_usingTaskRun) + { + await _semaphore.WaitAsync(); + + _ = Task.Run(Consume).ConfigureAwait(false); + } + else + { + await Consume().ConfigureAwait(false); + } + + Task Consume() + { + var headers = new Dictionary(); + + if (properties.Headers != null) + foreach (var header in properties.Headers) + { + if (header.Value is byte[] val) + headers.Add(header.Key, Encoding.UTF8.GetString(val)); + else + headers.Add(header.Key, header.Value?.ToString()); + } + + headers.Add(Messages.Headers.Group, _groupName); + + if (_customHeadersBuilder != null) + { + var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + var customHeaders = _customHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) + { + headers[customHeader.Key] = customHeader.Value; + } + } + + var message = new TransportMessage(headers, body); + + return _msgCallback(message, deliveryTag); + } + } + + public void BasicAck(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicAck(deliveryTag, false); + + _semaphore.Release(); + } + + public void BasicReject(ulong deliveryTag) + { + if (Model.IsOpen) + Model.BasicReject(deliveryTag, true); + + _semaphore.Release(); + } + + public override async Task OnCancel(params string[] consumerTags) + { + await base.OnCancel(consumerTags); + + var args = new LogMessageEventArgs + { + LogType = MqLogType.ConsumerCancelled, + Reason = string.Join(",", consumerTags) + }; + + _logCallback(args); + } + + public override async Task HandleBasicCancelOk(string consumerTag) + { + await base.HandleBasicCancelOk(consumerTag); + + var args = new LogMessageEventArgs + { + LogType = MqLogType.ConsumerUnregistered, + Reason = consumerTag + }; + + _logCallback(args); + } + + public override async Task HandleBasicConsumeOk(string consumerTag) + { + await base.HandleBasicConsumeOk(consumerTag); + + var args = new LogMessageEventArgs + { + LogType = MqLogType.ConsumerRegistered, + Reason = consumerTag + }; + + _logCallback(args); + } + + public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason) + { + await base.HandleModelShutdown(model, reason); + + var args = new LogMessageEventArgs + { + LogType = MqLogType.ConsumerShutdown, + Reason = reason.ReplyText + }; + + _logCallback(args); + } + } +} diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 54b02ad5b..b1fc83423 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -3,34 +3,34 @@ using System; using System.Collections.Generic; -using System.Text; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Options; using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Headers = DotNetCore.CAP.Messages.Headers; namespace DotNetCore.CAP.RabbitMQ; internal sealed class 
RabbitMQConsumerClient : IConsumerClient { + private static readonly object Lock = new(); private readonly IConnectionChannelPool _connectionChannelPool; + private readonly IServiceProvider _serviceProvider; private readonly string _exchangeName; private readonly string _queueName; + private readonly byte _groupConcurrent; private readonly RabbitMQOptions _rabbitMQOptions; - private readonly IServiceProvider _serviceProvider; - private readonly object _syncLock = new(); + private RabbitMQBasicConsumer? _consumer = null; private IModel? _channel; - public RabbitMQConsumerClient(string queueName, + public RabbitMQConsumerClient(string groupName, byte groupConcurrent, IConnectionChannelPool connectionChannelPool, IOptions options, IServiceProvider serviceProvider) { - _queueName = queueName; + _queueName = groupName; + _groupConcurrent = groupConcurrent; _connectionChannelPool = connectionChannelPool; _serviceProvider = serviceProvider; _rabbitMQOptions = options.Value; @@ -59,23 +59,26 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { Connect(); - var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += OnConsumerReceived; - consumer.Shutdown += OnConsumerShutdown; - consumer.Registered += OnConsumerRegistered; - consumer.Unregistered += OnConsumerUnregistered; - consumer.ConsumerCancelled += OnConsumerConsumerCancelled; - - if (_rabbitMQOptions.BasicQosOptions != null) + if (_groupConcurrent > 0) + { + _channel?.BasicQos(prefetchSize: 0, prefetchCount: _groupConcurrent, global: false); + } + else if (_rabbitMQOptions.BasicQosOptions != null) + { _channel?.BasicQos(0, _rabbitMQOptions.BasicQosOptions.PrefetchCount, _rabbitMQOptions.BasicQosOptions.Global); + } + + _consumer = new RabbitMQBasicConsumer(_channel, _groupConcurrent, _queueName, OnMessageCallback!, OnLogCallback!, + _rabbitMQOptions.CustomHeadersBuilder, _serviceProvider); try { - _channel.BasicConsume(_queueName, false, consumer); + _channel.BasicConsume(_queueName, false, _consumer); } catch (TimeoutException ex) { - OnConsumerShutdown(null, new ShutdownEventArgs(ShutdownInitiator.Application, 0, ex.Message + "-->" + nameof(_channel.BasicConsume))); + _consumer.HandleModelShutdown(null!, new ShutdownEventArgs(ShutdownInitiator.Application, 0, + ex.Message + "-->" + nameof(_channel.BasicConsume))).GetAwaiter().GetResult(); } while (true) @@ -89,12 +92,12 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Commit(object? sender) { - if (_channel!.IsOpen) _channel.BasicAck((ulong)sender!, false); + _consumer!.BasicAck((ulong)sender!); } public void Reject(object? sender) { - if (_channel!.IsOpen && sender is ulong val) _channel.BasicReject(val, true); + _consumer!.BasicReject((ulong)sender!); } public void Dispose() @@ -108,7 +111,7 @@ public void Connect() { var connection = _connectionChannelPool.GetConnection(); - lock (_syncLock) + lock (Lock) { if (_channel == null || _channel.IsClosed) { @@ -133,94 +136,15 @@ public void Connect() } catch (TimeoutException ex) { - OnConsumerShutdown(null, new ShutdownEventArgs(ShutdownInitiator.Application, 0, ex.Message + "-->" + nameof(_channel.QueueDeclare))); - } - } - } - } - - #region events - - private Task OnConsumerConsumerCancelled(object?
sender, ConsumerEventArgs e) - { - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerCancelled, - Reason = string.Join(",", e.ConsumerTags) - }; - - OnLogCallback!(args); - - return Task.CompletedTask; - } - - private Task OnConsumerUnregistered(object? sender, ConsumerEventArgs e) - { - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerUnregistered, - Reason = string.Join(",", e.ConsumerTags) - }; - - OnLogCallback!(args); - - return Task.CompletedTask; - } + var args = new LogMessageEventArgs + { + LogType = MqLogType.ConsumerShutdown, + Reason = ex.Message + "-->" + nameof(_channel.QueueDeclare) + }; - private Task OnConsumerRegistered(object? sender, ConsumerEventArgs e) - { - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerRegistered, - Reason = string.Join(",", e.ConsumerTags) - }; - - OnLogCallback!(args); - - return Task.CompletedTask; - } - - private async Task OnConsumerReceived(object? sender, BasicDeliverEventArgs e) - { - var headers = new Dictionary(); - - if (e.BasicProperties.Headers != null) - foreach (var header in e.BasicProperties.Headers) - { - if (header.Value is byte[] val) - headers.Add(header.Key, Encoding.UTF8.GetString(val)); - else - headers.Add(header.Key, header.Value?.ToString()); - } - - headers.Add(Headers.Group, _queueName); - - if (_rabbitMQOptions.CustomHeadersBuilder != null) - { - var customHeaders = _rabbitMQOptions.CustomHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; + OnLogCallback!(args); + } } } - - var message = new TransportMessage(headers, e.Body.ToArray()); - - await OnMessageCallback!(message, e.DeliveryTag); - } - - private Task OnConsumerShutdown(object? 
sender, ShutdownEventArgs e) - { - var args = new LogMessageEventArgs - { - LogType = MqLogType.ConsumerShutdown, - Reason = e.ReplyText - }; - - OnLogCallback!(args); - - return Task.CompletedTask; } - - #endregion } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs index 53e3229b9..75b717ab7 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs @@ -21,13 +21,15 @@ public RabbitMQConsumerClientFactory(IOptions rabbitMQOptions, _serviceProvider = serviceProvider; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupId, byte concurrent) { try { - var client = - new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions, _serviceProvider); + var client = new RabbitMQConsumerClient(groupId, concurrent, _connectionChannelPool, + _rabbitMQOptions, _serviceProvider); + client.Connect(); + return client; } catch (Exception e) diff --git a/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs index cae5ccfe8..1b5b55d14 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs @@ -17,18 +17,23 @@ namespace DotNetCore.CAP.RedisStreams; internal class RedisConsumerClient : IConsumerClient { private readonly string _groupId; + private readonly byte _groupConcurrent; + private readonly SemaphoreSlim _semaphore; private readonly ILogger _logger; private readonly IOptions _options; private readonly IRedisStreamManager _redis; private string[] _topics = default!; - public RedisConsumerClient(string groupId, + public RedisConsumerClient(string groupId, + byte groupConcurrent, IRedisStreamManager redis, IOptions options, ILogger logger ) { _groupId = groupId; + _groupConcurrent = groupConcurrent; + _semaphore = new SemaphoreSlim(groupConcurrent); _redis = redis; _options = options; _logger = logger; @@ -69,11 +74,13 @@ public void Commit(object? sender) var (stream, group, id) = ((string stream, string group, string id))sender!; _redis.Ack(stream, group, id).GetAwaiter().GetResult(); + + _semaphore.Release(); } public void Reject(object? sender) { - // ignore + _semaphore.Release(); } public void Dispose() @@ -105,30 +112,43 @@ private async Task ConsumeMessages(IAsyncEnumerable> st { if (entry.IsNull) return; - try - { - var message = RedisMessage.Create(entry, _groupId); - await OnMessageCallback!(message, (stream.Key.ToString(), _groupId, entry.Id.ToString())); - } - catch (Exception ex) + if (_groupConcurrent > 0) { - _logger.LogError(ex.Message, ex); - var logArgs = new LogMessageEventArgs - { - LogType = MqLogType.ConsumeError, - Reason = ex.ToString() - }; - OnLogCallback!(logArgs); + _semaphore.Wait(); + _ = Task.Run(() => Consume(position, stream, entry)).ConfigureAwait(false); } - finally + else { - var positionName = position == StreamPosition.Beginning - ? 
nameof(StreamPosition.Beginning) - : nameof(StreamPosition.NewMessages); - _logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); + await Consume(position, stream, entry); } } } } + + async Task Consume(RedisValue position, RedisStream stream, StreamEntry entry) + { + try + { + var message = RedisMessage.Create(entry, _groupId); + await OnMessageCallback!(message, (stream.Key.ToString(), _groupId, entry.Id.ToString())); + } + catch (Exception ex) + { + _logger.LogError(ex.Message, ex); + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ConsumeError, + Reason = ex.ToString() + }; + OnLogCallback!(logArgs); + } + finally + { + var positionName = position == StreamPosition.Beginning + ? nameof(StreamPosition.Beginning) + : nameof(StreamPosition.NewMessages); + _logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); + } + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs index 2a4e1bd19..941240967 100644 --- a/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs +++ b/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs @@ -21,8 +21,8 @@ public RedisConsumerClientFactory(IOptions redisOptions, IRedis _logger = logger; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { - return new RedisConsumerClient(groupId, _redis, _redisOptions, _logger); + return new RedisConsumerClient(groupName, groupConcurrent, _redis, _redisOptions, _logger); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 4d2b52005..4a3106cdd 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -34,7 +34,6 @@ public CapOptions() Version = "v1"; DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower(); CollectorCleaningInterval = 300; - UseDispatchingPerGroup = false; FallbackWindowLookbackSeconds = 240; } @@ -127,14 +126,7 @@ public CapOptions() /// If true, the message send task will be parallel execute by .net thread pool. /// Default is false. /// - public bool EnablePublishParallelSend { get; set; } - - /// - /// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as - /// value is. - /// Default is false. - /// - public bool UseDispatchingPerGroup { get; set; } + public bool EnablePublishParallelSend { get; set; } /// /// Configure the retry processor to pick up the backtrack time window for Scheduled or Failed status messages. 
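Across the AmazonSQS, Kafka, NATS, Pulsar, RabbitMQ, and Redis Streams clients above, the `GroupConcurrent` handling follows the same shape: when the value is greater than zero, the client acquires a `SemaphoreSlim` slot before handing the message to a background task, and returns the slot in `Commit`/`Reject`. Below is a minimal standalone sketch of that pattern with invented names; it is not CAP code, only the semaphore/`Task.Run` shape mirrors the clients in this patch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class ThrottledConsumer
{
    private readonly SemaphoreSlim _semaphore;
    private readonly byte _groupConcurrent;

    public ThrottledConsumer(byte groupConcurrent)
    {
        _groupConcurrent = groupConcurrent;
        // Initial count equals the configured concurrency; with 0 the concurrent path is never taken.
        _semaphore = new SemaphoreSlim(groupConcurrent);
    }

    // Called from the transport's polling loop for each received message.
    public void OnMessageReceived(Func<Task> handle, CancellationToken ct)
    {
        if (_groupConcurrent > 0)
        {
            // Block the polling loop until a slot is free, then hand the message to the thread pool.
            _semaphore.Wait(ct);
            _ = Task.Run(handle, ct);
        }
        else
        {
            // Previous behavior: process the message inline on the polling thread.
            handle().GetAwaiter().GetResult();
        }
    }

    // Commit/Reject return the slot, mirroring how the clients release the semaphore
    // after the dispatcher acknowledges or re-queues the message.
    public void Commit() => _semaphore.Release();
    public void Reject() => _semaphore.Release();
}
```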
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 6f425c99b..d435d8d06 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -61,11 +61,7 @@ public static CapBuilder AddCap(this IServiceCollection services, Action(); - else - services.TryAddSingleton(); + services.TryAddSingleton(); foreach (var serviceExtension in options.Extensions) serviceExtension.AddServices(services); diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 8b040cff7..1bbc311e4 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -116,10 +116,11 @@ public void Execute() foreach (var matchGroup in groupingMatches) { ICollection topics; + var limit = _selector.GetGroupConcurrentLimit(matchGroup.Key); try { // ReSharper disable once ConvertToUsingDeclaration - using (var client = _consumerClientFactory.Create(matchGroup.Key)) + using (var client = _consumerClientFactory.Create(matchGroup.Key, limit)) { client.OnLogCallback = WriteLog; topics = client.FetchTopics(matchGroup.Value.Select(x => x.TopicName)); @@ -140,14 +141,14 @@ public void Execute() try { // ReSharper disable once ConvertToUsingDeclaration - using (var client = _consumerClientFactory.Create(matchGroup.Key)) + using (var client = _consumerClientFactory.Create(matchGroup.Key, limit)) { _serverAddress = client.BrokerAddress; RegisterMessageProcessor(client); client.Subscribe(topicIds); - + client.Listening(_pollingDelay, _cts.Token); } } @@ -174,7 +175,7 @@ public void Execute() private void RegisterMessageProcessor(IConsumerClient client) { client.OnLogCallback = WriteLog; - client.OnMessageCallback = async (transportMessage,sender) => + client.OnMessageCallback = async (transportMessage, sender) => { long? tracingTimestamp = null; try @@ -232,7 +233,7 @@ private void RegisterMessageProcessor(IConsumerClient client) { var content = _serializer.Serialize(message); - await _storage.StoreReceivedExceptionMessageAsync(name, group, content); + await _storage.StoreReceivedExceptionMessageAsync(name, group, content); client.Commit(sender); @@ -262,7 +263,7 @@ private void RegisterMessageProcessor(IConsumerClient client) TracingAfter(tracingTimestamp, transportMessage, _serverAddress); await _dispatcher.EnqueueToExecute(mediumMessage, executor!); - + client.Commit(sender); } @@ -276,7 +277,7 @@ private void RegisterMessageProcessor(IConsumerClient client) TracingError(tracingTimestamp, transportMessage, client.BrokerAddress, e); } - }; + }; } private void WriteLog(LogMessageEventArgs logmsg) diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index ce9358814..68a4784d4 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -169,9 +169,17 @@ protected IEnumerable GetTopicAttributesDescription( protected virtual void SetSubscribeAttribute(TopicAttribute attribute) { var prefix = !string.IsNullOrEmpty(_capOptions.GroupNamePrefix) - ? $"{_capOptions.GroupNamePrefix}." - : string.Empty; - attribute.Group = $"{prefix}{attribute.Group ?? _capOptions.DefaultGroupName}.{_capOptions.Version}"; + ? $"{_capOptions.GroupNamePrefix}." 
+ : string.Empty; + + if (attribute.Group == null && attribute.GroupConcurrent > 0) + { + attribute.Group = $"{prefix}{attribute.Name}.{_capOptions.Version}"; + } + else + { + attribute.Group = $"{prefix}{attribute.Group ?? _capOptions.DefaultGroupName}.{_capOptions.Version}"; + } } private ConsumerExecutorDescriptor InitDescriptor( diff --git a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs index ac22018e1..39ac4a825 100644 --- a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs +++ b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs @@ -17,21 +17,29 @@ public MethodMatcherCache(IConsumerServiceSelector selector) { _selector = selector; Entries = new ConcurrentDictionary>(); + GroupConcurrent = new ConcurrentDictionary(); } private ConcurrentDictionary> Entries { get; } + private ConcurrentDictionary GroupConcurrent { get; } + /// /// Get a dictionary of candidates.In the dictionary, /// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates /// - public ConcurrentDictionary> - GetCandidatesMethodsOfGroupNameGrouped() + public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped() { if (Entries.Count != 0) return Entries; var executorCollection = _selector.SelectCandidates(); + foreach (var executor in executorCollection) + { + GroupConcurrent.AddOrUpdate(executor.Attribute.Group, executor.Attribute.GroupConcurrent, + (group, val) => (byte)(val + executor.Attribute.GroupConcurrent)); + } + var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group); foreach (var item in groupedCandidates) Entries.TryAdd(item.Key, item.ToList()); @@ -39,6 +47,11 @@ public ConcurrentDictionary> return Entries; } + public byte GetGroupConcurrentLimit(string group) + { + return GroupConcurrent.TryGetValue(group, out byte value) ? value : (byte)1; + } + public List GetAllTopics() { if (Entries.Count == 0) GetCandidatesMethodsOfGroupNameGrouped(); diff --git a/src/DotNetCore.CAP/Internal/TopicAttribute.cs b/src/DotNetCore.CAP/Internal/TopicAttribute.cs index ee7a49f3a..0b3e03be4 100644 --- a/src/DotNetCore.CAP/Internal/TopicAttribute.cs +++ b/src/DotNetCore.CAP/Internal/TopicAttribute.cs @@ -32,8 +32,14 @@ protected TopicAttribute(string name, bool isPartial = false) /// /// Default group name is CapOptions setting.(Assembly name) - /// kafka --> groups.id - /// rabbit MQ --> queue.name + /// Kafka --> groups.id + /// RabbitMQ --> queue.name /// public string Group { get; set; } = default!; + + /// + /// Limit the number of messages consumed concurrently. + /// If you set this value but don't specify the Group, we will automatically create a Group using the Name. + /// + public byte GroupConcurrent { get; set; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs deleted file mode 100644 index e09135537..000000000 --- a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs +++ /dev/null @@ -1,279 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. 
- -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; -using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Messages; -using DotNetCore.CAP.Persistence; -using DotNetCore.CAP.Transport; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.Processor; - -internal class DispatcherPerGroup : IDispatcher -{ - private CancellationTokenSource? _tasksCts; - private readonly CancellationTokenSource _delayCts = new(); - private readonly ISubscribeExecutor _executor; - private readonly ILogger _logger; - private readonly CapOptions _options; - private readonly IMessageSender _sender; - private readonly IDataStorage _storage; - private readonly PriorityQueue _schedulerQueue; - private readonly bool _enableParallelExecute; - private readonly bool _enableParallelSend; - - private Channel _publishedChannel = default!; - private ConcurrentDictionary> _receivedChannels = default!; - - private DateTime _nextSendTime = DateTime.MaxValue; - - public DispatcherPerGroup(ILogger logger, - IMessageSender sender, - IOptions options, - ISubscribeExecutor executor, - IDataStorage storage) - { - _logger = logger; - _sender = sender; - _options = options.Value; - _executor = executor; - _schedulerQueue = new PriorityQueue(); - _storage = storage; - _enableParallelExecute = options.Value.EnableSubscriberParallelExecute; - _enableParallelSend = options.Value.EnablePublishParallelSend; - } - - public async Task Start(CancellationToken stoppingToken) - { - stoppingToken.ThrowIfCancellationRequested(); - _tasksCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, CancellationToken.None); - _tasksCts.Token.Register(() => _delayCts.Cancel()); - - _publishedChannel = Channel.CreateBounded( - new BoundedChannelOptions(5000) - { - AllowSynchronousContinuations = true, - SingleReader = true, - SingleWriter = true, - FullMode = BoundedChannelFullMode.Wait - }); - - await Task.Run(Sending, _tasksCts.Token).ConfigureAwait(false); //here return Value task - - _receivedChannels = - new ConcurrentDictionary>( - _options.SubscriberParallelExecuteThreadCount, _options.SubscriberParallelExecuteThreadCount * 2); - - GetOrCreateReceiverChannel(_options.DefaultGroupName); - - _ = Task.Run(async () => - { - //When canceling, place the message status of unsent in the queue to delayed - _tasksCts.Token.Register(() => - { - try - { - if (_schedulerQueue.Count == 0) return; - - var messageIds = _schedulerQueue.UnorderedItems.Select(x => x.Element.DbId).ToArray(); - _storage.ChangePublishStateToDelayedAsync(messageIds).GetAwaiter().GetResult(); - _logger.LogDebug("Update storage to delayed success of delayed message in memory queue!"); - } - catch (Exception e) - { - _logger.LogWarning(e, "Update storage fails of delayed message in memory queue!"); - } - }); - - while (!_tasksCts.Token.IsCancellationRequested) - { - try - { - while (_schedulerQueue.TryPeek(out _, out _nextSendTime)) - { - var delayTime = _nextSendTime - DateTime.Now; - - if (delayTime > new TimeSpan(500000)) //50ms - { - await Task.Delay(delayTime, _delayCts.Token); - } - _tasksCts.Token.ThrowIfCancellationRequested(); - - await _sender.SendAsync(_schedulerQueue.Dequeue()).ConfigureAwait(false); - } - _tasksCts.Token.WaitHandle.WaitOne(100); - } - catch (OperationCanceledException) - { - //Ignore - } - } - }, _tasksCts.Token).ConfigureAwait(false); - - 
_logger.LogInformation("Starting DispatcherPerGroup"); - } - - public async ValueTask EnqueueToScheduler(MediumMessage message, DateTime publishTime, object? transaction = null) - { - message.ExpiresAt = publishTime; - - var timeSpan = publishTime - DateTime.Now; - - if (timeSpan <= TimeSpan.FromMinutes(1)) - { - await _storage.ChangePublishStateAsync(message, StatusName.Queued, transaction); - - _schedulerQueue.Enqueue(message, publishTime); - - if (publishTime < _nextSendTime) - { - _delayCts.Cancel(); - } - } - else - { - await _storage.ChangePublishStateAsync(message, StatusName.Delayed, transaction); - } - } - - public async ValueTask EnqueueToPublish(MediumMessage message) - { - try - { - if (!_publishedChannel.Writer.TryWrite(message)) - while (await _publishedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false)) - if (_publishedChannel.Writer.TryWrite(message)) - return; - } - catch (OperationCanceledException) - { - //Ignore - } - } - - public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor? descriptor) - { - try - { - var group = message.Origin.GetGroup()!; - - var channel = GetOrCreateReceiverChannel(group); - - if (!channel.Writer.TryWrite((message, descriptor))) - while (await channel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false)) - if (channel.Writer.TryWrite((message, descriptor))) - return; - } - catch (OperationCanceledException) - { - //Ignore - } - } - - public void Dispose() - { - _tasksCts?.Dispose(); - } - - private Channel<(MediumMessage, ConsumerExecutorDescriptor?)> GetOrCreateReceiverChannel(string key) - { - return _receivedChannels.GetOrAdd(key, group => - { - _logger.LogInformation( - "Creating receiver channel for group {ConsumerGroup} with thread count {ConsumerThreadCount}", group, - _options.ConsumerThreadCount); - - var channel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>( - new BoundedChannelOptions(_options.SubscriberParallelExecuteThreadCount * _options.SubscriberParallelExecuteBufferFactor) - { - AllowSynchronousContinuations = true, - SingleReader = _options.SubscriberParallelExecuteThreadCount == 1, - SingleWriter = true, - FullMode = BoundedChannelFullMode.Wait - }); - - if (_enableParallelExecute) - { - Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadCount) - .Select(_ => Task.Run(() => Processing(group, channel), _tasksCts!.Token)).ToArray()) - .ConfigureAwait(false); - } - else - { - _ = Task.Run(() => Processing(group, channel), _tasksCts!.Token).ConfigureAwait(false); - } - return channel; - }); - } - - private async ValueTask Sending() - { - try - { - while (await _publishedChannel.Reader.WaitToReadAsync(_tasksCts!.Token).ConfigureAwait(false)) - while (_publishedChannel.Reader.TryRead(out var message)) - try - { - if (_enableParallelSend) - { - _ = Task.Run(async () => - { - var result = await _sender.SendAsync(message).ConfigureAwait(false); - if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); - }); - } - else - { - var result = await _sender.SendAsync(message).ConfigureAwait(false); - if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception); - } - } - catch (Exception ex) - { - _logger.LogError(ex, $"An exception occurred when sending a message to the broker. 
Id:{message.DbId}"); - } - } - catch (OperationCanceledException) - { - // expected - } - } - - private async ValueTask Processing(string group, Channel<(MediumMessage, ConsumerExecutorDescriptor?)> channel) - { - try - { - while (await channel.Reader.WaitToReadAsync(_tasksCts!.Token).ConfigureAwait(false)) - while (channel.Reader.TryRead(out var message)) - try - { - _logger.LogDebug("Dispatching message for group {ConsumerGroup}", group); - - var item1 = message.Item1; - var item2 = message.Item2; - await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - //expected - } - catch (Exception e) - { - _logger.LogError(e, - $"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}"); - } - } - catch (OperationCanceledException) - { - // expected - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Transport/IConsumerClientFactory.cs b/src/DotNetCore.CAP/Transport/IConsumerClientFactory.cs index 25654f54d..19cdbce27 100644 --- a/src/DotNetCore.CAP/Transport/IConsumerClientFactory.cs +++ b/src/DotNetCore.CAP/Transport/IConsumerClientFactory.cs @@ -11,6 +11,7 @@ public interface IConsumerClientFactory /// /// Create a new instance of . /// - /// message group number - IConsumerClient Create(string groupId); + /// Message group name. + /// Message consumed concurrent limit. + IConsumerClient Create(string groupName, byte groupConcurrent); } \ No newline at end of file diff --git a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs index f96a63ae9..b57e4d2b3 100644 --- a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs +++ b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs @@ -12,15 +12,16 @@ internal sealed class InMemoryConsumerClient : IConsumerClient { private readonly ILogger _logger; private readonly InMemoryQueue _queue; + private readonly SemaphoreSlim _semaphore; + private readonly byte _concurrent; private readonly string _subscriptionName; - public InMemoryConsumerClient( - ILogger logger, - InMemoryQueue queue, - string subscriptionName) + public InMemoryConsumerClient(ILogger logger, InMemoryQueue queue, string subscriptionName, byte concurrent) { _logger = logger; _queue = queue; + _concurrent = concurrent; + _semaphore = new SemaphoreSlim(_concurrent); _subscriptionName = subscriptionName; } @@ -52,12 +53,12 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken) public void Commit(object sender) { - // ignore + _semaphore.Release(); } public void Reject(object sender) { - // ignore + _semaphore.Release(); } public void Dispose() @@ -70,8 +71,16 @@ public void Dispose() private void OnConsumerReceived(TransportMessage e) { var headers = e.Headers; - headers.TryAdd(Messages.Headers.Group, _subscriptionName); - OnMessageCallback(e, null).GetAwaiter().GetResult(); + headers.TryAdd(Headers.Group, _subscriptionName); + if (_concurrent > 0) + { + _semaphore.Wait(); + Task.Run(() => OnMessageCallback(e, null)).ConfigureAwait(false); + } + else + { + OnMessageCallback(e, null).GetAwaiter().GetResult(); + } } #endregion private methods } diff --git a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClientFactory.cs b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClientFactory.cs index 3e3c216f3..daff8dad0 100644 --- a/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClientFactory.cs +++ 
b/test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClientFactory.cs @@ -14,10 +14,10 @@ public InMemoryConsumerClientFactory(ILoggerFactory loggerFactory, InMemoryQueue _queue = queue; } - public IConsumerClient Create(string groupId) + public IConsumerClient Create(string groupName, byte groupConcurrent) { var logger = _loggerFactory.CreateLogger(typeof(InMemoryConsumerClient)); - return new InMemoryConsumerClient(logger, _queue, groupId); + return new InMemoryConsumerClient(logger, _queue, groupName, groupConcurrent); } } } \ No newline at end of file
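Putting the option removal and the new attribute together, the following is roughly what migration could look like for an application that previously set `UseDispatchingPerGroup = true`. This is a sketch only: it assumes the in-memory storage package, and the topic names and types are invented, not prescribed by this change.

```csharp
using DotNetCore.CAP;
using Microsoft.Extensions.DependencyInjection;

public static class CapRegistration
{
    public static IServiceCollection AddMessaging(this IServiceCollection services)
    {
        services.AddCap(x =>
        {
            x.UseInMemoryStorage();             // placeholder storage for the sketch
            x.UseRabbitMQ("localhost");
            // x.UseDispatchingPerGroup = true; // removed in 8.2.0; per-group dispatching is now the default
        });

        return services;
    }
}

public class EmailSubscriber : ICapSubscribe
{
    // Concurrency is now declared per subscriber group rather than through a global option.
    [CapSubscribe("user.registered", GroupConcurrent = 10)]
    public void Handle(string userName)
    {
        // send a welcome mail ...
    }
}
```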