From 3bd03e278f150640dbb6916c925b072f45d32643 Mon Sep 17 00:00:00 2001 From: "denys.kozhevnikov" Date: Tue, 25 Apr 2023 14:27:40 +0100 Subject: [PATCH] Fixed #4333 - Change query-based saga dispatch to sequential vs parallel --- .../EncryptedFallbackMessageDeserializerV2.cs | 4 ++-- src/MassTransit/Middleware/SendQuerySagaPipe.cs | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/MassTransit.Newtonsoft/Serialization/EncryptedFallbackMessageDeserializerV2.cs b/src/MassTransit.Newtonsoft/Serialization/EncryptedFallbackMessageDeserializerV2.cs index 9a7c766d8be..164a7330140 100644 --- a/src/MassTransit.Newtonsoft/Serialization/EncryptedFallbackMessageDeserializerV2.cs +++ b/src/MassTransit.Newtonsoft/Serialization/EncryptedFallbackMessageDeserializerV2.cs @@ -40,7 +40,7 @@ public ConsumeContext Deserialize(ReceiveContext receiveContext) return Deserialize(receiveContext, false); } - public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? destinationAddress = null) + public SerializerContext Deserialize(MessageBody body, Headers headers, Uri destinationAddress = null) { return Deserialize(body, headers, false, destinationAddress); } @@ -66,7 +66,7 @@ ConsumeContext Deserialize(ReceiveContext receiveContext, bool isRetry) } } - SerializerContext Deserialize(MessageBody body, Headers headers, bool isRetry, Uri? destinationAddress = null) + SerializerContext Deserialize(MessageBody body, Headers headers, bool isRetry, Uri destinationAddress = null) { try { diff --git a/src/MassTransit/Middleware/SendQuerySagaPipe.cs b/src/MassTransit/Middleware/SendQuerySagaPipe.cs index 6f12fe12897..58bc4c2ca4c 100644 --- a/src/MassTransit/Middleware/SendQuerySagaPipe.cs +++ b/src/MassTransit/Middleware/SendQuerySagaPipe.cs @@ -1,7 +1,6 @@ namespace MassTransit.Middleware { using System; - using System.Linq; using System.Threading.Tasks; using Logging; using Saga; @@ -29,7 +28,7 @@ public async Task Send(SagaRepositoryQueryContext context) { if (context.Count > 0) { - async Task LoadInstance(Guid correlationId) + async Task SendToInstance(Guid correlationId) { SagaConsumeContext sagaConsumeContext = await context.Load(correlationId).ConfigureAwait(false); if (sagaConsumeContext != null) @@ -41,9 +40,7 @@ async Task LoadInstance(Guid correlationId) await _policy.Existing(sagaConsumeContext, _next).ConfigureAwait(false); if (_policy.IsReadOnly) - { await context.Undo(sagaConsumeContext).ConfigureAwait(false); - } else { if (sagaConsumeContext.IsCompleted) @@ -71,7 +68,8 @@ async Task LoadInstance(Guid correlationId) } } - await Task.WhenAll(context.Select(LoadInstance)).ConfigureAwait(false); + foreach (var correlationId in context) + await SendToInstance(correlationId).ConfigureAwait(false); } else {