diff --git a/README.md b/README.md index 05bb340c..f7273155 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ await anonymousProducer.SendAsync("a2", AddressRoutingType.Multicast, new Messag ### Receiving messages -ActiveMQ. Net uses `IConsumer` interface for receiving messages. `IConsumer` can be created as follows: +ActiveMQ.Net uses `IConsumer` interface for receiving messages. `IConsumer` can be created as follows: ```csharp var consumer = await connection.CreateConsumerAsync("a1", QueueRoutingType.Anycast); @@ -105,7 +105,45 @@ var cts = new CancellationTokenSource(); var message = await consumer.ReceiveAsync(cts.Token); ``` -This may be particularly useful when you want to quickly shut down your application. +This may be particularly useful when you want to shut down your application. + +### Messages + +ActiveMQ.Net uses `Message` class to represent messages which may be sent and received. A `Message` can carry various types of payload and accompanying metadata. + +A new message can be created as follows: + +```csharp +var message = new Message("foo"); +``` + +The `Message` constructor accepts a single parameter of type object. It's the message body. Although body argument is very generic, only certain types are considered as a valid payload: + +* `string` +* `char` +* `byte` +* `sbyte` +* `short` +* `ushort` +* `int` +* `uint` +* `long` +* `ulong` +* `float` +* `System.Guid` +* `System.DateTime` +* `byte[]` +* `Amqp.Types.List` + +An attempt to pass an argument out of this list will result in `ArgumentOutOfRangeException`. Passing `null` is not acceptable either and cause `ArgumentNullException`. + +In order to get the message payload call `GetBody` and specify the expected type of the body section: + +```csharp +var body = message.GetBody(); +``` + +If `T` matches the type of the payload, the value will be returned, otherwise, you will get `default(T)`. ### Resources lifespan diff --git a/src/ActiveMQ.Net/Message.cs b/src/ActiveMQ.Net/Message.cs index 5f39ce36..f1b1ac29 100644 --- a/src/ActiveMQ.Net/Message.cs +++ b/src/ActiveMQ.Net/Message.cs @@ -1,4 +1,8 @@ -namespace ActiveMQ.Net +using System; +using Amqp.Framing; +using Amqp.Types; + +namespace ActiveMQ.Net { public class Message { @@ -12,9 +16,42 @@ internal Message(Amqp.Message message) InnerMessage = message; } - public Message(string body) + public Message(object body) + { + InnerMessage = new Amqp.Message + { + BodySection = GetBodySection(body) + }; + } + + private static RestrictedDescribed GetBodySection(object body) { - InnerMessage = new Amqp.Message(body); + switch (body) + { + case string _: + case char _: + case byte _: + case sbyte _: + case short _: + case ushort _: + case int _: + case uint _: + case long _: + case ulong _: + case float _: + case double _: + case Guid _: + case DateTime _: + return new AmqpValue { Value = body }; + case byte[] payload: + return new Data { Binary = payload }; + case List list: + return new AmqpSequence { List = list }; + case null: + throw new ArgumentNullException(nameof(body)); + default: + throw new ArgumentOutOfRangeException(nameof(body), $"The type '{body.GetType().FullName}' is not a valid AMQP type and cannot be encoded."); + } } public Properties Properties => _properties ??= new Properties(InnerMessage); diff --git a/test/ActiveMQ.Net.UnitTests/MessageSpec.cs b/test/ActiveMQ.Net.UnitTests/MessageSpec.cs index f84a1a7d..cf5cbf64 100644 --- a/test/ActiveMQ.Net.UnitTests/MessageSpec.cs +++ b/test/ActiveMQ.Net.UnitTests/MessageSpec.cs @@ -168,5 +168,79 @@ public async Task Should_set_and_get_message_application_properties() }, }, resetMsg.ApplicationProperties["MapKey"]); } + + [Fact] + public Task Should_send_message_with_char_payload() => ShouldSendMessageWithPayload(char.MaxValue); + + [Fact] + public Task Should_send_message_with_string_payload() => ShouldSendMessageWithPayload("foo"); + + [Fact] + public Task Should_send_message_with_byte_payload() => ShouldSendMessageWithPayload(byte.MaxValue); + + [Fact] + public Task Should_send_message_with_sbyte_payload() => ShouldSendMessageWithPayload(sbyte.MaxValue); + + [Fact] + public Task Should_send_message_with_short_payload() => ShouldSendMessageWithPayload(short.MaxValue); + + [Fact] + public Task Should_send_message_with_ushort_payload() => ShouldSendMessageWithPayload(ushort.MaxValue); + + [Fact] + public Task Should_send_message_with_int_payload() => ShouldSendMessageWithPayload(int.MaxValue); + + [Fact] + public Task Should_send_message_with_uint_payload() => ShouldSendMessageWithPayload(uint.MaxValue); + + [Fact] + public Task Should_send_message_with_long_payload() => ShouldSendMessageWithPayload(long.MaxValue); + + [Fact] + public Task Should_send_message_with_ulong_payload() => ShouldSendMessageWithPayload(ulong.MaxValue); + + [Fact] + public Task Should_send_message_with_float_payload() => ShouldSendMessageWithPayload(float.MaxValue); + + [Fact] + public Task Should_send_message_with_double_payload() => ShouldSendMessageWithPayload(double.MaxValue); + + [Fact] + public Task Should_send_message_with_Guid_payload() => ShouldSendMessageWithPayload(Guid.NewGuid()); + + [Fact] + public Task Should_send_message_with_DateTime_payload() + { + // drop tics precision, as AMQP timestamp is represented as milliseconds from Unix epoch + const long ticksPerMillisecond = 10000; + var dateTime = new DateTime(DateTime.UtcNow.Ticks / ticksPerMillisecond * ticksPerMillisecond, DateTimeKind.Utc); + + return ShouldSendMessageWithPayload(dateTime); + } + + [Fact] + public Task Should_send_message_with_bytes_payload() => ShouldSendMessageWithPayload(new byte[] { 1, 2, 3, 4 }); + + [Fact] + public Task Should_send_message_with_List_payload() => ShouldSendMessageWithPayload(new List + { + char.MaxValue, "foo", byte.MaxValue, sbyte.MaxValue, short.MaxValue, ushort.MaxValue, int.MaxValue, uint.MaxValue, long.MaxValue, ulong.MaxValue, float.MaxValue, double.MaxValue, new byte[] { 1, 2, 3, 4 }, + new List { 1, 2, 3, 4 } + }); + + private async Task ShouldSendMessageWithPayload(T payload) + { + using var host = CreateOpenedContainerHost(); + var messageProcessor = host.CreateMessageProcessor("a1"); + await using var connection = await CreateConnection(host.Endpoint); + await using var producer = await connection.CreateProducerAsync("a1", AddressRoutingType.Anycast); + + var message = new Message(payload); + await producer.SendAsync(message); + + var received = messageProcessor.Dequeue(Timeout); + + Assert.Equal(payload, received.GetBody()); + } } } \ No newline at end of file