Skip to content

Commit

Permalink
Message priority support
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed May 1, 2020
1 parent 30df257 commit c2f5ac2
Show file tree
Hide file tree
Showing 23 changed files with 323 additions and 43 deletions.
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ var message = await consumer.ReceiveAsync(cts.Token);

This may be particularly useful when you want to shut down your application.

### Messages
### Message payload

ActiveMQ.Net uses `Message` class to represent messages which may be sent and received. A `Message` can carry various types of payload and accompanying metadata.
ActiveMQ.Net uses `Message` class to represent messages which may be transmitted. A `Message` can carry various types of payload and accompanying metadata.

A new message can be created as follows:

Expand Down Expand Up @@ -145,6 +145,32 @@ var body = message.GetBody<T>();

If `T` matches the type of the payload, the value will be returned, otherwise, you will get `default(T)`.

### Message properties

#### Priority

This property defines the level of importance of a message. ActiveMQ Artemis uses it to prioritize message delivery. Messages with higher priority will be delivered before messages with lower priority. Messages with the same priority level should be delivered according to the order they were sent with. There are 10 levels of message priority, ranging from 0 (the lowest) to 9 (the highest). If no message priority is set on the client (Priority set to `null`), the message will be treated as if it was assigned a normal priority (4).

Default message priority can be overridden on message producer level:

```csharp
var producer = await connection.CreateProducerAsync(new ProducerConfiguration
{
Address = "a1",
RoutingType = AddressRoutingType.Anycast,
MessagePriority = 9
});
```

Each message sent with this producer will automatically have priority set to `9` unless specified otherwise. The priority set explicitly on the message object takes the highest precedence.

```csharp
await producer.SendAsync(new Message("foo")
{
Priority = 0 // takes precedence over priority specified on producer level
});
```

### Resources lifespan

Connections, producers, and consumers are meant to be long-lived objects. The underlying protocol is designed and optimized for long-running connections. That means that opening a new connection per operation, e.g. sending a message, is unnecessary and strongly discouraged as it will introduce a lot of network round trips and overhead. The same rule applies to all ActiveMQ. Net resources.
2 changes: 1 addition & 1 deletion src/ActiveMQ.Net/AnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace ActiveMQ.Net
{
internal class AnonymousProducer : ProducerBase, IAnonymousProducer
{
public AnonymousProducer(ILoggerFactory loggerFactory, SenderLink senderLink) : base(loggerFactory, senderLink)
public AnonymousProducer(ILoggerFactory loggerFactory, SenderLink senderLink, AnonymousProducerConfiguration configuration) : base(loggerFactory, senderLink, configuration)
{
}

Expand Down
7 changes: 7 additions & 0 deletions src/ActiveMQ.Net/AnonymousProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace ActiveMQ.Net
{
public class AnonymousProducerConfiguration : IBaseProducerConfiguration
{
public byte? MessagePriority { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ namespace ActiveMQ.Net.AutoRecovering
{
internal class AutoRecoveringAnonymousProducer : AutoRecoveringProducerBase, IAnonymousProducer
{
private readonly AnonymousProducerConfiguration _configuration;
private IAnonymousProducer _producer;

public AutoRecoveringAnonymousProducer(ILoggerFactory loggerFactory) : base(loggerFactory)
public AutoRecoveringAnonymousProducer(ILoggerFactory loggerFactory, AnonymousProducerConfiguration configuration) : base(loggerFactory)
{
_configuration = configuration;
}

public async Task SendAsync(string address, AddressRoutingType routingType, Message message, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -51,7 +53,7 @@ public void Send(string address, AddressRoutingType routingType, Message message

protected override async Task RecoverUnderlyingProducer(IConnection connection, CancellationToken cancellationToken)
{
_producer = await connection.CreateAnonymousProducer(cancellationToken).ConfigureAwait(false);
_producer = await connection.CreateAnonymousProducer(_configuration, cancellationToken).ConfigureAwait(false);
}

protected override ValueTask DisposeUnderlyingProducer()
Expand Down
4 changes: 2 additions & 2 deletions src/ActiveMQ.Net/AutoRecovering/AutoRecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ public async Task<IProducer> CreateProducerAsync(ProducerConfiguration configura
return autoRecoveringProducer;
}

public async Task<IAnonymousProducer> CreateAnonymousProducer(CancellationToken cancellationToken)
public async Task<IAnonymousProducer> CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default)
{
var autoRecoveringAnonymousProducer = new AutoRecoveringAnonymousProducer(_loggerFactory);
var autoRecoveringAnonymousProducer = new AutoRecoveringAnonymousProducer(_loggerFactory, configuration);
await PrepareRecoverable(autoRecoveringAnonymousProducer, cancellationToken);
return autoRecoveringAnonymousProducer;
}
Expand Down
6 changes: 4 additions & 2 deletions src/ActiveMQ.Net/Builders/AnonymousProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ public AnonymousProducerBuilder(ILoggerFactory loggerFactory, Session session)
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public async Task<IAnonymousProducer> CreateAsync(CancellationToken cancellationToken)
public async Task<IAnonymousProducer> CreateAsync(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));

cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());

Expand All @@ -33,7 +35,7 @@ public async Task<IAnonymousProducer> CreateAsync(CancellationToken cancellation
var senderLink = new SenderLink(_session, Guid.NewGuid().ToString(), target, OnAttached);
senderLink.AddClosedCallback(OnClosed);
await _tcs.Task.ConfigureAwait(false);
var producer = new AnonymousProducer(_loggerFactory, senderLink);
var producer = new AnonymousProducer(_loggerFactory, senderLink, configuration);
senderLink.Closed -= OnClosed;
return producer;
}
Expand Down
4 changes: 2 additions & 2 deletions src/ActiveMQ.Net/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public async Task<IProducer> CreateProducerAsync(ProducerConfiguration configura
return await producerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false);
}

public async Task<IAnonymousProducer> CreateAnonymousProducer(CancellationToken cancellationToken)
public async Task<IAnonymousProducer> CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken)
{
var session = await CreateSession(cancellationToken).ConfigureAwait(false);
var producerBuilder = new AnonymousProducerBuilder(_loggerFactory, session);
return await producerBuilder.CreateAsync(cancellationToken).ConfigureAwait(false);
return await producerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false);
}

private Task<Session> CreateSession(CancellationToken cancellationToken)
Expand Down
6 changes: 6 additions & 0 deletions src/ActiveMQ.Net/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,11 @@ public static Task<IProducer> CreateProducerAsync(this IConnection connection, s
};
return connection.CreateProducerAsync(configuration, cancellationToken);
}

public static Task<IAnonymousProducer> CreateAnonymousProducer(this IConnection connection, CancellationToken cancellationToken = default)
{
var configuration = new AnonymousProducerConfiguration();
return connection.CreateAnonymousProducer(configuration, cancellationToken);
}
}
}
24 changes: 24 additions & 0 deletions src/ActiveMQ.Net/Header.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace ActiveMQ.Net
{
internal sealed class Header
{
private readonly Amqp.Framing.Header _innerHeader;

public Header(Amqp.Message innerMessage)
{
_innerHeader = innerMessage.Header ??= new Amqp.Framing.Header();
}

public byte? Priority
{
get => _innerHeader.HasField(1) ? _innerHeader.Priority : default(byte?);
set
{
if (value != default)
_innerHeader.Priority = value.Value;
else
_innerHeader.ResetField(1);
}
}
}
}
7 changes: 7 additions & 0 deletions src/ActiveMQ.Net/IBaseProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace ActiveMQ.Net
{
public interface IBaseProducerConfiguration
{
byte? MessagePriority { get; }
}
}
2 changes: 1 addition & 1 deletion src/ActiveMQ.Net/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IConnection : IAsyncDisposable
bool IsOpened { get; }
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IProducer> CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IAnonymousProducer> CreateAnonymousProducer(CancellationToken cancellationToken = default);
Task<IAnonymousProducer> CreateAnonymousProducer(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default);
event EventHandler<ConnectionClosedEventArgs> ConnectionClosed;
event EventHandler<ConnectionRecoveredEventArgs> ConnectionRecovered;
event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
Expand Down
19 changes: 16 additions & 3 deletions src/ActiveMQ.Net/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace ActiveMQ.Net
{
public class Message
{
private Header _header;
private Properties _properties;
private ApplicationProperties _applicationProperties;
private MessageAnnotations _messageAnnotations;
internal Amqp.Message InnerMessage { get; }

internal Message(Amqp.Message message)
{
Expand Down Expand Up @@ -54,11 +54,24 @@ private static RestrictedDescribed GetBodySection(object body)
}
}

public Properties Properties => _properties ??= new Properties(InnerMessage);
internal Amqp.Message InnerMessage { get; }

public ApplicationProperties ApplicationProperties => _applicationProperties ??= new ApplicationProperties(InnerMessage);
private Header Header => _header ??= new Header(InnerMessage);

internal Properties Properties => _properties ??= new Properties(InnerMessage);

internal MessageAnnotations MessageAnnotations => _messageAnnotations ??= new MessageAnnotations(InnerMessage);
public ApplicationProperties ApplicationProperties => _applicationProperties ??= new ApplicationProperties(InnerMessage);

public byte? Priority
{
get => Header.Priority;
set
{
if (value > 9) throw new ArgumentOutOfRangeException(nameof(value), $"Priority value {value} is out of range (0..9).");
Header.Priority = value;
}
}

public T GetBody<T>()
{
Expand Down
2 changes: 1 addition & 1 deletion src/ActiveMQ.Net/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class Producer : ProducerBase, IProducer
{
private readonly ProducerConfiguration _configuration;

public Producer(ILoggerFactory loggerFactory, SenderLink senderLink, ProducerConfiguration configuration) : base(loggerFactory, senderLink)
public Producer(ILoggerFactory loggerFactory, SenderLink senderLink, ProducerConfiguration configuration) : base(loggerFactory, senderLink, configuration)
{
_configuration = configuration;
}
Expand Down
7 changes: 5 additions & 2 deletions src/ActiveMQ.Net/ProducerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@

namespace ActiveMQ.Net
{
internal class ProducerBase
internal abstract class ProducerBase
{
private static readonly OutcomeCallback _onOutcome = OnOutcome;

private readonly ILogger _logger;
private readonly SenderLink _senderLink;
private readonly IBaseProducerConfiguration _configuration;

public ProducerBase(ILoggerFactory loggerFactory, SenderLink senderLink)
protected ProducerBase(ILoggerFactory loggerFactory, SenderLink senderLink, IBaseProducerConfiguration configuration)
{
_logger = loggerFactory.CreateLogger(GetType());
_senderLink = senderLink;
_configuration = configuration;
}

private bool IsDetaching => _senderLink.LinkState >= LinkState.DetachPipe;
Expand Down Expand Up @@ -78,6 +80,7 @@ private void Send(string address,

try
{
message.Priority ??= _configuration.MessagePriority;
message.Properties.To = address;
message.MessageAnnotations[SymbolUtils.RoutingType] = routingType.GetRoutingAnnotation();

Expand Down
3 changes: 2 additions & 1 deletion src/ActiveMQ.Net/ProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
namespace ActiveMQ.Net
{
public class ProducerConfiguration
public class ProducerConfiguration : IBaseProducerConfiguration
{
public string Address { get; set; }
public AddressRoutingType RoutingType { get; set; }
public byte? MessagePriority { get; set; }
}
}
7 changes: 4 additions & 3 deletions src/ActiveMQ.Net/Properties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace ActiveMQ.Net
{
public sealed class Properties
internal sealed class Properties
{
private readonly Amqp.Framing.Properties _innerProperties;

Expand All @@ -22,6 +22,7 @@ public string MessageId
_innerProperties.ResetField(0);
}
}

public byte[] UserId
{
get => _innerProperties.UserId;
Expand Down Expand Up @@ -142,10 +143,10 @@ public string ReplyToGroupId
}
}

internal string To
public string To
{
get => _innerProperties.To;
set
internal set
{
if (value != default)
_innerProperties.To = value;
Expand Down
20 changes: 10 additions & 10 deletions test/ActiveMQ.Net.IntegrationTests/AnonymousMessageProducerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,32 @@ public class AnonymousMessageProducerSpec : ActiveMQNetIntegrationSpec
public AnonymousMessageProducerSpec(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Should_send_message_to_specified_address_using_AnycastRoutingType()
{
var connection = await CreateConnection();
await using var connection = await CreateConnection();
var address = nameof(Should_send_message_to_specified_address_using_AnycastRoutingType);
var producer = await connection.CreateAnonymousProducer();
var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast);
await using var producer = await connection.CreateAnonymousProducer();
await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Anycast);

await producer.SendAsync(address, AddressRoutingType.Anycast, new Message("foo"));
var msg = await consumer.ReceiveAsync();

Assert.Equal("foo", msg.GetBody<string>());
}

[Fact]
public async Task Should_send_message_to_specified_address_using_MulticastRoutingType()
{
var connection = await CreateConnection();
await using var connection = await CreateConnection();
var address = nameof(Should_send_message_to_specified_address_using_MulticastRoutingType);
var producer = await connection.CreateAnonymousProducer();
var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast);
await using var producer = await connection.CreateAnonymousProducer();
await using var consumer = await connection.CreateConsumerAsync(address, QueueRoutingType.Multicast);

await producer.SendAsync(address, AddressRoutingType.Multicast, new Message("foo"));
var msg = await consumer.ReceiveAsync();

Assert.Equal("foo", msg.GetBody<string>());
}
}
Expand Down
Loading

0 comments on commit c2f5ac2

Please sign in to comment.