Skip to content

Commit

Permalink
Support all AMQP message payload types
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Apr 30, 2020
1 parent 43e1d8d commit 30df257
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 5 deletions.
42 changes: 40 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ await anonymousProducer.SendAsync("a2", AddressRoutingType.Multicast, new Messag

### Receiving messages

ActiveMQ. Net uses `IConsumer` interface for receiving messages. `IConsumer` can be created as follows:
ActiveMQ.Net uses `IConsumer` interface for receiving messages. `IConsumer` can be created as follows:

```csharp
var consumer = await connection.CreateConsumerAsync("a1", QueueRoutingType.Anycast);
Expand All @@ -105,7 +105,45 @@ var cts = new CancellationTokenSource();
var message = await consumer.ReceiveAsync(cts.Token);
```

This may be particularly useful when you want to quickly shut down your application.
This may be particularly useful when you want to shut down your application.

### Messages

ActiveMQ.Net uses `Message` class to represent messages which may be sent and received. A `Message` can carry various types of payload and accompanying metadata.

A new message can be created as follows:

```csharp
var message = new Message("foo");
```

The `Message` constructor accepts a single parameter of type object. It's the message body. Although body argument is very generic, only certain types are considered as a valid payload:

* `string`
* `char`
* `byte`
* `sbyte`
* `short`
* `ushort`
* `int`
* `uint`
* `long`
* `ulong`
* `float`
* `System.Guid`
* `System.DateTime`
* `byte[]`
* `Amqp.Types.List`

An attempt to pass an argument out of this list will result in `ArgumentOutOfRangeException`. Passing `null` is not acceptable either and cause `ArgumentNullException`.

In order to get the message payload call `GetBody` and specify the expected type of the body section:

```csharp
var body = message.GetBody<T>();
```

If `T` matches the type of the payload, the value will be returned, otherwise, you will get `default(T)`.

### Resources lifespan

Expand Down
43 changes: 40 additions & 3 deletions src/ActiveMQ.Net/Message.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
namespace ActiveMQ.Net
using System;
using Amqp.Framing;
using Amqp.Types;

namespace ActiveMQ.Net
{
public class Message
{
Expand All @@ -12,9 +16,42 @@ internal Message(Amqp.Message message)
InnerMessage = message;
}

public Message(string body)
public Message(object body)
{
InnerMessage = new Amqp.Message
{
BodySection = GetBodySection(body)
};
}

private static RestrictedDescribed GetBodySection(object body)
{
InnerMessage = new Amqp.Message(body);
switch (body)
{
case string _:
case char _:
case byte _:
case sbyte _:
case short _:
case ushort _:
case int _:
case uint _:
case long _:
case ulong _:
case float _:
case double _:
case Guid _:
case DateTime _:
return new AmqpValue { Value = body };
case byte[] payload:
return new Data { Binary = payload };
case List list:
return new AmqpSequence { List = list };
case null:
throw new ArgumentNullException(nameof(body));
default:
throw new ArgumentOutOfRangeException(nameof(body), $"The type '{body.GetType().FullName}' is not a valid AMQP type and cannot be encoded.");
}
}

public Properties Properties => _properties ??= new Properties(InnerMessage);
Expand Down
74 changes: 74 additions & 0 deletions test/ActiveMQ.Net.UnitTests/MessageSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,79 @@ public async Task Should_set_and_get_message_application_properties()
},
}, resetMsg.ApplicationProperties["MapKey"]);
}

[Fact]
public Task Should_send_message_with_char_payload() => ShouldSendMessageWithPayload(char.MaxValue);

[Fact]
public Task Should_send_message_with_string_payload() => ShouldSendMessageWithPayload("foo");

[Fact]
public Task Should_send_message_with_byte_payload() => ShouldSendMessageWithPayload(byte.MaxValue);

[Fact]
public Task Should_send_message_with_sbyte_payload() => ShouldSendMessageWithPayload(sbyte.MaxValue);

[Fact]
public Task Should_send_message_with_short_payload() => ShouldSendMessageWithPayload(short.MaxValue);

[Fact]
public Task Should_send_message_with_ushort_payload() => ShouldSendMessageWithPayload(ushort.MaxValue);

[Fact]
public Task Should_send_message_with_int_payload() => ShouldSendMessageWithPayload(int.MaxValue);

[Fact]
public Task Should_send_message_with_uint_payload() => ShouldSendMessageWithPayload(uint.MaxValue);

[Fact]
public Task Should_send_message_with_long_payload() => ShouldSendMessageWithPayload(long.MaxValue);

[Fact]
public Task Should_send_message_with_ulong_payload() => ShouldSendMessageWithPayload(ulong.MaxValue);

[Fact]
public Task Should_send_message_with_float_payload() => ShouldSendMessageWithPayload(float.MaxValue);

[Fact]
public Task Should_send_message_with_double_payload() => ShouldSendMessageWithPayload(double.MaxValue);

[Fact]
public Task Should_send_message_with_Guid_payload() => ShouldSendMessageWithPayload(Guid.NewGuid());

[Fact]
public Task Should_send_message_with_DateTime_payload()
{
// drop tics precision, as AMQP timestamp is represented as milliseconds from Unix epoch
const long ticksPerMillisecond = 10000;
var dateTime = new DateTime(DateTime.UtcNow.Ticks / ticksPerMillisecond * ticksPerMillisecond, DateTimeKind.Utc);

return ShouldSendMessageWithPayload(dateTime);
}

[Fact]
public Task Should_send_message_with_bytes_payload() => ShouldSendMessageWithPayload(new byte[] { 1, 2, 3, 4 });

[Fact]
public Task Should_send_message_with_List_payload() => ShouldSendMessageWithPayload(new List
{
char.MaxValue, "foo", byte.MaxValue, sbyte.MaxValue, short.MaxValue, ushort.MaxValue, int.MaxValue, uint.MaxValue, long.MaxValue, ulong.MaxValue, float.MaxValue, double.MaxValue, new byte[] { 1, 2, 3, 4 },
new List { 1, 2, 3, 4 }
});

private async Task ShouldSendMessageWithPayload<T>(T payload)
{
using var host = CreateOpenedContainerHost();
var messageProcessor = host.CreateMessageProcessor("a1");
await using var connection = await CreateConnection(host.Endpoint);
await using var producer = await connection.CreateProducerAsync("a1", AddressRoutingType.Anycast);

var message = new Message(payload);
await producer.SendAsync(message);

var received = messageProcessor.Dequeue(Timeout);

Assert.Equal(payload, received.GetBody<T>());
}
}
}

0 comments on commit 30df257

Please sign in to comment.