Skip to content

Commit

Permalink
Provide ability to return messages forwarded to Bridge error queue ba…
Browse files Browse the repository at this point in the history
…ck to originating Bridge queue - 2.0 branch (#275)

Co-authored-by: Ramon Smits <ramon.smits@gmail.com>
  • Loading branch information
dvdstelt and ramonsmits authored Jun 29, 2023
1 parent 843308a commit a57b5f2
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 8 deletions.
8 changes: 8 additions & 0 deletions src/AcceptanceTesting/AcceptanceTesting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

<PropertyGroup>
<TargetFrameworks>net472;net6.0;net7.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\NServiceBusTests.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
Expand All @@ -13,4 +15,10 @@
<ProjectReference Include="..\NServiceBus.MessagingBridge\NServiceBus.MessagingBridge.csproj" />
</ItemGroup>

<ItemGroup>
<None Include="..\NServiceBusTests.snk">
<Link>NServiceBusTests.snk</Link>
</None>
</ItemGroup>

</Project>
6 changes: 6 additions & 0 deletions src/AcceptanceTesting/BridgeComponent.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NServiceBus;
Expand Down Expand Up @@ -40,6 +42,10 @@ public override async Task Start(CancellationToken cancellationToken = default)
.ConfigureServices((_, serviceCollection) =>
{
serviceCollection.AddSingleton(loggerFactory);
serviceCollection.RemoveAll(typeof(IMessageShovel));
serviceCollection.AddTransient<MessageShovel>();
serviceCollection.AddTransient<FakeShovel>();
serviceCollection.AddTransient<IMessageShovel, FakeShovel>();
});

host = await hostBuilder.StartAsync(cancellationToken).ConfigureAwait(false);
Expand Down
33 changes: 33 additions & 0 deletions src/AcceptanceTesting/FakeShovel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace AcceptanceTesting
{
using System;
using System.Threading;
using System.Threading.Tasks;

public class FakeShovelHeader
{
public const string FailureHeader = "FakeShovelFailure";
}

class FakeShovel : IMessageShovel
{

readonly IMessageShovel messageShovel;

public FakeShovel(MessageShovel shovel)
{
messageShovel = shovel;
}

public Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default)
{
var messageContext = transferContext.MessageToTransfer;
if (messageContext.Headers.ContainsKey(FakeShovelHeader.FailureHeader))
{
throw new Exception("Incoming message has `FakeShovelFailure` header to test infrastructure failures");
}

return messageShovel.TransferMessage(transferContext, cancellationToken);
}
}
}
89 changes: 89 additions & 0 deletions src/AcceptanceTests/Shared/TransferFailureTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;

public class TransferFailureTests : BridgeAcceptanceTest
{
const string ReceiveDummyQueue = "TransferFailureTests_DummyQueue"; // Required because Bridge needs endpoints on both sides.
const string ErrorQueue = "TransferFailureTests_BridgeErrorQueue";
const string FailedQHeader = "NServiceBus.MessagingBridge.FailedQ";

[Test]
public async Task Should_add_failedq_header_when_transfer_fails()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<ErrorSpy>()
.WithEndpoint<Sender>(b => b
.When(c => c.EndpointsStarted, async (session, c) =>
{
var opts = new SendOptions();
opts.SetHeader(FakeShovelHeader.FailureHeader, string.Empty);
await session.Send(new FaultyMessage(), opts);
}))
.WithBridge(bridgeConfiguration =>
{
var bridgeTransport = new TestableBridgeTransport(TransportBeingTested);
bridgeTransport.AddTestEndpoint<Sender>();
bridgeConfiguration.AddTransport(bridgeTransport);
bridgeTransport.ErrorQueue = ErrorQueue;

var subscriberEndpoint = new BridgeEndpoint(ReceiveDummyQueue);
bridgeConfiguration.AddTestTransportEndpoint(subscriberEndpoint);
})
.Done(c => c.MessageFailed)
.Run();

Assert.IsTrue(ctx.MessageFailed, "Message did not fail");
Assert.IsTrue(ctx.FailedMessageHeaders.ContainsKey(FailedQHeader),
$"Failed message headers does not contain {FailedQHeader}");
}

public class Sender : EndpointConfigurationBuilder
{
public Sender() =>
EndpointSetup<DefaultServer>(c =>
{
c.ConfigureRouting().RouteToEndpoint(typeof(FaultyMessage), ReceiveDummyQueue);
});
}

public class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
EndpointSetup<DefaultServer>(c =>
{
c.OverrideLocalAddress(ErrorQueue);
});
}

class FailedMessageHander : IHandleMessages<FaultyMessage>
{
public FailedMessageHander(Context context) => testContext = context;

public Task Handle(FaultyMessage message, IMessageHandlerContext context)
{
testContext.FailedMessageHeaders =
new ReadOnlyDictionary<string, string>((IDictionary<string, string>)context.MessageHeaders);
testContext.MessageFailed = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Context : ScenarioContext
{
public bool MessageFailed { get; set; }
public IReadOnlyDictionary<string, string> FailedMessageHeaders { get; set; }
}

public class FaultyMessage : ICommand
{
}
}
5 changes: 3 additions & 2 deletions src/NServiceBus.MessagingBridge/BridgeHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
class BridgeHeaders
{
public static string Transfer = "NServiceBus.Bridge.Transfer";
}
public const string Transfer = "NServiceBus.Bridge.Transfer";
public const string FailedQ = "NServiceBus.MessagingBridge.FailedQ";
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static IHostBuilder UseNServiceBusBridge(
serviceCollection.AddSingleton<SubscriptionManager>();
serviceCollection.AddSingleton<EndpointRegistry>();
serviceCollection.AddSingleton<IEndpointRegistry>(sp => sp.GetRequiredService<EndpointRegistry>());
serviceCollection.AddTransient<MessageShovel>();
serviceCollection.AddTransient<IMessageShovel, MessageShovel>();
});

return hostBuilder;
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.MessagingBridge/EndpointProxyFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public Task<IStartableRawEndpoint> CreateProxy(
messageContext,
shouldPassTransportTransaction);

return serviceProvider.GetRequiredService<MessageShovel>()
.TransferMessage(transferContext, ct);
return serviceProvider.GetRequiredService<IMessageShovel>()
.TransferMessage(transferContext, cancellationToken: ct);
},
translatedErrorQueue);

Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.MessagingBridge/IMessageShovel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using System.Threading;
using System.Threading.Tasks;

interface IMessageShovel
{
Task TransferMessage(TransferContext transferContext, CancellationToken cancellationToken = default);
}
5 changes: 3 additions & 2 deletions src/NServiceBus.MessagingBridge/MessageShovel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using NServiceBus.Faults;
using NServiceBus.Transport;

class MessageShovel
sealed class MessageShovel : IMessageShovel
{
public MessageShovel(
ILogger<MessageShovel> logger,
Expand All @@ -26,6 +26,7 @@ public async Task TransferMessage(TransferContext transferContext, CancellationT
var messageContext = transferContext.MessageToTransfer;

var messageToSend = new OutgoingMessage(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body);
messageToSend.Headers.Remove(BridgeHeaders.FailedQ);

var transferDetails = $"{transferContext.SourceTransport}->{targetEndpointDispatcher.TransportName}";

Expand Down Expand Up @@ -98,4 +99,4 @@ void TransformAddressHeader(

readonly ILogger<MessageShovel> logger;
readonly IEndpointRegistry targetEndpointRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@

<ItemGroup>
<InternalsVisibleTo Include="UnitTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="AcceptanceTesting" Key="$(NServiceBusTestsKey)" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async Task<ErrorHandleResult> MoveToErrorQueue(ErrorContext errorContext, string
headers.Remove(Headers.DelayedRetries);
headers.Remove(Headers.ImmediateRetries);

headers.Add(BridgeHeaders.FailedQ, localAddress);
ExceptionHeaderHelper.SetExceptionHeaders(headers, errorContext.Exception);

var transportOperations = new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag(errorQueue)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"AcceptanceTesting, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"UnitTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
namespace NServiceBus
{
Expand Down

0 comments on commit a57b5f2

Please sign in to comment.