Skip to content

Commit

Permalink
Fixed MassTransit#4286 - address not including alternative exchange h…
Browse files Browse the repository at this point in the history
…eader
  • Loading branch information
phatboyg committed Apr 11, 2023
1 parent 1aae9cb commit 01ad7f4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public void SetExchangeArgument(string key, TimeSpan value)

public virtual RabbitMqEndpointAddress GetEndpointAddress(Uri hostAddress)
{
return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete);
return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete,
delayedType: ExchangeArguments.TryGetValue("x-delayed-type", out var argument) ? (string)argument : default,
alternateExchange: ExchangeArguments.TryGetValue(RabbitMQ.Client.Headers.AlternateExchange, out argument) ? (string)argument : default);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public RabbitMqSendSettings(RabbitMqEndpointAddress address)
}

if (!string.IsNullOrWhiteSpace(address.AlternateExchange))
SetExchangeArgument("alternate-exchange", address.AlternateExchange);
SetExchangeArgument(Headers.AlternateExchange, address.AlternateExchange);

if (address.SingleActiveConsumer)
SetQueueArgument(Headers.XSingleActiveConsumer, true);
Expand All @@ -46,8 +46,9 @@ public RabbitMqSendSettings(RabbitMqEndpointAddress address)
public RabbitMqEndpointAddress GetSendAddress(Uri hostAddress)
{
return new RabbitMqEndpointAddress(hostAddress, ExchangeName, ExchangeType, Durable, AutoDelete, _bindToQueue, _queueName,
ExchangeArguments.ContainsKey("x-delayed-type") ? (string)ExchangeArguments["x-delayed-type"] : default,
_exchangeBindings.Count > 0 ? _exchangeBindings.Select(x => x.ExchangeName).ToArray() : default);
ExchangeArguments.TryGetValue("x-delayed-type", out var argument) ? (string)argument : default,
_exchangeBindings.Count > 0 ? _exchangeBindings.Select(x => x.ExchangeName).ToArray() : default,
alternateExchange: ExchangeArguments.TryGetValue(Headers.AlternateExchange, out argument) ? (string)argument : default);
}

public BrokerTopology GetBrokerTopology()
Expand Down
104 changes: 104 additions & 0 deletions tests/MassTransit.RabbitMqTransport.Tests/AlternateExchange_Specs.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,102 @@
namespace MassTransit.RabbitMqTransport.Tests
{
using System;
using System.Threading.Tasks;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using RabbitMQ.Client;
using TestFramework.Messages;


public class Using_alternate_exchange_with_the_outbox
{
[Test]
public async Task Should_deal_with_the_alternate_exchange()
{
const string alternateExchangeName = "unused-message";
const string alternateQueueName = "unused-message-queue";

await using var provider = new ServiceCollection()
.ConfigureRabbitMqTestOptions(options =>
{
options.CleanVirtualHost = true;
options.CreateVirtualHostIfNotExists = true;
})
.AddMassTransitTestHarness(x =>
{
x.AddOptions<RabbitMqTransportOptions>()
.Configure(options => options.VHost = "test");

x.AddInMemoryInboxOutbox();

x.AddHandler((UnusedMessage _) => Task.CompletedTask)
.Endpoint(e =>
{
e.ConfigureConsumeTopology = false;
e.Name = alternateQueueName;
});

x.AddConsumer<MessageHandler>()
.Endpoint(e => e.Name = "outbox-normal");

x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(10));

x.AddConfigureEndpointsCallback((provider, name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
if(name == alternateQueueName)
{
rmq.Bind(alternateExchangeName);
}
}

cfg.UseInMemoryInboxOutbox(provider);
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.PublishTopology.GetMessageTopology<UnusedMessage>()
.BindAlternateExchangeQueue(alternateExchangeName);

cfg.DeployPublishTopology = true;

cfg.ConfigureEndpoints(context);
});
}).BuildServiceProvider();

var harness = provider.GetTestHarness();

await harness.Start();

await harness.Bus.Publish(new PingMessage(), Pipe.Execute<SendContext>(context =>
{
}), harness.CancellationToken);

IReceivedMessage<UnusedMessage> message = await harness.Consumed.SelectAsync<UnusedMessage>().FirstOrDefault();

Assert.That(message, Is.Not.Null);

// Assert.That(message.Context.TryGetPayload<RabbitMqBasicConsumeContext>(out var rmqContext), Is.True);
// Assert.That(rmqContext.Properties.Headers.TryGetValue(), Is.EqualTo(3));
}


class MessageHandler :
IConsumer<PingMessage>
{
public Task Consume(ConsumeContext<PingMessage> context)
{
return context.Publish(new UnusedMessage());
}
}


class UnusedMessage
{
}
}


[TestFixture]
Expand All @@ -17,6 +111,16 @@ public async Task Should_create_and_bind_the_exchange_and_properties()
await _handled;
}

[Test]
public async Task Should_have_the_proper_address()
{
Assert.That(Bus.Topology.TryGetPublishAddress<TheWorldImploded>(out var address));

Assert.That(address,
Is.EqualTo(new Uri(
"rabbitmq://localhost/test/MassTransit.RabbitMqTransport.Tests:AlternateExchange_Specs-TheWorldImploded?alternateexchange=publish-not-delivered")));
}

Task<ConsumeContext<TheWorldImploded>> _handled;

const string AlternateExchangeName = "publish-not-delivered";
Expand Down

0 comments on commit 01ad7f4

Please sign in to comment.