Skip to content

Commit

Permalink
Make sure that the ServiceControl.Retry.AcknowledgementQueue header i…
Browse files Browse the repository at this point in the history
…s translated (#222) (#227)

* Add check that we receive the retry ack

* Moar

* Add code to translate the retry ack header

* Typo

* Include audit in test as well

* Hardcode the audit address

* Fix logging

* Make the test logger print exception

* Don't map the ack queue when auditing

* Update src/NServiceBus.Transport.Bridge/MessageShovel.cs



---------

Co-authored-by: Szymon Pobiega <szymon.pobiega@gmail.com>
  • Loading branch information
andreasohlund and SzymonPobiega authored Mar 9, 2023
1 parent cca74ee commit 210c1ca
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/AcceptanceTesting/ScenarioContextLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void Log<TState>(
Exception exception,
Func<TState, Exception, string> formatter)
{
scenarioContext.AddTrace(categoryName + ": " + formatter(state, exception));
scenarioContext.AddTrace($"{categoryName}: {formatter(state, exception)} - {exception}");
}

readonly string categoryName;
Expand Down
140 changes: 79 additions & 61 deletions src/AcceptanceTests/Shared/Retry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,65 @@
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NServiceBus.Faults;
using NServiceBus.Pipeline;
using NUnit.Framework;
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;

public class Retry : BridgeAcceptanceTest
{
[Test]
public async Task Should_forward_retry_messages()
public async Task Should_work()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<PublishingEndpoint>(b => b
.When(c => c.EndpointsStarted, (session, c) =>
{
return session.Publish(new FaultyMessage());
}))
.WithEndpoint<ProcessingEndpoint>(builder => builder.DoNotFailOnErrorMessages())
.WithEndpoint<ErrorSpy>()
.WithEndpoint<ProcessingEndpoint>(builder =>
{
builder.DoNotFailOnErrorMessages();
builder.When(c => c.EndpointsStarted, (session, _) => session.SendLocal(new FaultyMessage()));
})
.WithEndpoint<FakeSCError>()
.WithEndpoint<FakeSCAudit>()
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
{
Name = "DefaultTestingTransport"
};
bridgeTransport.AddTestEndpoint<PublishingEndpoint>();
bridgeTransport.AddTestEndpoint<ErrorSpy>();
bridgeTransport.AddTestEndpoint<FakeSCError>();
bridgeTransport.AddTestEndpoint<FakeSCAudit>();
bridgeConfiguration.AddTransport(bridgeTransport);

var subscriberEndpoint =
var processingEndpoint =
new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)));
subscriberEndpoint.RegisterPublisher<FaultyMessage>(
Conventions.EndpointNamingConvention(typeof(PublishingEndpoint)));

var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
theOtherTransport.HasEndpoint(subscriberEndpoint);
theOtherTransport.HasEndpoint(processingEndpoint);
bridgeConfiguration.AddTransport(theOtherTransport);
})
.Done(c => c.RetryDelivered)
.Done(c => c.GotRetrySuccessfullAck && c.MessageAudited)
.Run();

Assert.IsTrue(ctx.RetryDelivered);
Assert.IsTrue(ctx.MessageFailed);
Assert.IsTrue(ctx.RetryDelivered);
Assert.IsTrue(ctx.GotRetrySuccessfullAck);
Assert.IsTrue(ctx.MessageAudited);

foreach (var header in ctx.FailedMessageHeaders)
{
if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue))
{
Assert.AreEqual(header.Value, receivedHeaderValue,
$"{header.Key} is not the same on processed message and audit message.");
$"{header.Key} is not the same on processed message and message sent to the error queue");
}
}
}

public class PublishingEndpoint : EndpointConfigurationBuilder
{
public PublishingEndpoint() =>
EndpointSetup<DefaultTestPublisher>(c =>
{
c.OnEndpointSubscribed<Context>((_, ctx) =>
{
ctx.SubscriberSubscribed = true;
});
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), typeof(ProcessingEndpoint));
});
}

public class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(
c => c.SendFailedMessagesTo("Retry.ErrorSpy"));
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCError)));
c.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCAudit)));
});

public class MessageHandler : IHandleMessages<FaultyMessage>
{
Expand All @@ -83,55 +74,83 @@ public class MessageHandler : IHandleMessages<FaultyMessage>

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.ReceivedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);
if (testContext.MessageFailed)
{
testContext.RetryDelivered = true;
return Task.CompletedTask;
}

testContext.MessageFailed = true;
testContext.ReceivedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

throw new Exception("Simulated");
}
}
}

public class RetryMessageHandler : IHandleMessages<RetryMessage>
public class FakeSCError : EndpointConfigurationBuilder
{
public FakeSCError() => EndpointSetup<DefaultTestServer>((c, runDescriptor) =>
c.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the retry confirmation arrived"));

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

testContext.MessageFailed = true;

var sendOptions = new SendOptions();

//Send the message to the FailedQ address
string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ];
sendOptions.SetDestination(destination);

//ServiceControl adds these headers when retrying
sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "some-id");
sendOptions.SetHeader("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(FakeSCError)));
return context.Send(new FaultyMessage(), sendOptions);
}

readonly Context testContext;
}

public RetryMessageHandler(Context context) => testContext = context;
class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;

public Task Handle(RetryMessage message, IMessageHandlerContext context)
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
testContext.RetryDelivered = true;
if (context.MessageHeaders.ContainsKey("ServiceControl.Retry.Successful"))
{
testContext.GotRetrySuccessfullAck = true;
return;
}
await next();

return Task.CompletedTask;
}

Context testContext;
}
}

public class ErrorSpy : EndpointConfigurationBuilder
public class FakeSCAudit : EndpointConfigurationBuilder
{
public ErrorSpy()
{
var endpoint = EndpointSetup<DefaultTestServer>(c => c.AutoSubscribe().DisableFor<FaultyMessage>());
}
public FakeSCAudit() => EndpointSetup<DefaultTestServer>();

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);

var sendOptions = new SendOptions();

//Send the message to the FailedQ address
string destination = context.MessageHeaders[FaultsHeaderKeys.FailedQ];
sendOptions.SetDestination(destination);
testContext.MessageAudited = true;

//ServiceControl adds this header when retrying
sendOptions.SetHeader("ServiceControl.Retry.UniqueMessageId", "XYZ");
return context.Send(new RetryMessage(), sendOptions);
return Task.CompletedTask;
}

readonly Context testContext;
Expand All @@ -144,14 +163,13 @@ public class Context : ScenarioContext
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> ReceivedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> AuditMessageHeaders { get; set; }
public bool RetryDelivered { get; set; }
public bool GotRetrySuccessfullAck { get; set; }
public bool MessageAudited { get; set; }
}

public class FaultyMessage : IEvent
{
}

public class RetryMessage : IMessage
public class FaultyMessage : IMessage
{
}
}
12 changes: 8 additions & 4 deletions src/NServiceBus.Transport.Bridge/MessageShovel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
//We _do not_ transform the ReplyToAddress header
TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ);
}
else if (IsRetryMessage(messageToSend))
else if (IsAuditMessage(messageToSend))
{
//This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back
//This is a message sent to the audit queue. We _do not_ transform any headers.
//This check needs to be done _before_ the retry message check because we don't want to treat audited retry messages as retry messages.
}
else if (IsAuditMessage(messageToSend))
else if (IsRetryMessage(messageToSend))
{
//This is a message sent to the audit queue. We _do not_ transform its ReplyToAddress header
//This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back

//Transform the retry ack queue address
TransformAddressHeader(messageToSend, targetEndpointRegistry, "ServiceControl.Retry.AcknowledgementQueue");
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public Task<ErrorHandleResult> OnError(
{
if (handlingContext.Error.ImmediateProcessingFailures < 3)
{
logger.LogWarning("Message shovel operation failed and will be retried", handlingContext.Error.Exception);
logger.LogWarning(handlingContext.Error.Exception, "Message shovel operation failed and will be retried");
return Task.FromResult(ErrorHandleResult.RetryRequired);
}

Expand Down
29 changes: 29 additions & 0 deletions src/UnitTests/MessageShovelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ public async Task Should_transform_failed_queue_header()
Assert.AreEqual("error", transferDetails.OutgoingOperation.Message.Headers[FaultsHeaderKeys.FailedQ]);
}

[Test]
public async Task Should_transform_retry_ack_queue_header()
{
var transferDetails = await Transfer(retryAckQueueAddress: "error@MyMachine");

Assert.AreEqual("error", transferDetails.OutgoingOperation.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"]);
}

[Test]
public async Task Should_not_transform_retry_ack_header_for_audited_message()
{
var transferDetails = await Transfer(retryAckQueueAddress: "error@MyMachine", isAuditMessage: true);

Assert.AreEqual("error@MyMachine", transferDetails.OutgoingOperation.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"]);
}

[Test]
public async Task Should_handle_send_only_endpoints()
{
Expand Down Expand Up @@ -78,6 +94,8 @@ static async Task<TransferDetails> Transfer(
string targetAddress = null,
string replyToAddress = null,
string failedQueueAddress = null,
string retryAckQueueAddress = null,
bool isAuditMessage = false,
TransportTransaction transportTransaction = null,
bool passTransportTransaction = false)
{
Expand All @@ -99,6 +117,17 @@ static async Task<TransferDetails> Transfer(
headers.Add(FaultsHeaderKeys.FailedQ, failedQueueAddress);
}

if (!string.IsNullOrEmpty(retryAckQueueAddress))
{
headers.Add("ServiceControl.Retry.UniqueMessageId", "some-id");
headers.Add("ServiceControl.Retry.AcknowledgementQueue", retryAckQueueAddress);
}

if (isAuditMessage)
{
headers.Add(Headers.ProcessingEnded, DateTime.UtcNow.ToString());
}

var targetEndpoint = new BridgeEndpoint("TargetEndpoint", targetAddress);
var dispatcherRegistry = new FakeTargetEndpointRegistry("TargetTransport", targetEndpoint);
var shovel = new MessageShovel(logger, dispatcherRegistry);
Expand Down

0 comments on commit 210c1ca

Please sign in to comment.