Skip to content

Commit

Permalink
Fixed MassTransit#4333 - Change query-based saga dispatch to sequenti…
Browse files Browse the repository at this point in the history
…al vs parallel
  • Loading branch information
NooNameR authored and phatboyg committed May 22, 2023
1 parent 050b710 commit 3bd03e2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ConsumeContext Deserialize(ReceiveContext receiveContext)
return Deserialize(receiveContext, false);
}

public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? destinationAddress = null)
public SerializerContext Deserialize(MessageBody body, Headers headers, Uri destinationAddress = null)
{
return Deserialize(body, headers, false, destinationAddress);
}
Expand All @@ -66,7 +66,7 @@ ConsumeContext Deserialize(ReceiveContext receiveContext, bool isRetry)
}
}

SerializerContext Deserialize(MessageBody body, Headers headers, bool isRetry, Uri? destinationAddress = null)
SerializerContext Deserialize(MessageBody body, Headers headers, bool isRetry, Uri destinationAddress = null)
{
try
{
Expand Down
8 changes: 3 additions & 5 deletions src/MassTransit/Middleware/SendQuerySagaPipe.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace MassTransit.Middleware
{
using System;
using System.Linq;
using System.Threading.Tasks;
using Logging;
using Saga;
Expand Down Expand Up @@ -29,7 +28,7 @@ public async Task Send(SagaRepositoryQueryContext<TSaga, T> context)
{
if (context.Count > 0)
{
async Task LoadInstance(Guid correlationId)
async Task SendToInstance(Guid correlationId)
{
SagaConsumeContext<TSaga, T> sagaConsumeContext = await context.Load(correlationId).ConfigureAwait(false);
if (sagaConsumeContext != null)
Expand All @@ -41,9 +40,7 @@ async Task LoadInstance(Guid correlationId)
await _policy.Existing(sagaConsumeContext, _next).ConfigureAwait(false);

if (_policy.IsReadOnly)
{
await context.Undo(sagaConsumeContext).ConfigureAwait(false);
}
else
{
if (sagaConsumeContext.IsCompleted)
Expand Down Expand Up @@ -71,7 +68,8 @@ async Task LoadInstance(Guid correlationId)
}
}

await Task.WhenAll(context.Select(LoadInstance)).ConfigureAwait(false);
foreach (var correlationId in context)
await SendToInstance(correlationId).ConfigureAwait(false);
}
else
{
Expand Down

0 comments on commit 3bd03e2

Please sign in to comment.