Skip to content

Commit

Permalink
Add GroupConcurrent option to support subscriber concurrent execution (
Browse files Browse the repository at this point in the history
…#1537)

* Add support for parallel consumption by GroupConcurrent attribute.

* Remove  UseDispatchingPerGroup options.
  • Loading branch information
yang-xiaodong authored May 29, 2024
1 parent 5a6aa8e commit 19f6407
Show file tree
Hide file tree
Showing 30 changed files with 589 additions and 585 deletions.
4 changes: 2 additions & 2 deletions build/version.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<VersionMajor>8</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>3</VersionPatch>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
Expand Down
50 changes: 49 additions & 1 deletion docs/content/user-guide/en/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,52 @@ services.AddCap(capOptions =>

For specific transport and storage configuration, you can take a look at the configuration options provided by the specific components in the [Transports](../transport/general.md) section and the [Persistent](../storage/general.md) section.

## Configuration in Subscribers

Subscribers use the `[CapSubscribe]` attribute to mark themselves as subscribers. They can be located in an ASP.NET Core Controller or Service.

When you declare `[CapSubscribe]`, you can change the behavior of the subscriber by specifying the following parameters.

## [CapSubscribe] Name

> string, required
Subscribe to messages by specifying the `Name` parameter, which corresponds to the name specified when publishing the message through _cap.Publish("Name").

This name corresponds to different items in different Brokers:

- In RabbitMQ, it corresponds to the Routing Key.
- In Kafka, it corresponds to the Topic.
- In AzureServiceBus, it corresponds to the Subject.
- In NATS, it corresponds to the Subject.
- In RedisStreams, it corresponds to the Stream.

## [CapSubscribe] Group

> string, optional
Specify the `Group` parameter to place subscribers within a separate consumer group, a concept similar to consumer groups in Kafka. If this parameter is not specified, the current assembly name (`DefaultGroupName`) is used as the default.

Subscribers with the same `Name` but set to **different** groups will all receive messages. Conversely, if subscribers with the same `Name` are set to the **same** group, only one will receive the message.

It also makes sense for subscribers with different `Names` to be set to **different** groups; they can have independent threads for execution. Conversely, if subscribers with different `Names` are set to the **same** group, they will share consumption threads.

Group corresponds to different items in different Brokers:

- In RabbitMQ, it corresponds to Queue.
- In Kafka, it corresponds to Consumer Group.
- In AzureServiceBus, it corresponds to Subscription Name.
- In NATS, it corresponds to Queue Group.
- In RedisStreams, it corresponds to Consumer Group.

## [CapSubscribe] GroupConcurrent

> byte, optional
Set the parallelism of concurrent execution for subscribers by specifying the value of the `GroupConcurrent` parameter. Concurrent execution means that it needs to be on an independent thread, so if you do not specify the `Group` parameter, CAP will automatically create a Group using the value of `Name`.

Note: If you have multiple subscribers set to the same Group and also set the `GroupConcurrent` value for these subscribers, only the value set by the first subscriber will take effect.

## Custom configuration

The `CapOptions` is used to store configuration information. By default they have default values, sometimes you may need to customize them.
Expand Down Expand Up @@ -135,10 +181,12 @@ The expiration time (in seconds) of the success message. When the message is sen
The expiration time (in seconds) of the failed message. When the message is sent or consumed failed, it will be removed from database storage when the time reaches `FailedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.

#### UseDispatchingPerGroup
#### [Removed] UseDispatchingPerGroup

> Default: false
> Removed in version 8.2, already default behavior
If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.

#### [Obsolete] EnableConsumerPrefetch
Expand Down
52 changes: 50 additions & 2 deletions docs/content/user-guide/zh/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,53 @@ services.AddCap(config =>

有关具体的传输器配置和存储配置,你可以查看 Transports 章节和 Persistent 章节中具体组件提供的配置项。

## CAP 中的自定义配置
## 订阅者中的配置

订阅者使用 `[CapSubscribe]` 这个Attribute来标记成为一个订阅者,订阅者可以位于 ASP.NET Core 的 Controller 或 Service 中。

当你在声明 `[CapSubscribe]` 时候,可以通过指定以下参数来改变订阅者的行为。

## [CapSubscribe] Name

> string, 必须项
通过指定 `Name` 参数来订阅消息,其对应发布消息时通过 Publish("Name") 指定的名称。

该名称在不同的 Broker 有不同的对应项。

- 在 RabbitMQ 中对应 Routing Key。
- 在 Kafka 中对应 Topic。
- 在 AzureServiceBus 中对应 Subject。
- 在 NATS 中对应 Subject。
- 在 RedisStrems 中对应 Stream.

## [CapSubscribe] Group

> string, 可选项
通过指定 `Group` 参数来使订阅者位于单独的消费者组中,消费者组的概念类似于 Kafka 中的消费者组。如果不指定此参数将使用当前程序集名称(`DefaultGroupName`)作为默认值。

相同 `Name` 的订阅者设置为**不同的**组时,他们都会收到消息。相反如果相同 `Name` 的订阅者设置**相同的**组时,只有一个会收到消息。

不同 `Name` 的订阅者设置为**不同的**组时,也是有意义的,他们可以拥有独立的线程来执行。相反如果不同 `Name` 的订阅者设置**相同的**组时,他们将共享消费线程。

Group 在不同的 Broker 有不同的对应项。

- 在 RabbitMQ 中对应 Queue。
- 在 Kafka 中对应 Consumer Group。
- 在 AzureServiceBus 中对应 Subscription Name。
- 在 NATS 中对应 Queue Group。
- 在 RedisStrems 中对应 Consuemr Group.

## [CapSubscribe] GroupConcurrent

> byte, 可选项
通过指定 `GroupConcurrent` 参数的值来设置订阅者并行执行的并行度。并行执行意味着其需要位于独立线程中,因此如果你没有指定 `Group` 参数,则 CAP 将会以 `Name` 的值自动创建一个 Group。

注意: 如果你有多个订阅者都设置为了相同的 Group,并且也给订阅者都设置了 `GroupConcurrent` 的值,则只会有(第)一个设置的值生效。

## 自定义配置项

`AddCap``CapOptions` 对象是用来存储配置相关信息,默认情况下它们都具有一些默认值,有些时候你可能需要自定义。

Expand Down Expand Up @@ -136,10 +182,12 @@ services.AddCap(config =>
失败消息的过期时间(秒)。 当消息发送或者消费失败时候,在时间达到 `FailedMessageExpiredAfter` 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。

#### UseDispatchingPerGroup
#### [已移除] UseDispatchingPerGroup

> 默认值: false
> 版本 8.2.0 中移除,已是默认行为。
默认情况下,CAP会将所有消费者组的消息都先放置到内存同一个Channel中,然后线性处理。
如果设置为 true,则每个消费者组都会根据 `ConsumerThreadCount` 设置的值创建单独的线程进行处理。

Expand Down
2 changes: 1 addition & 1 deletion samples/Sample.Kafka.PostgreSql/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Sample.Kafka.PostgreSql
{
public class Startup
{
public const string DbConnectionString = "User ID=postgres;Password=mysecretpassword;Host=localhost;Port=5432;Database=postgres;";
public const string DbConnectionString = "User ID=postgres;Password=mysecretpassword;Host=127.0.0.1;Port=5432;Database=postgres;";

public void ConfigureServices(IServiceCollection services)
{
Expand Down
3 changes: 0 additions & 3 deletions samples/Sample.RabbitMQ.MySql/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ public void ConfigureServices(IServiceCollection services)
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseDashboard();
//x.EnableConsumerPrefetch = true;
x.UseDispatchingPerGroup = true;
x.EnableSubscriberParallelExecute = true;
x.FailedThresholdCallback = failed =>
{
var logger = failed.ServiceProvider.GetService<ILogger<Startup>>();
Expand Down
64 changes: 35 additions & 29 deletions src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ namespace DotNetCore.CAP.AmazonSQS;

internal sealed class AmazonSQSConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new(1, 1);
private static readonly object ConnectionLock = new();
private readonly AmazonSQSOptions _amazonSQSOptions;

private readonly SemaphoreSlim _semaphore;
private readonly string _groupId;
private readonly byte _groupConcurrent;
private string _queueUrl = string.Empty;

private IAmazonSimpleNotificationService? _snsClient;
private IAmazonSQS? _sqsClient;

public AmazonSQSConsumerClient(string groupId, IOptions<AmazonSQSOptions> options)
public AmazonSQSConsumerClient(string groupId, byte groupConcurrent, IOptions<AmazonSQSOptions> options)
{
_groupId = groupId;
_groupConcurrent = groupConcurrent;
_amazonSQSOptions = options.Value;
_semaphore = new SemaphoreSlim(groupConcurrent);
}

public Func<TransportMessage, object?, Task>? OnMessageCallback { get; set; }
Expand Down Expand Up @@ -89,16 +92,29 @@ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)

if (response.Messages.Count == 1)
{
var messageObj = JsonSerializer.Deserialize<SQSReceivedMessage>(response.Messages[0].Body);
if (_groupConcurrent > 0)
{
_semaphore.Wait(cancellationToken);
Task.Run(() => Consume(), cancellationToken).ConfigureAwait(false);
}
else
{
Consume().GetAwaiter().GetResult();
}

var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value);
var body = messageObj.Message;
Task Consume()
{
var messageObj = JsonSerializer.Deserialize<SQSReceivedMessage>(response.Messages[0].Body);

var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value);
var body = messageObj.Message;

var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);
var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);

message.Headers.Add(Headers.Group, _groupId);
message.Headers.Add(Headers.Group, _groupId);

OnMessageCallback!(message, response.Messages[0].ReceiptHandle);
return OnMessageCallback!(message, response.Messages[0].ReceiptHandle);
}
}
else
{
Expand All @@ -113,6 +129,7 @@ public void Commit(object? sender)
try
{
_ = _sqsClient!.DeleteMessageAsync(_queueUrl, (string)sender!).GetAwaiter().GetResult();
_semaphore.Release();
}
catch (ReceiptHandleIsInvalidException ex)
{
Expand All @@ -126,6 +143,7 @@ public void Reject(object? sender)
{
// Visible again in 3 seconds
_ = _sqsClient!.ChangeMessageVisibilityAsync(_queueUrl, (string)sender!, 3).GetAwaiter().GetResult();
_semaphore.Release();
}
catch (MessageNotInflightException ex)
{
Expand All @@ -145,9 +163,7 @@ public void Connect(bool initSNS = true, bool initSQS = true)

if (_snsClient == null && initSNS)
{
ConnectionLock.Wait();

try
lock (ConnectionLock)
{
if (string.IsNullOrWhiteSpace(_amazonSQSOptions.SNSServiceUrl))
_snsClient = _amazonSQSOptions.Credentials != null
Expand All @@ -159,19 +175,13 @@ public void Connect(bool initSNS = true, bool initSQS = true)
? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials,
new AmazonSimpleNotificationServiceConfig { ServiceURL = _amazonSQSOptions.SNSServiceUrl })
: new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig
{ ServiceURL = _amazonSQSOptions.SNSServiceUrl });
}
finally
{
ConnectionLock.Release();
{ ServiceURL = _amazonSQSOptions.SNSServiceUrl });
}
}

if (_sqsClient == null && initSQS)
{
ConnectionLock.Wait();

try
lock (ConnectionLock)
{
if (string.IsNullOrWhiteSpace(_amazonSQSOptions.SQSServiceUrl))
_sqsClient = _amazonSQSOptions.Credentials != null
Expand All @@ -188,10 +198,6 @@ public void Connect(bool initSNS = true, bool initSQS = true)
// the existing queue.
_queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl;
}
finally
{
ConnectionLock.Release();
}
}
}

Expand Down Expand Up @@ -252,11 +258,11 @@ private async Task SubscribeToTopics(IEnumerable<string> topics)
foreach (var topicArn in topics)
{
await _snsClient!.SubscribeAsync(new SubscribeRequest
{
TopicArn = topicArn,
Protocol = "sqs",
Endpoint = sqsQueueArn
})
{
TopicArn = topicArn,
Protocol = "sqs",
Endpoint = sqsQueueArn
})
.ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ public AmazonSQSConsumerClientFactory(IOptions<AmazonSQSOptions> amazonSQSOption
_amazonSQSOptions = amazonSQSOptions;
}

public IConsumerClient Create(string groupId)
public IConsumerClient Create(string groupName, byte groupConcurrent)
{
try
{
var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions);
var client = new AmazonSQSConsumerClient(groupName, groupConcurrent, _amazonSQSOptions);
return client;
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient
{
private readonly AzureServiceBusOptions _asbOptions;
private readonly SemaphoreSlim _connectionLock = new(1, 1);

private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly string _subscriptionName;
private readonly byte _groupConcurrent;
private readonly SemaphoreSlim _semaphore;

private ServiceBusAdministrationClient? _administrationClient;
private ServiceBusClient? _serviceBusClient;
Expand All @@ -31,11 +32,14 @@ internal sealed class AzureServiceBusConsumerClient : IConsumerClient
public AzureServiceBusConsumerClient(
ILogger logger,
string subscriptionName,
byte groupConcurrent,
IOptions<AzureServiceBusOptions> options,
IServiceProvider serviceProvider)
{
_logger = logger;
_subscriptionName = subscriptionName;
_groupConcurrent = groupConcurrent;
_semaphore = new SemaphoreSlim(groupConcurrent);
_serviceProvider = serviceProvider;
_asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
Expand All @@ -46,7 +50,6 @@ public AzureServiceBusConsumerClient(

public BrokerAddress BrokerAddress => new("AzureServiceBus", _asbOptions.ConnectionString);


public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));
Expand Down Expand Up @@ -161,7 +164,15 @@ private async Task _serviceBusProcessor_ProcessMessageAsync(ProcessMessageEventA
{
var context = ConvertMessage(arg.Message);

await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg));
if (_groupConcurrent > 0)
{
await _semaphore.WaitAsync();
_ = Task.Run(() => OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg))).ConfigureAwait(false);
}
else
{
await OnMessageCallback!(context, new AzureServiceBusConsumerCommitInput(arg));
}
}

private async Task _serviceBusProcessor_ProcessSessionMessageAsync(ProcessSessionMessageEventArgs arg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public AzureServiceBusConsumerClientFactory(
_serviceProvider = serviceProvider;
}

public IConsumerClient Create(string groupId)
public IConsumerClient Create(string groupName, byte groupConcurrent)
{
try
{
var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
var client = new AzureServiceBusConsumerClient(logger, groupId, _asbOptions, _serviceProvider);
var client = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider);
client.ConnectAsync().GetAwaiter().GetResult();
return client;
}
Expand Down
Loading

0 comments on commit 19f6407

Please sign in to comment.