From c2f5ac200817a9853882cfa9baf0670d2342a0a4 Mon Sep 17 00:00:00 2001 From: Havret Date: Fri, 1 May 2020 15:29:06 +0200 Subject: [PATCH] Message priority support --- README.md | 30 ++++++- src/ActiveMQ.Net/AnonymousProducer.cs | 2 +- .../AnonymousProducerConfiguration.cs | 7 ++ .../AutoRecoveringAnonymousProducer.cs | 6 +- .../AutoRecoveringConnection.cs | 4 +- .../Builders/AnonymousProducerBuilder.cs | 6 +- src/ActiveMQ.Net/Connection.cs | 4 +- src/ActiveMQ.Net/ConnectionExtensions.cs | 6 ++ src/ActiveMQ.Net/Header.cs | 24 ++++++ .../IBaseProducerConfiguration.cs | 7 ++ src/ActiveMQ.Net/IConnection.cs | 2 +- src/ActiveMQ.Net/Message.cs | 19 ++++- src/ActiveMQ.Net/Producer.cs | 2 +- src/ActiveMQ.Net/ProducerBase.cs | 7 +- src/ActiveMQ.Net/ProducerConfiguration.cs | 3 +- src/ActiveMQ.Net/Properties.cs | 7 +- .../AnonymousMessageProducerSpec.cs | 20 ++--- .../MessagePrioritySpec.cs | 70 ++++++++++++++++ .../MessageRoutingStrategiesSpec.cs | 20 ++--- .../ActiveMQ.Net.UnitTests/ActiveMQNetSpec.cs | 2 +- .../CreateAnonymousProducerSpec.cs | 24 ++++++ .../CreateProducerSpec.cs | 10 +++ .../MessagePrioritySpec.cs | 84 +++++++++++++++++++ 23 files changed, 323 insertions(+), 43 deletions(-) create mode 100644 src/ActiveMQ.Net/AnonymousProducerConfiguration.cs create mode 100644 src/ActiveMQ.Net/Header.cs create mode 100644 src/ActiveMQ.Net/IBaseProducerConfiguration.cs create mode 100644 test/ActiveMQ.Net.IntegrationTests/MessagePrioritySpec.cs create mode 100644 test/ActiveMQ.Net.UnitTests/CreateAnonymousProducerSpec.cs create mode 100644 test/ActiveMQ.Net.UnitTests/MessagePrioritySpec.cs diff --git a/README.md b/README.md index f7273155..f7c0ad13 100644 --- a/README.md +++ b/README.md @@ -107,9 +107,9 @@ var message = await consumer.ReceiveAsync(cts.Token); This may be particularly useful when you want to shut down your application. -### Messages +### Message payload -ActiveMQ.Net uses `Message` class to represent messages which may be sent and received. A `Message` can carry various types of payload and accompanying metadata. +ActiveMQ.Net uses `Message` class to represent messages which may be transmitted. A `Message` can carry various types of payload and accompanying metadata. A new message can be created as follows: @@ -145,6 +145,32 @@ var body = message.GetBody(); If `T` matches the type of the payload, the value will be returned, otherwise, you will get `default(T)`. +### Message properties + +#### Priority + +This property defines the level of importance of a message. ActiveMQ Artemis uses it to prioritize message delivery. Messages with higher priority will be delivered before messages with lower priority. Messages with the same priority level should be delivered according to the order they were sent with. There are 10 levels of message priority, ranging from 0 (the lowest) to 9 (the highest). If no message priority is set on the client (Priority set to `null`), the message will be treated as if it was assigned a normal priority (4). + +Default message priority can be overridden on message producer level: + +```csharp +var producer = await connection.CreateProducerAsync(new ProducerConfiguration +{ + Address = "a1", + RoutingType = AddressRoutingType.Anycast, + MessagePriority = 9 +}); +``` + +Each message sent with this producer will automatically have priority set to `9` unless specified otherwise. The priority set explicitly on the message object takes the highest precedence. + +```csharp +await producer.SendAsync(new Message("foo") +{ + Priority = 0 // takes precedence over priority specified on producer level +}); +``` + ### Resources lifespan Connections, producers, and consumers are meant to be long-lived objects. The underlying protocol is designed and optimized for long-running connections. That means that opening a new connection per operation, e.g. sending a message, is unnecessary and strongly discouraged as it will introduce a lot of network round trips and overhead. The same rule applies to all ActiveMQ. Net resources. diff --git a/src/ActiveMQ.Net/AnonymousProducer.cs b/src/ActiveMQ.Net/AnonymousProducer.cs index 992e3db7..0687a0ad 100644 --- a/src/ActiveMQ.Net/AnonymousProducer.cs +++ b/src/ActiveMQ.Net/AnonymousProducer.cs @@ -8,7 +8,7 @@ namespace ActiveMQ.Net { internal class AnonymousProducer : ProducerBase, IAnonymousProducer { - public AnonymousProducer(ILoggerFactory loggerFactory, SenderLink senderLink) : base(loggerFactory, senderLink) + public AnonymousProducer(ILoggerFactory loggerFactory, SenderLink senderLink, AnonymousProducerConfiguration configuration) : base(loggerFactory, senderLink, configuration) { } diff --git a/src/ActiveMQ.Net/AnonymousProducerConfiguration.cs b/src/ActiveMQ.Net/AnonymousProducerConfiguration.cs new file mode 100644 index 00000000..a39e8ce2 --- /dev/null +++ b/src/ActiveMQ.Net/AnonymousProducerConfiguration.cs @@ -0,0 +1,7 @@ +namespace ActiveMQ.Net +{ + public class AnonymousProducerConfiguration : IBaseProducerConfiguration + { + public byte? MessagePriority { get; set; } + } +} \ No newline at end of file diff --git a/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringAnonymousProducer.cs b/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringAnonymousProducer.cs index 2c13c420..1829baf8 100644 --- a/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringAnonymousProducer.cs +++ b/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringAnonymousProducer.cs @@ -7,10 +7,12 @@ namespace ActiveMQ.Net.AutoRecovering { internal class AutoRecoveringAnonymousProducer : AutoRecoveringProducerBase, IAnonymousProducer { + private readonly AnonymousProducerConfiguration _configuration; private IAnonymousProducer _producer; - public AutoRecoveringAnonymousProducer(ILoggerFactory loggerFactory) : base(loggerFactory) + public AutoRecoveringAnonymousProducer(ILoggerFactory loggerFactory, AnonymousProducerConfiguration configuration) : base(loggerFactory) { + _configuration = configuration; } public async Task SendAsync(string address, AddressRoutingType routingType, Message message, CancellationToken cancellationToken = default) @@ -51,7 +53,7 @@ public void Send(string address, AddressRoutingType routingType, Message message protected override async Task RecoverUnderlyingProducer(IConnection connection, CancellationToken cancellationToken) { - _producer = await connection.CreateAnonymousProducer(cancellationToken).ConfigureAwait(false); + _producer = await connection.CreateAnonymousProducer(_configuration, cancellationToken).ConfigureAwait(false); } protected override ValueTask DisposeUnderlyingProducer() diff --git a/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringConnection.cs b/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringConnection.cs index c00e189c..9c143e92 100644 --- a/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringConnection.cs +++ b/src/ActiveMQ.Net/AutoRecovering/AutoRecoveringConnection.cs @@ -167,9 +167,9 @@ public async Task CreateProducerAsync(ProducerConfiguration configura return autoRecoveringProducer; } - public async Task CreateAnonymousProducer(CancellationToken cancellationToken) + public async Task CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default) { - var autoRecoveringAnonymousProducer = new AutoRecoveringAnonymousProducer(_loggerFactory); + var autoRecoveringAnonymousProducer = new AutoRecoveringAnonymousProducer(_loggerFactory, configuration); await PrepareRecoverable(autoRecoveringAnonymousProducer, cancellationToken); return autoRecoveringAnonymousProducer; } diff --git a/src/ActiveMQ.Net/Builders/AnonymousProducerBuilder.cs b/src/ActiveMQ.Net/Builders/AnonymousProducerBuilder.cs index a737790e..4d0844a7 100644 --- a/src/ActiveMQ.Net/Builders/AnonymousProducerBuilder.cs +++ b/src/ActiveMQ.Net/Builders/AnonymousProducerBuilder.cs @@ -21,8 +21,10 @@ public AnonymousProducerBuilder(ILoggerFactory loggerFactory, Session session) _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } - public async Task CreateAsync(CancellationToken cancellationToken) + public async Task CreateAsync(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken) { + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + cancellationToken.ThrowIfCancellationRequested(); cancellationToken.Register(() => _tcs.TrySetCanceled()); @@ -33,7 +35,7 @@ public async Task CreateAsync(CancellationToken cancellation var senderLink = new SenderLink(_session, Guid.NewGuid().ToString(), target, OnAttached); senderLink.AddClosedCallback(OnClosed); await _tcs.Task.ConfigureAwait(false); - var producer = new AnonymousProducer(_loggerFactory, senderLink); + var producer = new AnonymousProducer(_loggerFactory, senderLink, configuration); senderLink.Closed -= OnClosed; return producer; } diff --git a/src/ActiveMQ.Net/Connection.cs b/src/ActiveMQ.Net/Connection.cs index 75d49bb1..aec7e175 100644 --- a/src/ActiveMQ.Net/Connection.cs +++ b/src/ActiveMQ.Net/Connection.cs @@ -40,11 +40,11 @@ public async Task CreateProducerAsync(ProducerConfiguration configura return await producerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false); } - public async Task CreateAnonymousProducer(CancellationToken cancellationToken) + public async Task CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken) { var session = await CreateSession(cancellationToken).ConfigureAwait(false); var producerBuilder = new AnonymousProducerBuilder(_loggerFactory, session); - return await producerBuilder.CreateAsync(cancellationToken).ConfigureAwait(false); + return await producerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false); } private Task CreateSession(CancellationToken cancellationToken) diff --git a/src/ActiveMQ.Net/ConnectionExtensions.cs b/src/ActiveMQ.Net/ConnectionExtensions.cs index 29e65ca2..ae77d902 100644 --- a/src/ActiveMQ.Net/ConnectionExtensions.cs +++ b/src/ActiveMQ.Net/ConnectionExtensions.cs @@ -35,5 +35,11 @@ public static Task CreateProducerAsync(this IConnection connection, s }; return connection.CreateProducerAsync(configuration, cancellationToken); } + + public static Task CreateAnonymousProducer(this IConnection connection, CancellationToken cancellationToken = default) + { + var configuration = new AnonymousProducerConfiguration(); + return connection.CreateAnonymousProducer(configuration, cancellationToken); + } } } \ No newline at end of file diff --git a/src/ActiveMQ.Net/Header.cs b/src/ActiveMQ.Net/Header.cs new file mode 100644 index 00000000..2a55b456 --- /dev/null +++ b/src/ActiveMQ.Net/Header.cs @@ -0,0 +1,24 @@ +namespace ActiveMQ.Net +{ + internal sealed class Header + { + private readonly Amqp.Framing.Header _innerHeader; + + public Header(Amqp.Message innerMessage) + { + _innerHeader = innerMessage.Header ??= new Amqp.Framing.Header(); + } + + public byte? Priority + { + get => _innerHeader.HasField(1) ? _innerHeader.Priority : default(byte?); + set + { + if (value != default) + _innerHeader.Priority = value.Value; + else + _innerHeader.ResetField(1); + } + } + } +} \ No newline at end of file diff --git a/src/ActiveMQ.Net/IBaseProducerConfiguration.cs b/src/ActiveMQ.Net/IBaseProducerConfiguration.cs new file mode 100644 index 00000000..44725f41 --- /dev/null +++ b/src/ActiveMQ.Net/IBaseProducerConfiguration.cs @@ -0,0 +1,7 @@ +namespace ActiveMQ.Net +{ + public interface IBaseProducerConfiguration + { + byte? MessagePriority { get; } + } +} \ No newline at end of file diff --git a/src/ActiveMQ.Net/IConnection.cs b/src/ActiveMQ.Net/IConnection.cs index 8f2f1ffe..c503eaf8 100644 --- a/src/ActiveMQ.Net/IConnection.cs +++ b/src/ActiveMQ.Net/IConnection.cs @@ -13,7 +13,7 @@ public interface IConnection : IAsyncDisposable bool IsOpened { get; } Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default); Task CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken = default); - Task CreateAnonymousProducer(CancellationToken cancellationToken = default); + Task CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default); event EventHandler ConnectionClosed; event EventHandler ConnectionRecovered; event EventHandler ConnectionRecoveryError; diff --git a/src/ActiveMQ.Net/Message.cs b/src/ActiveMQ.Net/Message.cs index f1b1ac29..5c8e75b0 100644 --- a/src/ActiveMQ.Net/Message.cs +++ b/src/ActiveMQ.Net/Message.cs @@ -6,10 +6,10 @@ namespace ActiveMQ.Net { public class Message { + private Header _header; private Properties _properties; private ApplicationProperties _applicationProperties; private MessageAnnotations _messageAnnotations; - internal Amqp.Message InnerMessage { get; } internal Message(Amqp.Message message) { @@ -54,11 +54,24 @@ private static RestrictedDescribed GetBodySection(object body) } } - public Properties Properties => _properties ??= new Properties(InnerMessage); + internal Amqp.Message InnerMessage { get; } - public ApplicationProperties ApplicationProperties => _applicationProperties ??= new ApplicationProperties(InnerMessage); + private Header Header => _header ??= new Header(InnerMessage); + + internal Properties Properties => _properties ??= new Properties(InnerMessage); internal MessageAnnotations MessageAnnotations => _messageAnnotations ??= new MessageAnnotations(InnerMessage); + public ApplicationProperties ApplicationProperties => _applicationProperties ??= new ApplicationProperties(InnerMessage); + + public byte? Priority + { + get => Header.Priority; + set + { + if (value > 9) throw new ArgumentOutOfRangeException(nameof(value), $"Priority value {value} is out of range (0..9)."); + Header.Priority = value; + } + } public T GetBody() { diff --git a/src/ActiveMQ.Net/Producer.cs b/src/ActiveMQ.Net/Producer.cs index 25bfe47a..d71c0b05 100644 --- a/src/ActiveMQ.Net/Producer.cs +++ b/src/ActiveMQ.Net/Producer.cs @@ -9,7 +9,7 @@ internal class Producer : ProducerBase, IProducer { private readonly ProducerConfiguration _configuration; - public Producer(ILoggerFactory loggerFactory, SenderLink senderLink, ProducerConfiguration configuration) : base(loggerFactory, senderLink) + public Producer(ILoggerFactory loggerFactory, SenderLink senderLink, ProducerConfiguration configuration) : base(loggerFactory, senderLink, configuration) { _configuration = configuration; } diff --git a/src/ActiveMQ.Net/ProducerBase.cs b/src/ActiveMQ.Net/ProducerBase.cs index 923b65bb..fa224a59 100644 --- a/src/ActiveMQ.Net/ProducerBase.cs +++ b/src/ActiveMQ.Net/ProducerBase.cs @@ -8,17 +8,19 @@ namespace ActiveMQ.Net { - internal class ProducerBase + internal abstract class ProducerBase { private static readonly OutcomeCallback _onOutcome = OnOutcome; private readonly ILogger _logger; private readonly SenderLink _senderLink; + private readonly IBaseProducerConfiguration _configuration; - public ProducerBase(ILoggerFactory loggerFactory, SenderLink senderLink) + protected ProducerBase(ILoggerFactory loggerFactory, SenderLink senderLink, IBaseProducerConfiguration configuration) { _logger = loggerFactory.CreateLogger(GetType()); _senderLink = senderLink; + _configuration = configuration; } private bool IsDetaching => _senderLink.LinkState >= LinkState.DetachPipe; @@ -78,6 +80,7 @@ private void Send(string address, try { + message.Priority ??= _configuration.MessagePriority; message.Properties.To = address; message.MessageAnnotations[SymbolUtils.RoutingType] = routingType.GetRoutingAnnotation(); diff --git a/src/ActiveMQ.Net/ProducerConfiguration.cs b/src/ActiveMQ.Net/ProducerConfiguration.cs index 064e2ca0..813ffda4 100644 --- a/src/ActiveMQ.Net/ProducerConfiguration.cs +++ b/src/ActiveMQ.Net/ProducerConfiguration.cs @@ -1,8 +1,9 @@ namespace ActiveMQ.Net { - public class ProducerConfiguration + public class ProducerConfiguration : IBaseProducerConfiguration { public string Address { get; set; } public AddressRoutingType RoutingType { get; set; } + public byte? MessagePriority { get; set; } } } \ No newline at end of file diff --git a/src/ActiveMQ.Net/Properties.cs b/src/ActiveMQ.Net/Properties.cs index 93dd476f..b09be945 100644 --- a/src/ActiveMQ.Net/Properties.cs +++ b/src/ActiveMQ.Net/Properties.cs @@ -2,7 +2,7 @@ namespace ActiveMQ.Net { - public sealed class Properties + internal sealed class Properties { private readonly Amqp.Framing.Properties _innerProperties; @@ -22,6 +22,7 @@ public string MessageId _innerProperties.ResetField(0); } } + public byte[] UserId { get => _innerProperties.UserId; @@ -142,10 +143,10 @@ public string ReplyToGroupId } } - internal string To + public string To { get => _innerProperties.To; - set + internal set { if (value != default) _innerProperties.To = value; diff --git a/test/ActiveMQ.Net.IntegrationTests/AnonymousMessageProducerSpec.cs b/test/ActiveMQ.Net.IntegrationTests/AnonymousMessageProducerSpec.cs index b159a735..282af28e 100644 --- a/test/ActiveMQ.Net.IntegrationTests/AnonymousMessageProducerSpec.cs +++ b/test/ActiveMQ.Net.IntegrationTests/AnonymousMessageProducerSpec.cs @@ -9,32 +9,32 @@ public class AnonymousMessageProducerSpec : ActiveMQNetIntegrationSpec public AnonymousMessageProducerSpec(ITestOutputHelper output) : base(output) { } - + [Fact] public async Task Should_send_message_to_specified_address_using_AnycastRoutingType() { - var connection = await CreateConnection(); + await using var connection = await CreateConnection(); var address = nameof(Should_send_message_to_specified_address_using_AnycastRoutingType); - var producer = await connection.CreateAnonymousProducer(); - var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); + await using var producer = await connection.CreateAnonymousProducer(); + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); await producer.SendAsync(address, AddressRoutingType.Anycast, new Message("foo")); var msg = await consumer.ReceiveAsync(); - + Assert.Equal("foo", msg.GetBody()); } - + [Fact] public async Task Should_send_message_to_specified_address_using_MulticastRoutingType() { - var connection = await CreateConnection(); + await using var connection = await CreateConnection(); var address = nameof(Should_send_message_to_specified_address_using_MulticastRoutingType); - var producer = await connection.CreateAnonymousProducer(); - var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); + await using var producer = await connection.CreateAnonymousProducer(); + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); await producer.SendAsync(address, AddressRoutingType.Multicast, new Message("foo")); var msg = await consumer.ReceiveAsync(); - + Assert.Equal("foo", msg.GetBody()); } } diff --git a/test/ActiveMQ.Net.IntegrationTests/MessagePrioritySpec.cs b/test/ActiveMQ.Net.IntegrationTests/MessagePrioritySpec.cs new file mode 100644 index 00000000..99a08df0 --- /dev/null +++ b/test/ActiveMQ.Net.IntegrationTests/MessagePrioritySpec.cs @@ -0,0 +1,70 @@ +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Net.IntegrationTests +{ + public class MessagePrioritySpec : ActiveMQNetIntegrationSpec + { + public MessagePrioritySpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Should_receive_messages_ordered_via_priority() + { + var address = nameof(Should_receive_messages_ordered_via_priority); + await using var connection = await CreateConnection(); + await using var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Anycast); + + for (var i = 0; i <= 9; i++) + { + await producer.SendAsync(new Message(i) { Priority = (byte) i }); + } + + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); + + for (var i = 9; i >= 0; i--) + { + var message = await consumer.ReceiveAsync(); + Assert.Equal((byte) i, message.Priority); + Assert.Equal(i, message.GetBody()); + } + } + + [Fact] + public async Task Messages_without_priority_should_be_delivered_with_priority_4() + { + var address = nameof(Messages_without_priority_should_be_delivered_with_priority_4); + await using var connection = await CreateConnection(); + await using var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Anycast); + + await producer.SendAsync(new Message("low_priority") { Priority = 0 }); + await producer.SendAsync(new Message("normal_priority") { Priority = 4 }); + await producer.SendAsync(new Message("no_priority")); + await producer.SendAsync(new Message("normal_priority") { Priority = 4 }); + await producer.SendAsync(new Message("high_priority") { Priority = 9 }); + + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); + + Assert.Equal("high_priority", (await consumer.ReceiveAsync()).GetBody()); + Assert.Equal("normal_priority", (await consumer.ReceiveAsync()).GetBody()); + Assert.Equal("no_priority", (await consumer.ReceiveAsync()).GetBody()); + Assert.Equal("normal_priority", (await consumer.ReceiveAsync()).GetBody()); + Assert.Equal("low_priority", (await consumer.ReceiveAsync()).GetBody()); + } + + [Fact] + public async Task Should_take_message_priority_from_producer_configuration() + { + var address = nameof(Should_take_message_priority_from_producer_configuration); + await using var connection = await CreateConnection(); + await using var producer = await connection.CreateAnonymousProducer(new AnonymousProducerConfiguration { MessagePriority = 9 }); + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); + + await producer.SendAsync(address, AddressRoutingType.Anycast, new Message("foo")); + + Assert.Equal((byte) 9, (await consumer.ReceiveAsync()).Priority); + } + } +} \ No newline at end of file diff --git a/test/ActiveMQ.Net.IntegrationTests/MessageRoutingStrategiesSpec.cs b/test/ActiveMQ.Net.IntegrationTests/MessageRoutingStrategiesSpec.cs index 63d7e5c7..61956cdb 100644 --- a/test/ActiveMQ.Net.IntegrationTests/MessageRoutingStrategiesSpec.cs +++ b/test/ActiveMQ.Net.IntegrationTests/MessageRoutingStrategiesSpec.cs @@ -13,30 +13,30 @@ public MessageRoutingStrategiesSpec(ITestOutputHelper output) : base(output) [Fact] public async Task Should_send_and_consume_message_using_AnycastRouting() { - var connection = await CreateConnection(); + await using var connection = await CreateConnection(); var address = nameof(Should_send_and_consume_message_using_AnycastRouting); - var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Anycast); - var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); + await using var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Anycast); + await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast); await producer.SendAsync(new Message("foo")); var msg = await consumer.ReceiveAsync(); - + Assert.Equal("foo", msg.GetBody()); } - + [Fact] public async Task Should_send_and_consume_messages_using_MulticastRouting() { - var connection = await CreateConnection(); + await using var connection = await CreateConnection(); var address = nameof(Should_send_and_consume_messages_using_MulticastRouting); - var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Multicast); - var consumer1 = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); - var consumer2 = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); + await using var producer = await connection.CreateProducerAsync(address, AddressRoutingType.Multicast); + await using var consumer1 = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); + await using var consumer2 = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast); await producer.SendAsync(new Message("foo")); var msg1 = await consumer1.ReceiveAsync(); var msg2 = await consumer2.ReceiveAsync(); - + Assert.Equal("foo", msg1.GetBody()); Assert.Equal("foo", msg2.GetBody()); } diff --git a/test/ActiveMQ.Net.UnitTests/ActiveMQNetSpec.cs b/test/ActiveMQ.Net.UnitTests/ActiveMQNetSpec.cs index 5647ff0c..5790f677 100644 --- a/test/ActiveMQ.Net.UnitTests/ActiveMQNetSpec.cs +++ b/test/ActiveMQ.Net.UnitTests/ActiveMQNetSpec.cs @@ -10,7 +10,7 @@ namespace ActiveMQ.Net.Tests { - public class ActiveMQNetSpec + public abstract class ActiveMQNetSpec { private readonly ITestOutputHelper _output; diff --git a/test/ActiveMQ.Net.UnitTests/CreateAnonymousProducerSpec.cs b/test/ActiveMQ.Net.UnitTests/CreateAnonymousProducerSpec.cs new file mode 100644 index 00000000..69fa7375 --- /dev/null +++ b/test/ActiveMQ.Net.UnitTests/CreateAnonymousProducerSpec.cs @@ -0,0 +1,24 @@ +using System; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Net.Tests +{ + public class CreateAnonymousProducerSpec : ActiveMQNetSpec + { + public CreateAnonymousProducerSpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Throws_when_created_with_null_configuration() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await Assert.ThrowsAsync(() => connection.CreateAnonymousProducer(null)); + } + } +} \ No newline at end of file diff --git a/test/ActiveMQ.Net.UnitTests/CreateProducerSpec.cs b/test/ActiveMQ.Net.UnitTests/CreateProducerSpec.cs index 0e7e07d5..4d25a715 100644 --- a/test/ActiveMQ.Net.UnitTests/CreateProducerSpec.cs +++ b/test/ActiveMQ.Net.UnitTests/CreateProducerSpec.cs @@ -173,5 +173,15 @@ public async Task Should_cancel_CreateProducerAsync_when_address_specified_but_a var cancellationTokenSource = new CancellationTokenSource(ShortTimeout); await Assert.ThrowsAnyAsync(() => connection.CreateProducerAsync("a1", AddressRoutingType.Multicast, cancellationTokenSource.Token)); } + + [Fact] + public async Task Throws_when_created_with_null_configuration() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await Assert.ThrowsAsync(() => connection.CreateProducerAsync(null)); + } } } \ No newline at end of file diff --git a/test/ActiveMQ.Net.UnitTests/MessagePrioritySpec.cs b/test/ActiveMQ.Net.UnitTests/MessagePrioritySpec.cs new file mode 100644 index 00000000..daee14d5 --- /dev/null +++ b/test/ActiveMQ.Net.UnitTests/MessagePrioritySpec.cs @@ -0,0 +1,84 @@ +using System; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Net.Tests +{ + public class MessagePrioritySpec : ActiveMQNetSpec + { + public MessagePrioritySpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Should_send_message_with_priority_not_set() + { + using var host = CreateOpenedContainerHost(); + var messageProcessor = host.CreateMessageProcessor("a1"); + await using var connection = await CreateConnection(host.Endpoint); + await using var producer = await connection.CreateProducerAsync("a1", AddressRoutingType.Anycast); + + var message = new Message("foo"); + await producer.SendAsync(message); + + var received = messageProcessor.Dequeue(Timeout); + + Assert.Null(received.Priority); + } + + [Theory] + [MemberData(nameof(ValidMessagePrioritiesData))] + public async Task Should_send_message_with_valid_priority(byte priority) + { + using var host = CreateOpenedContainerHost(); + var messageProcessor = host.CreateMessageProcessor("a1"); + await using var connection = await CreateConnection(host.Endpoint); + await using var producer = await connection.CreateProducerAsync("a1", AddressRoutingType.Anycast); + + var message = new Message("foo") + { + Priority = priority + }; + await producer.SendAsync(message); + + var received = messageProcessor.Dequeue(Timeout); + + Assert.Equal(priority, received.Priority); + } + + public static TheoryData ValidMessagePrioritiesData => new TheoryData { default, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + + [Fact] + public void Throws_when_invalid_priority_assigned() + { + Assert.Throws(() => + { + var _ = new Message("foo") + { + Priority = 10 + }; + }); + } + + [Fact] + public async Task Should_take_message_priority_from_Producer_configuration() + { + using var host = CreateOpenedContainerHost(); + var messageProcessor = host.CreateMessageProcessor("a1"); + await using var connection = await CreateConnection(host.Endpoint); + await using var producer = await connection.CreateProducerAsync(new ProducerConfiguration + { + Address = "a1", + RoutingType = AddressRoutingType.Anycast, + MessagePriority = 9 + }); + + await producer.SendAsync(new Message("foo")); + + var received = messageProcessor.Dequeue(Timeout); + + Assert.Equal((byte) 9, received.Priority); + } + } +} \ No newline at end of file