diff --git a/src/AcceptanceTesting/ScenarioContextLogger.cs b/src/AcceptanceTesting/ScenarioContextLogger.cs index bae49cd4..5fa3672d 100644 --- a/src/AcceptanceTesting/ScenarioContextLogger.cs +++ b/src/AcceptanceTesting/ScenarioContextLogger.cs @@ -19,7 +19,7 @@ public void Log( Exception exception, Func formatter) { - scenarioContext.AddTrace(categoryName + ": " + formatter(state, exception)); + scenarioContext.AddTrace($"{categoryName}: {formatter(state, exception)} - {exception}"); } readonly string categoryName; diff --git a/src/AcceptanceTests/Shared/Retry.cs b/src/AcceptanceTests/Shared/Retry.cs index 98d92c7b..fd8a6e0b 100644 --- a/src/AcceptanceTests/Shared/Retry.cs +++ b/src/AcceptanceTests/Shared/Retry.cs @@ -6,74 +6,65 @@ using NServiceBus.AcceptanceTesting; using NServiceBus.AcceptanceTesting.Customization; using NServiceBus.Faults; +using NServiceBus.Pipeline; using NUnit.Framework; using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; public class Retry : BridgeAcceptanceTest { [Test] - public async Task Should_forward_retry_messages() + public async Task Should_work() { var ctx = await Scenario.Define() - .WithEndpoint(b => b - .When(c => c.EndpointsStarted, (session, c) => - { - return session.Publish(new FaultyMessage()); - })) - .WithEndpoint(builder => builder.DoNotFailOnErrorMessages()) - .WithEndpoint() + .WithEndpoint(builder => + { + builder.DoNotFailOnErrorMessages(); + builder.When(c => c.EndpointsStarted, (session, _) => session.SendLocal(new FaultyMessage())); + }) + .WithEndpoint() + .WithEndpoint() .WithBridge(bridgeConfiguration => { var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition()) { Name = "DefaultTestingTransport" }; - bridgeTransport.AddTestEndpoint(); - bridgeTransport.AddTestEndpoint(); + bridgeTransport.AddTestEndpoint(); + bridgeTransport.AddTestEndpoint(); bridgeConfiguration.AddTransport(bridgeTransport); - var subscriberEndpoint = + var processingEndpoint = new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint))); - subscriberEndpoint.RegisterPublisher( - Conventions.EndpointNamingConvention(typeof(PublishingEndpoint))); var theOtherTransport = new TestableBridgeTransport(TransportBeingTested); - theOtherTransport.HasEndpoint(subscriberEndpoint); + theOtherTransport.HasEndpoint(processingEndpoint); bridgeConfiguration.AddTransport(theOtherTransport); }) - .Done(c => c.RetryDelivered) + .Done(c => c.GotRetrySuccessfullAck && c.MessageAudited) .Run(); - Assert.IsTrue(ctx.RetryDelivered); Assert.IsTrue(ctx.MessageFailed); + Assert.IsTrue(ctx.RetryDelivered); + Assert.IsTrue(ctx.GotRetrySuccessfullAck); + Assert.IsTrue(ctx.MessageAudited); foreach (var header in ctx.FailedMessageHeaders) { if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue)) { Assert.AreEqual(header.Value, receivedHeaderValue, - $"{header.Key} is not the same on processed message and audit message."); + $"{header.Key} is not the same on processed message and message sent to the error queue"); } } } - public class PublishingEndpoint : EndpointConfigurationBuilder - { - public PublishingEndpoint() => - EndpointSetup(c => - { - c.OnEndpointSubscribed((_, ctx) => - { - ctx.SubscriberSubscribed = true; - }); - c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint)); - }); - } - public class ProcessingEndpoint : EndpointConfigurationBuilder { - public ProcessingEndpoint() => EndpointSetup( - c => c.SendFailedMessagesTo("Retry.ErrorSpy")); + public ProcessingEndpoint() => EndpointSetup(c => + { + c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCError))); + c.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCAudit))); + }); public class MessageHandler : IHandleMessages { @@ -83,36 +74,73 @@ public class MessageHandler : IHandleMessages public Task Handle(FaultyMessage message, IMessageHandlerContext context) { - testContext.ReceivedMessageHeaders = - new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + if (testContext.MessageFailed) + { + testContext.RetryDelivered = true; + return Task.CompletedTask; + } - testContext.MessageFailed = true; + testContext.ReceivedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); throw new Exception("Simulated"); } } + } - public class RetryMessageHandler : IHandleMessages + public class FakeSCError : EndpointConfigurationBuilder + { + public FakeSCError() => EndpointSetup((c, runDescriptor) => + c.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the retry confirmation arrived")); + + class FailedMessageHander : IHandleMessages { + public FailedMessageHander(Context context) => testContext = context; + + public Task Handle(FaultyMessage message, IMessageHandlerContext context) + { + testContext.FailedMessageHeaders = + new ReadOnlyDictionary((IDictionary)context.MessageHeaders); + + testContext.MessageFailed = true; + + var sendOptions = new SendOptions(); + + //Send the message to the FailedQ address + string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ]; + sendOptions.SetDestination(destination); + + //ServiceControl adds these headers when retrying + sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "some-id"); + sendOptions.SetHeader("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(FakeSCError))); + return context.Send(new FaultyMessage(), sendOptions); + } + readonly Context testContext; + } - public RetryMessageHandler(Context context) => testContext = context; + class ControlMessageBehavior : Behavior + { + public ControlMessageBehavior(Context testContext) => this.testContext = testContext; - public Task Handle(RetryMessage message, IMessageHandlerContext context) + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) { - testContext.RetryDelivered = true; + if (context.MessageHeaders.ContainsKey("ServiceControl.Retry.Successful")) + { + testContext.GotRetrySuccessfullAck = true; + return; + } + await next(); - return Task.CompletedTask; } + + Context testContext; } } - public class ErrorSpy : EndpointConfigurationBuilder + public class FakeSCAudit : EndpointConfigurationBuilder { - public ErrorSpy() - { - var endpoint = EndpointSetup(c => c.AutoSubscribe().DisableFor()); - } + public FakeSCAudit() => EndpointSetup(); class FailedMessageHander : IHandleMessages { @@ -120,18 +148,9 @@ class FailedMessageHander : IHandleMessages public Task Handle(FaultyMessage message, IMessageHandlerContext context) { - testContext.FailedMessageHeaders = - new ReadOnlyDictionary((IDictionary)context.MessageHeaders); - - var sendOptions = new SendOptions(); - - //Send the message to the FailedQ address - string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ]; - sendOptions.SetDestination(destination); + testContext.MessageAudited = true; - //ServiceControl adds this header when retrying - sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "XYZ"); - return context.Send(new RetryMessage(), sendOptions); + return Task.CompletedTask; } readonly Context testContext; @@ -144,14 +163,13 @@ public class Context : ScenarioContext public bool MessageFailed { get; set; } public IReadOnlyDictionary ReceivedMessageHeaders { get; set; } public IReadOnlyDictionary FailedMessageHeaders { get; set; } + public IReadOnlyDictionary AuditMessageHeaders { get; set; } public bool RetryDelivered { get; set; } + public bool GotRetrySuccessfullAck { get; set; } + public bool MessageAudited { get; set; } } - public class FaultyMessage : IEvent - { - } - - public class RetryMessage : IMessage + public class FaultyMessage : IMessage { } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.Bridge/MessageShovel.cs b/src/NServiceBus.Transport.Bridge/MessageShovel.cs index 63db21a8..11364bf3 100644 --- a/src/NServiceBus.Transport.Bridge/MessageShovel.cs +++ b/src/NServiceBus.Transport.Bridge/MessageShovel.cs @@ -37,13 +37,17 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT //We _do not_ transform the ReplyToAddress header TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ); } - else if (IsRetryMessage(messageToSend)) + else if (IsAuditMessage(messageToSend)) { - //This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back + //This is a message sent to the audit queue. We _do not_ transform any headers. + //This check needs to be done _before_ the retry message check because we don't want to treat audited retry messages as retry messages. } - else if (IsAuditMessage(messageToSend)) + else if (IsRetryMessage(messageToSend)) { - //This is a message sent to the audit queue. We _do not_ transform its ReplyToAddress header + //This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back + + //Transform the retry ack queue address + TransformAddressHeader(messageToSend, targetEndpointRegistry, "ServiceControl.Retry.AcknowledgementQueue"); } else { diff --git a/src/NServiceBus.Transport.Bridge/MessageShovelErrorHandlingPolicy.cs b/src/NServiceBus.Transport.Bridge/MessageShovelErrorHandlingPolicy.cs index cb3666c0..da76a8b3 100644 --- a/src/NServiceBus.Transport.Bridge/MessageShovelErrorHandlingPolicy.cs +++ b/src/NServiceBus.Transport.Bridge/MessageShovelErrorHandlingPolicy.cs @@ -19,7 +19,7 @@ public Task OnError( { if (handlingContext.Error.ImmediateProcessingFailures < 3) { - logger.LogWarning("Message shovel operation failed and will be retried", handlingContext.Error.Exception); + logger.LogWarning(handlingContext.Error.Exception, "Message shovel operation failed and will be retried"); return Task.FromResult(ErrorHandleResult.RetryRequired); } diff --git a/src/UnitTests/MessageShovelTests.cs b/src/UnitTests/MessageShovelTests.cs index 1f8ece45..48159b25 100644 --- a/src/UnitTests/MessageShovelTests.cs +++ b/src/UnitTests/MessageShovelTests.cs @@ -28,6 +28,22 @@ public async Task Should_transform_failed_queue_header() Assert.AreEqual("error", transferDetails.OutgoingOperation.Message.Headers[FaultsHeaderKeys.FailedQ]); } + [Test] + public async Task Should_transform_retry_ack_queue_header() + { + var transferDetails = await Transfer(retryAckQueueAddress: "error@MyMachine"); + + Assert.AreEqual("error", transferDetails.OutgoingOperation.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"]); + } + + [Test] + public async Task Should_not_transform_retry_ack_header_for_audited_message() + { + var transferDetails = await Transfer(retryAckQueueAddress: "error@MyMachine", isAuditMessage: true); + + Assert.AreEqual("error@MyMachine", transferDetails.OutgoingOperation.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"]); + } + [Test] public async Task Should_handle_send_only_endpoints() { @@ -78,6 +94,8 @@ static async Task Transfer( string targetAddress = null, string replyToAddress = null, string failedQueueAddress = null, + string retryAckQueueAddress = null, + bool isAuditMessage = false, TransportTransaction transportTransaction = null, bool passTransportTransaction = false) { @@ -99,6 +117,17 @@ static async Task Transfer( headers.Add(FaultsHeaderKeys.FailedQ, failedQueueAddress); } + if (!string.IsNullOrEmpty(retryAckQueueAddress)) + { + headers.Add("ServiceControl.Retry.UniqueMessageId", "some-id"); + headers.Add("ServiceControl.Retry.AcknowledgementQueue", retryAckQueueAddress); + } + + if (isAuditMessage) + { + headers.Add(Headers.ProcessingEnded, DateTime.UtcNow.ToString()); + } + var targetEndpoint = new BridgeEndpoint("TargetEndpoint", targetAddress); var dispatcherRegistry = new FakeTargetEndpointRegistry("TargetTransport", targetEndpoint); var shovel = new MessageShovel(logger, dispatcherRegistry);