Skip to content

Commit

Permalink
Fix a bug that could cause multiple retries when both UseStorageLock …
Browse files Browse the repository at this point in the history
…and EnableParallelExecute are enabled. (#1560)

* fix: retry received event just execute directly

* publish retry event need to do the same
  • Loading branch information
sampsonye authored Jul 5, 2024
1 parent 7b6b033 commit 2afd306
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,21 @@ public async ValueTask EnqueueToPublish(MediumMessage message)
{
try
{
if (!_publishedChannel.Writer.TryWrite(message))
while (await _publishedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
if (_publishedChannel.Writer.TryWrite(message))
return;
if (_tasksCts!.IsCancellationRequested) return;

if (_enableParallelSend && message.Retries == 0)
{
if (!_publishedChannel.Writer.TryWrite(message))
while (await _publishedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
if (_publishedChannel.Writer.TryWrite(message))
return;
}
else
{
var result = await _sender.SendAsync(message).ConfigureAwait(false);
if (!result.Succeeded) _logger.MessagePublishException(message.Origin.GetId(), result.ToString(), result.Exception);

}
}
catch (OperationCanceledException)
{
Expand All @@ -170,7 +181,7 @@ public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorD
{
if (_tasksCts!.IsCancellationRequested) return;

if (_enableParallelExecute)
if (_enableParallelExecute && message.Retries == 0)
{
if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
{
Expand Down

0 comments on commit 2afd306

Please sign in to comment.