From a57b5f27c556e1f491deea31a182a1883c21bea7 Mon Sep 17 00:00:00 2001 From: Dennis van der Stelt Date: Thu, 29 Jun 2023 06:53:49 -0400 Subject: [PATCH] Provide ability to return messages forwarded to Bridge error queue back to originating Bridge queue - 2.0 branch (#275) Co-authored-by: Ramon Smits --- .../AcceptanceTesting.csproj | 8 ++ src/AcceptanceTesting/BridgeComponent.cs | 6 ++ src/AcceptanceTesting/FakeShovel.cs | 33 +++++++ .../Shared/TransferFailureTests.cs | 89 +++++++++++++++++++ .../BridgeHeaders.cs | 5 +- .../Configuration/HostBuilderExtensions.cs | 2 +- .../EndpointProxyFactory.cs | 4 +- .../IMessageShovel.cs | 7 ++ .../MessageShovel.cs | 5 +- .../NServiceBus.MessagingBridge.csproj | 3 +- .../RawEndpointErrorHandlingPolicy.cs | 1 + .../APIApprovals.PublicApi.approved.txt | 1 + 12 files changed, 156 insertions(+), 8 deletions(-) create mode 100644 src/AcceptanceTesting/FakeShovel.cs create mode 100644 src/AcceptanceTests/Shared/TransferFailureTests.cs create mode 100644 src/NServiceBus.MessagingBridge/IMessageShovel.cs diff --git a/src/AcceptanceTesting/AcceptanceTesting.csproj b/src/AcceptanceTesting/AcceptanceTesting.csproj index 6d9069f0..abcbddc3 100644 --- a/src/AcceptanceTesting/AcceptanceTesting.csproj +++ b/src/AcceptanceTesting/AcceptanceTesting.csproj @@ -2,6 +2,8 @@ net472;net6.0;net7.0 + true + ..\NServiceBusTests.snk @@ -13,4 +15,10 @@ + + + NServiceBusTests.snk + + + diff --git a/src/AcceptanceTesting/BridgeComponent.cs b/src/AcceptanceTesting/BridgeComponent.cs index 4ee90551..fa634bb9 100644 --- a/src/AcceptanceTesting/BridgeComponent.cs +++ b/src/AcceptanceTesting/BridgeComponent.cs @@ -1,7 +1,9 @@ using System; using System.Threading; using System.Threading.Tasks; +using AcceptanceTesting; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using NServiceBus; @@ -40,6 +42,10 @@ public override async Task Start(CancellationToken cancellationToken = default) .ConfigureServices((_, serviceCollection) => { serviceCollection.AddSingleton(loggerFactory); + serviceCollection.RemoveAll(typeof(IMessageShovel)); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); }); host = await hostBuilder.StartAsync(cancellationToken).ConfigureAwait(false); diff --git a/src/AcceptanceTesting/FakeShovel.cs b/src/AcceptanceTesting/FakeShovel.cs new file mode 100644 index 00000000..423487fe --- /dev/null +++ b/src/AcceptanceTesting/FakeShovel.cs @@ -0,0 +1,33 @@ +namespace AcceptanceTesting +{ + using System; + using System.Threading; + using System.Threading.Tasks; + + public class FakeShovelHeader + { + public const string FailureHeader = "FakeShovelFailure"; + } + + class FakeShovel : IMessageShovel + { + + readonly IMessageShovel messageShovel; + + public FakeShovel(MessageShovel shovel) + { + messageShovel = shovel; + } + + public Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default) + { + var messageContext = transferContext.MessageToTransfer; + if (messageContext.Headers.ContainsKey(FakeShovelHeader.FailureHeader)) + { + throw new Exception("Incoming message has `FakeShovelFailure` header to test infrastructure failures"); + } + + return messageShovel.TransferMessage(transferContext, cancellationToken); + } + } +} \ No newline at end of file diff --git a/src/AcceptanceTests/Shared/TransferFailureTests.cs b/src/AcceptanceTests/Shared/TransferFailureTests.cs new file mode 100644 index 00000000..df883232 --- /dev/null +++ b/src/AcceptanceTests/Shared/TransferFailureTests.cs @@ -0,0 +1,89 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; +using AcceptanceTesting; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NUnit.Framework; + +public class TransferFailureTests : BridgeAcceptanceTest +{ + const string ReceiveDummyQueue = "TransferFailureTests_DummyQueue"; // Required because Bridge needs endpoints on both sides. + const string ErrorQueue = "TransferFailureTests_BridgeErrorQueue"; + const string FailedQHeader = "NServiceBus.MessagingBridge.FailedQ"; + + [Test] + public async Task Should_add_failedq_header_when_transfer_fails() + { + var ctx = await Scenario.Define() + .WithEndpoint() + .WithEndpoint(b => b + .When(c => c.EndpointsStarted, async (session, c) => + { + var opts = new SendOptions(); + opts.SetHeader(FakeShovelHeader.FailureHeader, string.Empty); + await session.Send(new FaultyMessage(), opts); + })) + .WithBridge(bridgeConfiguration => + { + var bridgeTransport = new TestableBridgeTransport(TransportBeingTested); + bridgeTransport.AddTestEndpoint(); + bridgeConfiguration.AddTransport(bridgeTransport); + bridgeTransport.ErrorQueue = ErrorQueue; + + var subscriberEndpoint = new BridgeEndpoint(ReceiveDummyQueue); + bridgeConfiguration.AddTestTransportEndpoint(subscriberEndpoint); + }) + .Done(c => c.MessageFailed) + .Run(); + + Assert.IsTrue(ctx.MessageFailed, "Message did not fail"); + Assert.IsTrue(ctx.FailedMessageHeaders.ContainsKey(FailedQHeader), + $"Failed message headers does not contain {FailedQHeader}"); + } + + public class Sender : EndpointConfigurationBuilder + { + public Sender() => + EndpointSetup(c => + { + c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), ReceiveDummyQueue); + }); + } + + public class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() + { + EndpointSetup(c => + { + c.OverrideLocalAddress(ErrorQueue); + }); + } + + class FailedMessageHander : IHandleMessages + { + public FailedMessageHander(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.FailedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + testContext.MessageFailed = true; + return Task.CompletedTask; + } + + readonly Context testContext; + } + } + + public class Context : ScenarioContext + { + public bool MessageFailed { get; set; } + public IReadOnlyDictionary FailedMessageHeaders { get; set; } + } + + public class FaultyMessage : ICommand + { + } +} \ No newline at end of file diff --git a/src/NServiceBus.MessagingBridge/BridgeHeaders.cs b/src/NServiceBus.MessagingBridge/BridgeHeaders.cs index 35273c37..7d6a992b 100644 --- a/src/NServiceBus.MessagingBridge/BridgeHeaders.cs +++ b/src/NServiceBus.MessagingBridge/BridgeHeaders.cs @@ -1,4 +1,5 @@ class BridgeHeaders { - public static string Transfer = "NServiceBus.Bridge.Transfer"; -} + public const string Transfer = "NServiceBus.Bridge.Transfer"; + public const string FailedQ = "NServiceBus.MessagingBridge.FailedQ"; +} \ No newline at end of file diff --git a/src/NServiceBus.MessagingBridge/Configuration/HostBuilderExtensions.cs b/src/NServiceBus.MessagingBridge/Configuration/HostBuilderExtensions.cs index 46bcd0db..a69ad499 100644 --- a/src/NServiceBus.MessagingBridge/Configuration/HostBuilderExtensions.cs +++ b/src/NServiceBus.MessagingBridge/Configuration/HostBuilderExtensions.cs @@ -55,7 +55,7 @@ public static IHostBuilder UseNServiceBusBridge( serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(sp => sp.GetRequiredService()); - serviceCollection.AddTransient(); + serviceCollection.AddTransient(); }); return hostBuilder; diff --git a/src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs b/src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs index aaca02da..0daf1a04 100644 --- a/src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs +++ b/src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs @@ -47,8 +47,8 @@ public Task CreateProxy( messageContext, shouldPassTransportTransaction); - return serviceProvider.GetRequiredService() - .TransferMessage(transferContext, ct); + return serviceProvider.GetRequiredService() + .TransferMessage(transferContext, cancellationToken: ct); }, translatedErrorQueue); diff --git a/src/NServiceBus.MessagingBridge/IMessageShovel.cs b/src/NServiceBus.MessagingBridge/IMessageShovel.cs new file mode 100644 index 00000000..5d86dcd4 --- /dev/null +++ b/src/NServiceBus.MessagingBridge/IMessageShovel.cs @@ -0,0 +1,7 @@ +using System.Threading; +using System.Threading.Tasks; + +interface IMessageShovel +{ + Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/NServiceBus.MessagingBridge/MessageShovel.cs b/src/NServiceBus.MessagingBridge/MessageShovel.cs index 11364bf3..70e5704e 100644 --- a/src/NServiceBus.MessagingBridge/MessageShovel.cs +++ b/src/NServiceBus.MessagingBridge/MessageShovel.cs @@ -6,7 +6,7 @@ using NServiceBus.Faults; using NServiceBus.Transport; -class MessageShovel +sealed class MessageShovel : IMessageShovel { public MessageShovel( ILogger logger, @@ -26,6 +26,7 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT var messageContext = transferContext.MessageToTransfer; var messageToSend = new OutgoingMessage(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body); + messageToSend.Headers.Remove(BridgeHeaders.FailedQ); var transferDetails = $"{transferContext.SourceTransport}->{targetEndpointDispatcher.TransportName}"; @@ -98,4 +99,4 @@ void TransformAddressHeader( readonly ILogger logger; readonly IEndpointRegistry targetEndpointRegistry; -} +} \ No newline at end of file diff --git a/src/NServiceBus.MessagingBridge/NServiceBus.MessagingBridge.csproj b/src/NServiceBus.MessagingBridge/NServiceBus.MessagingBridge.csproj index c10bde73..5b9873d6 100644 --- a/src/NServiceBus.MessagingBridge/NServiceBus.MessagingBridge.csproj +++ b/src/NServiceBus.MessagingBridge/NServiceBus.MessagingBridge.csproj @@ -18,5 +18,6 @@ + - + \ No newline at end of file diff --git a/src/NServiceBus.MessagingBridge/RawEndpoints/RawEndpointErrorHandlingPolicy.cs b/src/NServiceBus.MessagingBridge/RawEndpoints/RawEndpointErrorHandlingPolicy.cs index 2cc940f5..dde40978 100644 --- a/src/NServiceBus.MessagingBridge/RawEndpoints/RawEndpointErrorHandlingPolicy.cs +++ b/src/NServiceBus.MessagingBridge/RawEndpoints/RawEndpointErrorHandlingPolicy.cs @@ -35,6 +35,7 @@ async Task MoveToErrorQueue(ErrorContext errorContext, string headers.Remove(Headers.DelayedRetries); headers.Remove(Headers.ImmediateRetries); + headers.Add(BridgeHeaders.FailedQ, localAddress); ExceptionHeaderHelper.SetExceptionHeaders(headers, errorContext.Exception); var transportOperations = new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag(errorQueue))); diff --git a/src/UnitTests/ApprovalFiles/APIApprovals.PublicApi.approved.txt b/src/UnitTests/ApprovalFiles/APIApprovals.PublicApi.approved.txt index 1e4e957c..1eb09e9f 100644 --- a/src/UnitTests/ApprovalFiles/APIApprovals.PublicApi.approved.txt +++ b/src/UnitTests/ApprovalFiles/APIApprovals.PublicApi.approved.txt @@ -1,3 +1,4 @@ +[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"AcceptanceTesting, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] [assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"UnitTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] namespace NServiceBus {