Skip to content

Commit

Permalink
Merge pull request #488 from Particular/revert-besteffort-replyto-3.0
Browse files Browse the repository at this point in the history
Revert #484
  • Loading branch information
tmasternak authored Aug 6, 2024
2 parents 969ba9f + 0a835ac commit bd5f72d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 108 deletions.
103 changes: 34 additions & 69 deletions src/AcceptanceTests/Shared/Retry.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Customization;
using NServiceBus.Faults;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NUnit.Framework;
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;
Expand All @@ -23,104 +22,48 @@ public async Task Should_work()
builder.When(c => c.EndpointsStarted, (session, _) => session.SendLocal(new FaultyMessage()));
})
.WithEndpoint<FakeSCError>()
.WithEndpoint<FakeSCAudit>()
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
{
Name = "DefaultTestingTransport"
};
bridgeTransport.AddTestEndpoint<FakeSCError>();
bridgeTransport.AddTestEndpoint<FakeSCAudit>();
bridgeConfiguration.AddTransport(bridgeTransport);

var processingEndpoint =
new BridgeEndpoint(Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)));

var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
theOtherTransport.AddTestEndpoint<ProcessingEndpoint>();
theOtherTransport.HasEndpoint(processingEndpoint);
bridgeConfiguration.AddTransport(theOtherTransport);
})
.Done(c => c.GotRetrySuccessfullAck)
.Done(c => c.GotRetrySuccessfullAck && c.MessageAudited)
.Run();

Assert.IsTrue(ctx.MessageFailed);
Assert.IsTrue(ctx.RetryDelivered);
Assert.IsTrue(ctx.GotRetrySuccessfullAck);
Assert.IsTrue(ctx.MessageAudited);

foreach (var header in ctx.FailedMessageHeaders)
{
if (ctx.ReceivedMessageHeaders.TryGetValue(header.Key, out var receivedHeaderValue))
{
if (header.Key == Headers.ReplyToAddress)
{
Assert.IsTrue(receivedHeaderValue.Contains(nameof(ProcessingEndpoint), StringComparison.InvariantCultureIgnoreCase),
$"The ReplyToAddress received by ServiceControl ({TransportBeingTested} physical address) should contain the logical name of the endpoint.");
}
else
{
Assert.AreEqual(header.Value, receivedHeaderValue,
$"{header.Key} is not the same on processed message and message sent to the error queue");
}
Assert.AreEqual(header.Value, receivedHeaderValue,
$"{header.Key} is not the same on processed message and message sent to the error queue");
}
}
}

[Test]
public async Task Should_log_warn_when_best_effort_ReplyToAddress_fails()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<SendingEndpoint>(builder =>
{
builder.DoNotFailOnErrorMessages();
builder.When(c => c.EndpointsStarted, (session, _) => session.Send(new FaultyMessage()));
})
.WithEndpoint<ProcessingEndpoint>(builder =>
{
builder.DoNotFailOnErrorMessages();
builder.When(c => c.EndpointsStarted, (session, _) =>
{
var options = new SendOptions();
options.RouteToThisEndpoint();
options.SetHeader(Headers.ReplyToAddress, "address-not-declared-in-the-bridge");
return session.Send(new FaultyMessage(), options);
});
})
.WithEndpoint<FakeSCError>()
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(DefaultTestServer.GetTestTransportDefinition())
{
Name = "DefaultTestingTransport"
};
bridgeTransport.AddTestEndpoint<FakeSCError>();
bridgeConfiguration.AddTransport(bridgeTransport);

var theOtherTransport = new TestableBridgeTransport(TransportBeingTested);
theOtherTransport.AddTestEndpoint<ProcessingEndpoint>();
bridgeConfiguration.AddTransport(theOtherTransport);
})
.Done(c => c.GotRetrySuccessfullAck)
.Run();

var translationFailureLogs = ctx.Logs.ToArray().Where(i =>
i.Message.Contains("Could not translate") &&
i.Message.Contains("address. Consider using `.HasEndpoint()`"));

//There is only one warning here because the ServiceControl testing fake does not properly set the ReplyToAddress header value
Assert.AreEqual(1, translationFailureLogs.Count(),
"Bridge should log warnings when ReplyToAddress cannot be translated for failed message and retry.");
}

public class SendingEndpoint : EndpointConfigurationBuilder
{
public SendingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCError)));
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), Conventions.EndpointNamingConvention(typeof(ProcessingEndpoint)));
});
}

public class ProcessingEndpoint : EndpointConfigurationBuilder
{
public ProcessingEndpoint() => EndpointSetup<DefaultServer>(c =>
{
c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCError)));
c.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(FakeSCAudit)));
});

public class MessageHandler : IHandleMessages<FaultyMessage>
Expand Down Expand Up @@ -195,13 +138,35 @@ public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<
}
}

public class FakeSCAudit : EndpointConfigurationBuilder
{
public FakeSCAudit() => EndpointSetup<DefaultTestServer>();

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.MessageAudited = true;

return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Context : ScenarioContext
{
public bool SubscriberSubscribed { get; set; }
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> ReceivedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
public IReadOnlyDictionary<string, string> AuditMessageHeaders { get; set; }
public bool RetryDelivered { get; set; }
public bool GotRetrySuccessfullAck { get; set; }
public bool MessageAudited { get; set; }
}

public class FaultyMessage : IMessage
Expand Down
13 changes: 7 additions & 6 deletions src/NServiceBus.MessagingBridge/EndpointRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus;
Expand Down Expand Up @@ -60,15 +60,16 @@ public TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpoin
throw new Exception($"No target endpoint dispatcher could be found for endpoint: {sourceEndpointName}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {nearestMatch}");
}

public bool TryTranslateToTargetAddress(string sourceAddress, out string bestMatch)
public string TranslateToTargetAddress(string sourceAddress)
{
if (targetEndpointAddressMappings.TryGetValue(sourceAddress, out bestMatch))
if (targetEndpointAddressMappings.TryGetValue(sourceAddress, out var targetAddress))
{
return true;
return targetAddress;
}

bestMatch = GetClosestMatchForExceptionMessage(sourceAddress, targetEndpointAddressMappings.Keys);
return false;
var nearestMatch = GetClosestMatchForExceptionMessage(sourceAddress, targetEndpointAddressMappings.Keys);

throw new Exception($"No target address mapping could be found for source address: {sourceAddress}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {nearestMatch}");
}

public string GetEndpointAddress(string endpointName)
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.MessagingBridge/IEndpointRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpointName);

bool TryTranslateToTargetAddress(string sourceAddress, out string bestMatch);
string TranslateToTargetAddress(string sourceAddress);

string GetEndpointAddress(string endpointName);
}
38 changes: 11 additions & 27 deletions src/NServiceBus.MessagingBridge/MessageShovel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,8 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
//This is a failed message forwarded to ServiceControl. We need to transform the FailedQ header so that ServiceControl returns the message
//to the correct queue/transport on the other side

//We _do not_ transform the ReplyToAddress header
TransformAddressHeader(messageToSend, targetEndpointRegistry, FaultsHeaderKeys.FailedQ);

//Try to translate the ReplyToAddress, this is needed when e.g.:
// 1. An endpoint is migrated to the ServiceControl side before this messages is retried
// 2. An endpoint has physical instances on both sides (migration phase) and when retried
// this message can be processed either on one or the other side of the bridge
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress, throwOnError: false);
}
else if (IsAuditMessage(messageToSend))
{
Expand All @@ -61,11 +56,10 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
}
else if (IsRetryMessage(messageToSend))
{
//This is a message retried from ServiceControl. Its ReplyToAddress header has been preserved (as stated above) so we don't need to transform it back

//Transform the retry ack queue address
TransformAddressHeader(messageToSend, targetEndpointRegistry, "ServiceControl.Retry.AcknowledgementQueue");

//This is a message retried from ServiceControl. We try to translate its ReplyToAddress.
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress, throwOnError: false);
}
else
{
Expand Down Expand Up @@ -102,7 +96,7 @@ await targetEndpointDispatcher.Dispatch(

static bool IsRetryMessage(OutgoingMessage messageToSend) => messageToSend.Headers.ContainsKey("ServiceControl.Retry.UniqueMessageId");

void TransformRegularMessageReplyToAddress(
static void TransformRegularMessageReplyToAddress(
TransferContext transferContext,
OutgoingMessage messageToSend,
IEndpointRegistry targetEndpointRegistry)
Expand All @@ -120,33 +114,23 @@ void TransformRegularMessageReplyToAddress(
}
else
{
TransformAddressHeader(messageToSend, targetEndpointRegistry, Headers.ReplyToAddress);
messageToSend.Headers[Headers.ReplyToAddress] = targetEndpointRegistry.TranslateToTargetAddress(headerValue);
}
}

void TransformAddressHeader(
static void TransformAddressHeader(
OutgoingMessage messageToSend,
IEndpointRegistry endpointRegistry,
string addressHeaderKey,
bool throwOnError = true)
string headerKey)
{
if (!messageToSend.Headers.TryGetValue(addressHeaderKey, out var sourceAddress))
if (!messageToSend.Headers.TryGetValue(headerKey, out var headerValue))
{
return;
}

if (endpointRegistry.TryTranslateToTargetAddress(sourceAddress, out string bestMatch))
{
messageToSend.Headers[addressHeaderKey] = bestMatch;
}
else if (throwOnError == false)
{
logger.LogWarning($"Could not translate {sourceAddress} address. Consider using `.HasEndpoint()` method to add missing endpoint declaration.");
}
else
{
throw new Exception($"No target address mapping could be found for source address: {sourceAddress}. Ensure names have correct casing as mappings are case-sensitive. Nearest configured match: {bestMatch}");
}
var targetSpecificReplyToAddress = endpointRegistry.TranslateToTargetAddress(headerValue);

messageToSend.Headers[headerKey] = targetSpecificReplyToAddress;
}

readonly ILogger<MessageShovel> logger;
Expand Down
7 changes: 2 additions & 5 deletions src/UnitTests/MessageShovelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,9 @@ public TargetEndpointDispatcher GetTargetEndpointDispatcher(string sourceEndpoin
return new TargetEndpointDispatcher(targetTransport, rawEndpoint, targetEndpoint.QueueAddress.ToString());
}

public bool TryTranslateToTargetAddress(string sourceAddress, out string bestMatch)
public string TranslateToTargetAddress(string sourceAddress)
{
var result = sourceAddress.Split('@').First();

bestMatch = result;
return true;
return sourceAddress.Split('@').First();
}

public string GetEndpointAddress(string endpointName) => throw new NotImplementedException();
Expand Down

0 comments on commit bd5f72d

Please sign in to comment.