diff --git a/NsqSharp.Tests/Bus/MessageMutatorTest.cs b/NsqSharp.Tests/Bus/MessageMutatorTest.cs new file mode 100644 index 0000000..8ad8cd4 --- /dev/null +++ b/NsqSharp.Tests/Bus/MessageMutatorTest.cs @@ -0,0 +1,512 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Newtonsoft.Json; +using NsqSharp.Bus; +using NsqSharp.Bus.Configuration; +using NsqSharp.Bus.Configuration.BuiltIn; +using NsqSharp.Bus.Configuration.Providers; +using NsqSharp.Tests.Bus.TestFakes; +using NsqSharp.Utils.Extensions; +using NUnit.Framework; +using StructureMap; + +namespace NsqSharp.Tests.Bus +{ +#if !RUN_INTEGRATION_TESTS + [TestFixture(IgnoreReason = "NSQD Integration Test")] +#else + [TestFixture] +#endif + public class MessageMutatorTest + { + [Test] + public void SetParentIdInMessageMutator() + { + string topicName = string.Format("test_message_mutator_{0}", DateTime.Now.UnixNano()); + const string channelName = "test_message_router"; + + var container = new Container(); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName, channelName); + + try + { + BusService.Start(new BusConfiguration( + new StructureMapObjectBuilder(container), + new NewtonsoftJsonSerializer(typeof(JsonConverter).Assembly), + new MessageAuditorStub(), + new MessageTypeToTopicDictionary(new Dictionary { + { typeof(MyMutatedMessage), topicName } + }), + new HandlerTypeToChannelDictionary(new Dictionary { + { typeof(MyMutatedMessageHandler), channelName } + }), + defaultNsqLookupdHttpEndpoints: new[] { "127.0.0.1:4161" }, + defaultThreadsPerHandler: 1, + defaultConsumerNsqConfig: new Config + { + MaxRequeueDelay = TimeSpan.Zero, + LookupdPollJitter = 0, + LookupdPollInterval = TimeSpan.FromSeconds(1) + }, + preCreateTopicsAndChannels: true, + messageMutator: new MessageMutator() + )); + + var bus = container.GetInstance(); + + bus.Send(new MyMutatedMessage { Text = "One" }); + + var dict = MyMutatedMessageHandler.GetReceived(); + + Assert.AreEqual(2, dict.Count, "dict.Count"); + + var firstMessage = dict.Keys.First(); + var secondMessage = dict.Keys.Skip(1).Single(); + + var expectedParentId = firstMessage.UniqueIdentifier.ToString(); + + Console.WriteLine(expectedParentId); + Console.WriteLine(dict[firstMessage].ParentId); + Console.WriteLine(dict[firstMessage].Text); + Console.WriteLine(dict[secondMessage].ParentId); + Console.WriteLine(dict[secondMessage].Text); + + Assert.AreEqual(null, dict[firstMessage].ParentId, "dict[firstMessage].ParentId"); + Assert.AreEqual("One", dict[firstMessage].Text, "dict[firstMessage].Text"); + Assert.AreEqual(expectedParentId, dict[secondMessage].ParentId, "dict[secondMessage].ParentId"); + Assert.AreEqual("Two", dict[secondMessage].Text, "dict[secondMessage].Text"); + + // get stats from http server + var stats = NsqdHttpApi.Stats("http://127.0.0.1:4151"); + + // assert stats from http server + var topic = stats.Topics.Single(p => p.TopicName == topicName); + var channel = topic.Channels.Single(p => p.ChannelName == channelName); + + Assert.AreEqual(2, topic.MessageCount, "topic.MessageCount"); + Assert.AreEqual(0, topic.Depth, "topic.Depth"); + Assert.AreEqual(0, topic.BackendDepth, "topic.BackendDepth"); + + Assert.AreEqual(2, channel.MessageCount, "channel.MessageCount"); + Assert.AreEqual(0, channel.DeferredCount, "channel.DeferredCount"); + Assert.AreEqual(0, channel.Depth, "channel.Depth"); + Assert.AreEqual(0, channel.BackendDepth, "channel.BackendDepth"); + Assert.AreEqual(0, channel.InFlightCount, "channel.InFlightCount"); + Assert.AreEqual(0, channel.TimeoutCount, "channel.TimeoutCount"); + Assert.AreEqual(0, channel.RequeueCount, "channel.RequeueCount"); + } + finally + { + BusService.Stop(); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName); + } + } + + public class MyMutatedMessage : IMessageWithParentId + { + public string ParentId { get; set; } + public string Text { get; set; } + } + + public interface IMessageWithParentId + { + string ParentId { get; set; } + } + + public class MessageMutator : IMessageMutator + { + public T GetMutatedMessage(IBus bus, T sentMessage) + { + var currentMessageInfo = bus.GetCurrentMessageInformation(); + + IMessageWithParentId messageWithParentId = sentMessage as IMessageWithParentId; + if (messageWithParentId != null) + { + if (currentMessageInfo != null) + messageWithParentId.ParentId = currentMessageInfo.UniqueIdentifier.ToString(); + } + + IMessageWithRouteIndex messageWithRouteIndex = sentMessage as IMessageWithRouteIndex; + if (messageWithRouteIndex != null && messageWithRouteIndex.RouteIndex == null) + { + if (currentMessageInfo != null) + { + var currentMessage = currentMessageInfo.DeserializedMessageBody as IMessageWithRouteIndex; + if (currentMessage != null) + messageWithRouteIndex.RouteIndex = currentMessage.RouteIndex; + } + } + + return sentMessage; + } + } + + public class MyMutatedMessageHandler : IHandleMessages + { + private static readonly Dictionary _received = + new Dictionary(); + private static readonly object _receivedLocker = new object(); + private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false); + + private readonly IBus _bus; + + public MyMutatedMessageHandler(IBus bus) + { + _bus = bus; + } + + public void Handle(MyMutatedMessage message) + { + bool done = false; + lock (_receivedLocker) + { + _received.Add(_bus.GetCurrentMessageInformation(), message); + if (_received.Count == 2) + done = true; + else + _bus.Send(new MyMutatedMessage { Text = "Two" }); + } + + if (done) + _wait.Set(); + } + + public static Dictionary GetReceived() + { + _wait.WaitOne(TimeSpan.FromSeconds(10)); + lock (_receivedLocker) + { + return new Dictionary(_received); + } + } + } + + [Test] + public void MutatorPreservesPreviousMessageRoutingProperty() + { + var timestamp = DateTime.Now.UnixNano(); + string originalTopicName = string.Format("test_message_router_mutator_{0}", timestamp); + string topicName1 = string.Format("{0}_1", originalTopicName); + string topicName2 = string.Format("{0}_2", originalTopicName); + string childOriginalTopicName = string.Format("test_message_router_mutator_child_{0}", timestamp); + string childTopicName1 = string.Format("{0}_1", childOriginalTopicName); + string childTopicName2 = string.Format("{0}_2", childOriginalTopicName); + const string channelName = "test_message_router_mutator"; + + var container = new Container(); + + Console.WriteLine(originalTopicName); + Console.WriteLine(topicName1); + Console.WriteLine(topicName2); + Console.WriteLine(childOriginalTopicName); + Console.WriteLine(childTopicName1); + Console.WriteLine(childTopicName2); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", originalTopicName); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", originalTopicName, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName1); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName1, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName2); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName2, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", childOriginalTopicName); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", childOriginalTopicName, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", childTopicName1); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", childTopicName1, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", childTopicName2); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", childTopicName2, channelName); + + try + { + var messageTypeToTopicProvider = new MessageTypeToTopicDictionary(new Dictionary { + { typeof(MyRoutedMessage), originalTopicName }, + { typeof(MyMutatedRoutedMessage), childOriginalTopicName }, + }); + + BusService.Start(new BusConfiguration( + new StructureMapObjectBuilder(container), + new NewtonsoftJsonSerializer(typeof(JsonConverter).Assembly), + new MessageAuditorStub(), + messageTypeToTopicProvider, + new HandlerTypeToChannelDictionary(new Dictionary { + { typeof(MyRoutedMessageHandler), channelName }, + { typeof(MyMutatedRoutedMessageHandler), channelName }, + }), + defaultNsqLookupdHttpEndpoints: new[] { "127.0.0.1:4161" }, + defaultThreadsPerHandler: 1, + defaultConsumerNsqConfig: new Config + { + MaxRequeueDelay = TimeSpan.Zero, + LookupdPollJitter = 0, + LookupdPollInterval = TimeSpan.FromSeconds(1) + }, + preCreateTopicsAndChannels: true, + messageTopicRouter: new MessageTopicRouter(messageTypeToTopicProvider), + messageMutator: new MessageMutator() + )); + + var bus = container.GetInstance(); + + bus.Send(new MyRoutedMessage { Text = "One" } ); + bus.Send(new MyRoutedMessage { RouteIndex = 1, Text = "Two" }); + bus.Send(new MyRoutedMessage { RouteIndex = 2, Text = "Three" }); + + var dict1 = MyRoutedMessageHandler.GetReceived(); + var dict2 = MyMutatedRoutedMessageHandler.GetReceived(); + + Assert.AreEqual(3, dict1.Count, "dict.Count"); + Assert.AreEqual(3, dict2.Count, "dict2.Count"); + + var firstMessage = dict1.Single(p => p.Value.Text == "One"); + var secondMessage = dict1.Single(p => p.Value.Text == "Two"); + var thirdMessage = dict1.Single(p => p.Value.Text == "Three"); + + var childFirstMessage = dict2.Single(p => p.Value.Text == "One Child"); + var childSecondMessage = dict2.Single(p => p.Value.Text == "Two Child"); + var childThirdMessage = dict2.Single(p => p.Value.Text == "Three Child"); + + Console.WriteLine(firstMessage.Key.UniqueIdentifier); + Console.WriteLine(firstMessage.Value.Text); + Console.WriteLine(secondMessage.Key.UniqueIdentifier); + Console.WriteLine(secondMessage.Value.Text); + Console.WriteLine(thirdMessage.Key.UniqueIdentifier); + Console.WriteLine(thirdMessage.Value.Text); + + Console.WriteLine(childFirstMessage.Value.ParentId); + Console.WriteLine(childFirstMessage.Value.Text); + Console.WriteLine(childSecondMessage.Value.ParentId); + Console.WriteLine(childSecondMessage.Value.Text); + Console.WriteLine(childThirdMessage.Value.ParentId); + Console.WriteLine(childThirdMessage.Value.Text); + + Assert.AreEqual("One", firstMessage.Value.Text, "firstMessage.Value.Text"); + Assert.AreEqual("Two", secondMessage.Value.Text, "secondMessage.Value.Text"); + Assert.AreEqual("Three", thirdMessage.Value.Text, "thirdMessage.Value.Text"); + + Assert.AreEqual(firstMessage.Key.UniqueIdentifier.ToString(), childFirstMessage.Value.ParentId, + "childFirstMessage.Value.ParentId"); + Assert.AreEqual("One Child", childFirstMessage.Value.Text, "childFirstMessage.Value.Text"); + Assert.AreEqual(secondMessage.Key.UniqueIdentifier.ToString(), childSecondMessage.Value.ParentId, + "childSecondMessage.Value.ParentId"); + Assert.AreEqual("Two Child", childSecondMessage.Value.Text, "childSecondMessage.Value.Text"); + Assert.AreEqual(thirdMessage.Key.UniqueIdentifier.ToString(), childThirdMessage.Value.ParentId, + "childThirdMessage.Value.ParentId"); + Assert.AreEqual("Three Child", childThirdMessage.Value.Text, "childThirdMessage.Value.Text"); + + // get stats from http server + var stats = NsqdHttpApi.Stats("http://127.0.0.1:4151"); + + foreach (var topicName in new[] { + originalTopicName, topicName1, topicName2, + childOriginalTopicName, childTopicName1, childTopicName2 + }) + { + // assert received message topic/message match expectations + IMessageWithRouteIndex receivedMessage; + if (dict1.Any(p => p.Key.Topic == topicName)) + receivedMessage = dict1.Single(p => p.Key.Topic == topicName).Value; + else + receivedMessage = dict2.Single(p => p.Key.Topic == topicName).Value; + + int? expectedRouteIndex; + if (topicName == topicName1 || topicName == childTopicName1) + expectedRouteIndex = 1; + else if (topicName == topicName2 || topicName == childTopicName2) + expectedRouteIndex = 2; + else + expectedRouteIndex = null; + + Assert.AreEqual(expectedRouteIndex, receivedMessage.RouteIndex, "expectedRouteIndex"); + + // assert stats from http server + var topic = stats.Topics.Single(p => p.TopicName == topicName); + var channel = topic.Channels.Single(p => p.ChannelName == channelName); + + Assert.AreEqual(1, topic.MessageCount, "topic.MessageCount"); + Assert.AreEqual(0, topic.Depth, "topic.Depth"); + Assert.AreEqual(0, topic.BackendDepth, "topic.BackendDepth"); + + Assert.AreEqual(1, channel.MessageCount, "channel.MessageCount"); + Assert.AreEqual(0, channel.DeferredCount, "channel.DeferredCount"); + Assert.AreEqual(0, channel.Depth, "channel.Depth"); + Assert.AreEqual(0, channel.BackendDepth, "channel.BackendDepth"); + Assert.AreEqual(0, channel.InFlightCount, "channel.InFlightCount"); + Assert.AreEqual(0, channel.TimeoutCount, "channel.TimeoutCount"); + Assert.AreEqual(0, channel.RequeueCount, "channel.RequeueCount"); + } + } + finally + { + BusService.Stop(); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", originalTopicName); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", originalTopicName); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName1); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName1); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName2); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName2); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", childOriginalTopicName); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", childOriginalTopicName); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", childTopicName1); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", childTopicName1); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", childTopicName2); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", childTopicName2); + } + } + + public class MyRoutedMessage : IMessageWithRouteIndex + { + public int? RouteIndex { get; set; } + public string Text { get; set; } + } + + public class MyMutatedRoutedMessage : IMessageWithRouteIndex, IMessageWithParentId + { + public string ParentId { get; set; } + public int? RouteIndex { get; set; } + public string Text { get; set; } + } + + public interface IMessageWithRouteIndex + { + int? RouteIndex { get; set; } + } + + public class MessageTopicRouter : IMessageTopicRouter + { + private readonly IMessageTypeToTopicProvider _messageTypeToTopicProvider; + + public MessageTopicRouter(IMessageTypeToTopicProvider messageTypeToTopicProvider) + { + _messageTypeToTopicProvider = messageTypeToTopicProvider; + } + + public string GetMessageTopic(IBus bus, string originalTopic, T sentMessage) + { + var myRoutedMessage = sentMessage as IMessageWithRouteIndex; + if (myRoutedMessage != null) + { + if (myRoutedMessage.RouteIndex == null) + return originalTopic; + else + return string.Format("{0}_{1}", originalTopic, myRoutedMessage.RouteIndex); + } + else + { + return originalTopic; + } + } + + public string[] GetTopics(Type messageType) + { + string originalTopic = _messageTypeToTopicProvider.GetTopic(messageType); + + if (messageType.GetInterfaces().Contains(typeof(IMessageWithRouteIndex))) + { + return new[] + { + originalTopic, + string.Format("{0}_1", originalTopic), + string.Format("{0}_2", originalTopic), + }; + } + else + { + return new[] { originalTopic }; + } + } + } + + public class MyRoutedMessageHandler : IHandleMessages + { + private static readonly Dictionary _received = + new Dictionary(); + private static readonly object _receivedLocker = new object(); + private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false); + + private readonly IBus _bus; + + public MyRoutedMessageHandler(IBus bus) + { + _bus = bus; + } + + public void Handle(MyRoutedMessage message) + { + bool done = false; + lock (_receivedLocker) + { + _received.Add(_bus.GetCurrentMessageInformation(), message); + _bus.Send(new MyMutatedRoutedMessage { Text = message.Text + " Child" }); + if (_received.Count == 3) + done = true; + } + + if (done) + _wait.Set(); + } + + public static Dictionary GetReceived() + { + _wait.WaitOne(TimeSpan.FromSeconds(10)); + lock (_receivedLocker) + { + return new Dictionary(_received); + } + } + } + + public class MyMutatedRoutedMessageHandler : IHandleMessages + { + private static readonly Dictionary _received = + new Dictionary(); + private static readonly object _receivedLocker = new object(); + private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false); + + private readonly IBus _bus; + + public MyMutatedRoutedMessageHandler(IBus bus) + { + _bus = bus; + } + + public void Handle(MyMutatedRoutedMessage message) + { + bool done = false; + lock (_receivedLocker) + { + _received.Add(_bus.GetCurrentMessageInformation(), message); + if (_received.Count == 3) + done = true; + } + + if (done) + _wait.Set(); + } + + public static Dictionary GetReceived() + { + _wait.WaitOne(TimeSpan.FromSeconds(10)); + lock (_receivedLocker) + { + return new Dictionary(_received); + } + } + } + } +} diff --git a/NsqSharp.Tests/Bus/MessageRouterTest.cs b/NsqSharp.Tests/Bus/MessageRouterTest.cs new file mode 100644 index 0000000..984c76c --- /dev/null +++ b/NsqSharp.Tests/Bus/MessageRouterTest.cs @@ -0,0 +1,223 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Newtonsoft.Json; +using NsqSharp.Bus; +using NsqSharp.Bus.Configuration; +using NsqSharp.Bus.Configuration.BuiltIn; +using NsqSharp.Bus.Configuration.Providers; +using NsqSharp.Tests.Bus.TestFakes; +using NsqSharp.Utils.Extensions; +using NUnit.Framework; +using StructureMap; + +namespace NsqSharp.Tests.Bus +{ +#if !RUN_INTEGRATION_TESTS + [TestFixture(IgnoreReason = "NSQD Integration Test")] +#else + [TestFixture] +#endif + public class MessageRouterTest + { + [Test] + public void RoutingByProperty() + { + var timestamp = DateTime.Now.UnixNano(); + string originalTopicName = string.Format("test_message_router_{0}", timestamp); + string topicName1 = string.Format("{0}_1", originalTopicName); + string topicName2 = string.Format("{0}_2", originalTopicName); + const string channelName = "test_message_router"; + + var container = new Container(); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", originalTopicName); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", originalTopicName, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName1); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName1, channelName); + + NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName2); + NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName2, channelName); + + try + { + var messageTypeToTopicProvider = new MessageTypeToTopicDictionary(new Dictionary { + { typeof(MyRoutedMessage), originalTopicName } + }); + + BusService.Start(new BusConfiguration( + new StructureMapObjectBuilder(container), + new NewtonsoftJsonSerializer(typeof(JsonConverter).Assembly), + new MessageAuditorStub(), + messageTypeToTopicProvider, + new HandlerTypeToChannelDictionary(new Dictionary { + { typeof(MyRoutedMessageHandler), channelName } + }), + defaultNsqLookupdHttpEndpoints: new[] { "127.0.0.1:4161" }, + defaultThreadsPerHandler: 1, + defaultConsumerNsqConfig: new Config + { + MaxRequeueDelay = TimeSpan.Zero, + LookupdPollJitter = 0, + LookupdPollInterval = TimeSpan.FromSeconds(1) + }, + preCreateTopicsAndChannels: true, + messageTopicRouter: new MessageTopicRouter(messageTypeToTopicProvider) + )); + + var bus = container.GetInstance(); + + bus.Send(new MyRoutedMessage()); + bus.Send(new MyRoutedMessage { RouteIndex = 1 }); + bus.Send(new MyRoutedMessage { RouteIndex = 2 }); + + var dict = MyRoutedMessageHandler.GetReceived(); + + Assert.AreEqual(3, dict.Count, "dict.Count"); + + // get stats from http server + var stats = NsqdHttpApi.Stats("http://127.0.0.1:4151"); + + foreach (var topicName in new[] { originalTopicName, topicName1, topicName2 }) + { + // assert received message topic/message match expectations + var receivedMessage = dict.Single(p => p.Key.Topic == topicName); + + int expectedRouteIndex; + if (topicName == topicName1) + expectedRouteIndex = 1; + else if (topicName == topicName2) + expectedRouteIndex = 2; + else + expectedRouteIndex = 0; + + Assert.AreEqual(expectedRouteIndex, receivedMessage.Value.RouteIndex, "expectedRouteIndex"); + + // assert stats from http server + var topic = stats.Topics.Single(p => p.TopicName == topicName); + var channel = topic.Channels.Single(p => p.ChannelName == channelName); + + Assert.AreEqual(1, topic.MessageCount, "topic.MessageCount"); + Assert.AreEqual(0, topic.Depth, "topic.Depth"); + Assert.AreEqual(0, topic.BackendDepth, "topic.BackendDepth"); + + Assert.AreEqual(1, channel.MessageCount, "channel.MessageCount"); + Assert.AreEqual(0, channel.DeferredCount, "channel.DeferredCount"); + Assert.AreEqual(0, channel.Depth, "channel.Depth"); + Assert.AreEqual(0, channel.BackendDepth, "channel.BackendDepth"); + Assert.AreEqual(0, channel.InFlightCount, "channel.InFlightCount"); + Assert.AreEqual(0, channel.TimeoutCount, "channel.TimeoutCount"); + Assert.AreEqual(0, channel.RequeueCount, "channel.RequeueCount"); + } + } + finally + { + BusService.Stop(); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", originalTopicName); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", originalTopicName); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName1); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName1); + + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName2); + NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName2); + } + } + + public class MyRoutedMessage : IMessageWithRouteIndex + { + public int RouteIndex { get; set; } + } + + public interface IMessageWithRouteIndex + { + int RouteIndex { get; } + } + + public class MessageTopicRouter : IMessageTopicRouter + { + private readonly IMessageTypeToTopicProvider _messageTypeToTopicProvider; + + public MessageTopicRouter(IMessageTypeToTopicProvider messageTypeToTopicProvider) + { + _messageTypeToTopicProvider = messageTypeToTopicProvider; + } + + public string GetMessageTopic(IBus bus, string originalTopic, T sentMessage) + { + var myRoutedMessage = sentMessage as IMessageWithRouteIndex; + if (myRoutedMessage != null) + { + if (myRoutedMessage.RouteIndex == 0) + return originalTopic; + else + return string.Format("{0}_{1}", originalTopic, myRoutedMessage.RouteIndex); + } + else + { + return originalTopic; + } + } + + public string[] GetTopics(Type messageType) + { + string originalTopic = _messageTypeToTopicProvider.GetTopic(messageType); + + if (messageType.GetInterfaces().Contains(typeof(IMessageWithRouteIndex))) + { + return new[] + { + originalTopic, + string.Format("{0}_1", originalTopic), + string.Format("{0}_2", originalTopic), + }; + } + else + { + return new[] { originalTopic }; + } + } + } + + public class MyRoutedMessageHandler : IHandleMessages + { + private static readonly Dictionary _received = + new Dictionary(); + private static readonly object _receivedLocker = new object(); + private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false); + + private readonly IBus _bus; + + public MyRoutedMessageHandler(IBus bus) + { + _bus = bus; + } + + public void Handle(MyRoutedMessage message) + { + bool done = false; + lock (_receivedLocker) + { + _received.Add(_bus.GetCurrentMessageInformation(), message); + if (_received.Count == 3) + done = true; + } + + if (done) + _wait.Set(); + } + + public static Dictionary GetReceived() + { + _wait.WaitOne(TimeSpan.FromSeconds(10)); + lock (_receivedLocker) + { + return new Dictionary(_received); + } + } + } + } +} diff --git a/NsqSharp.Tests/NsqSharp.Tests.csproj b/NsqSharp.Tests/NsqSharp.Tests.csproj index dd1388b..216cb93 100644 --- a/NsqSharp.Tests/NsqSharp.Tests.csproj +++ b/NsqSharp.Tests/NsqSharp.Tests.csproj @@ -70,6 +70,8 @@ + + diff --git a/NsqSharp/Bus/Configuration/BusConfiguration.cs b/NsqSharp/Bus/Configuration/BusConfiguration.cs index 8d239c6..4c2deab 100644 --- a/NsqSharp/Bus/Configuration/BusConfiguration.cs +++ b/NsqSharp/Bus/Configuration/BusConfiguration.cs @@ -149,16 +149,27 @@ private void AddMessageHandlers(IEnumerable handlerTypes) } Type messageType = handlerMessageTypes[0]; + List topics; - string topic; - try + if (_messageTopicRouter == null) { - topic = _messageTypeToTopicProvider.GetTopic(messageType); + string topic; + try + { + topic = _messageTypeToTopicProvider.GetTopic(messageType); + } + catch (Exception ex) + { + throw new Exception(string.Format( + "Topic for message type '{0}' not registered.", messageType.FullName), ex); + } + + topics = new List(); + topics.Add(topic); } - catch (Exception ex) + else { - throw new Exception(string.Format( - "Topic for message type '{0}' not registered.", messageType.FullName), ex); + topics = new List(_messageTopicRouter.GetTopics(messageType)); } string channel; @@ -172,7 +183,10 @@ private void AddMessageHandlers(IEnumerable handlerTypes) "Channel for handler type '{0}' not registered.", handlerType.FullName), ex); } - AddMessageHandler(handlerType, messageType, topic, channel); + foreach (var topic in topics) + { + AddMessageHandler(handlerType, messageType, topic, channel); + } } else { diff --git a/NsqSharp/Bus/Configuration/IMessageTopicRouter.cs b/NsqSharp/Bus/Configuration/IMessageTopicRouter.cs index f207a03..042ce46 100644 --- a/NsqSharp/Bus/Configuration/IMessageTopicRouter.cs +++ b/NsqSharp/Bus/Configuration/IMessageTopicRouter.cs @@ -1,4 +1,5 @@ -using NsqSharp.Bus.Configuration.Providers; +using System; +using NsqSharp.Bus.Configuration.Providers; namespace NsqSharp.Bus.Configuration { @@ -17,5 +18,14 @@ public interface IMessageTopicRouter /// The message about to be sent. /// The topic to send this message on. string GetMessageTopic(IBus bus, string originalTopic, T sentMessage); + + /// + /// Gets the topics a specified can be produced/published on based on + /// the implementation of . + /// + /// The message type. See . + /// The topics the specified can be produced/published on. based on + /// the implementation of . + string[] GetTopics(Type messageType); } } diff --git a/NsqSharp/Bus/ICurrentMessageInformation.cs b/NsqSharp/Bus/ICurrentMessageInformation.cs index 0e4effd..c990cf6 100644 --- a/NsqSharp/Bus/ICurrentMessageInformation.cs +++ b/NsqSharp/Bus/ICurrentMessageInformation.cs @@ -15,5 +15,7 @@ public interface ICurrentMessageInformation string Channel { get; } /// The message. Message Message { get; } + /// The deserialized message body (can be null). + object DeserializedMessageBody { get; } } } diff --git a/NsqSharp/Properties/AssemblyInfo.cs b/NsqSharp/Properties/AssemblyInfo.cs index 53059c3..9e6a8fe 100644 --- a/NsqSharp/Properties/AssemblyInfo.cs +++ b/NsqSharp/Properties/AssemblyInfo.cs @@ -31,4 +31,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("0.3.4")] +[assembly: AssemblyVersion("0.4.0")] diff --git a/nuget/NsqSharp.nuspec b/nuget/NsqSharp.nuspec index 598d66e..dbe1515 100644 --- a/nuget/NsqSharp.nuspec +++ b/nuget/NsqSharp.nuspec @@ -3,7 +3,7 @@ NsqSharp NsqSharp - 0.3.4 + 0.4.0 Judson White Judson White https://raw.githubusercontent.com/judwhite/NsqSharp/master/LICENSE