diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqExchangeConfigurator.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqExchangeConfigurator.cs index 3126d6849e1..16efdafab64 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqExchangeConfigurator.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqExchangeConfigurator.cs @@ -53,7 +53,9 @@ public void SetExchangeArgument(string key, TimeSpan value) public virtual RabbitMqEndpointAddress GetEndpointAddress(Uri hostAddress) { - return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete); + return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete, + delayedType: ExchangeArguments.TryGetValue("x-delayed-type", out var argument) ? (string)argument : default, + alternateExchange: ExchangeArguments.TryGetValue(RabbitMQ.Client.Headers.AlternateExchange, out argument) ? (string)argument : default); } } } diff --git a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendSettings.cs b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendSettings.cs index cf60b7da707..5fcc965f56e 100644 --- a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendSettings.cs +++ b/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Topology/RabbitMqSendSettings.cs @@ -35,7 +35,7 @@ public RabbitMqSendSettings(RabbitMqEndpointAddress address) } if (!string.IsNullOrWhiteSpace(address.AlternateExchange)) - SetExchangeArgument("alternate-exchange", address.AlternateExchange); + SetExchangeArgument(Headers.AlternateExchange, address.AlternateExchange); if (address.SingleActiveConsumer) SetQueueArgument(Headers.XSingleActiveConsumer, true); @@ -46,8 +46,9 @@ public RabbitMqSendSettings(RabbitMqEndpointAddress address) public RabbitMqEndpointAddress GetSendAddress(Uri hostAddress) { return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete, _bindToQueue, _queueName, - ExchangeArguments.ContainsKey("x-delayed-type") ? (string)ExchangeArguments["x-delayed-type"] : default, - _exchangeBindings.Count > 0 ? _exchangeBindings.Select(x => x.ExchangeName).ToArray() : default); + ExchangeArguments.TryGetValue("x-delayed-type", out var argument) ? (string)argument : default, + _exchangeBindings.Count > 0 ? _exchangeBindings.Select(x => x.ExchangeName).ToArray() : default, + alternateExchange: ExchangeArguments.TryGetValue(Headers.AlternateExchange, out argument) ? (string)argument : default); } public BrokerTopology GetBrokerTopology() diff --git a/tests/MassTransit.RabbitMqTransport.Tests/AlternateExchange_Specs.cs b/tests/MassTransit.RabbitMqTransport.Tests/AlternateExchange_Specs.cs index dcdb7b7aeec..9b6d545038f 100644 --- a/tests/MassTransit.RabbitMqTransport.Tests/AlternateExchange_Specs.cs +++ b/tests/MassTransit.RabbitMqTransport.Tests/AlternateExchange_Specs.cs @@ -1,8 +1,102 @@ namespace MassTransit.RabbitMqTransport.Tests { + using System; using System.Threading.Tasks; + using MassTransit.Testing; + using Microsoft.Extensions.DependencyInjection; using NUnit.Framework; using RabbitMQ.Client; + using TestFramework.Messages; + + + public class Using_alternate_exchange_with_the_outbox + { + [Test] + public async Task Should_deal_with_the_alternate_exchange() + { + const string alternateExchangeName = "unused-message"; + const string alternateQueueName = "unused-message-queue"; + + await using var provider = new ServiceCollection() + .ConfigureRabbitMqTestOptions(options => + { + options.CleanVirtualHost = true; + options.CreateVirtualHostIfNotExists = true; + }) + .AddMassTransitTestHarness(x => + { + x.AddOptions() + .Configure(options => options.VHost = "test"); + + x.AddInMemoryInboxOutbox(); + + x.AddHandler((UnusedMessage _) => Task.CompletedTask) + .Endpoint(e => + { + e.ConfigureConsumeTopology = false; + e.Name = alternateQueueName; + }); + + x.AddConsumer() + .Endpoint(e => e.Name = "outbox-normal"); + + x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(10)); + + x.AddConfigureEndpointsCallback((provider, name, cfg) => + { + if (cfg is IRabbitMqReceiveEndpointConfigurator rmq) + { + if(name == alternateQueueName) + { + rmq.Bind(alternateExchangeName); + } + } + + cfg.UseInMemoryInboxOutbox(provider); + }); + + x.UsingRabbitMq((context, cfg) => + { + cfg.PublishTopology.GetMessageTopology() + .BindAlternateExchangeQueue(alternateExchangeName); + + cfg.DeployPublishTopology = true; + + cfg.ConfigureEndpoints(context); + }); + }).BuildServiceProvider(); + + var harness = provider.GetTestHarness(); + + await harness.Start(); + + await harness.Bus.Publish(new PingMessage(), Pipe.Execute(context => + { + }), harness.CancellationToken); + + IReceivedMessage message = await harness.Consumed.SelectAsync().FirstOrDefault(); + + Assert.That(message, Is.Not.Null); + + // Assert.That(message.Context.TryGetPayload(out var rmqContext), Is.True); + // Assert.That(rmqContext.Properties.Headers.TryGetValue(), Is.EqualTo(3)); + } + + + class MessageHandler : + IConsumer + { + public Task Consume(ConsumeContext context) + { + return context.Publish(new UnusedMessage()); + } + } + + + class UnusedMessage + { + } + } [TestFixture] @@ -17,6 +111,16 @@ public async Task Should_create_and_bind_the_exchange_and_properties() await _handled; } + [Test] + public async Task Should_have_the_proper_address() + { + Assert.That(Bus.Topology.TryGetPublishAddress(out var address)); + + Assert.That(address, + Is.EqualTo(new Uri( + "rabbitmq://localhost/test/MassTransit.RabbitMqTransport.Tests:AlternateExchange_Specs-TheWorldImploded?alternateexchange=publish-not-delivered"))); + } + Task> _handled; const string AlternateExchangeName = "publish-not-delivered";