Skip to content

Commit

Permalink
add method to IMessageTopicRouter to get all routable topics for a me…
Browse files Browse the repository at this point in the history
…ssage type; add MessageRouter and MesageMutator tests;
  • Loading branch information
judwhite committed Apr 14, 2015
1 parent 44ae415 commit 311d330
Show file tree
Hide file tree
Showing 8 changed files with 773 additions and 10 deletions.
512 changes: 512 additions & 0 deletions NsqSharp.Tests/Bus/MessageMutatorTest.cs

Large diffs are not rendered by default.

223 changes: 223 additions & 0 deletions NsqSharp.Tests/Bus/MessageRouterTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Newtonsoft.Json;
using NsqSharp.Bus;
using NsqSharp.Bus.Configuration;
using NsqSharp.Bus.Configuration.BuiltIn;
using NsqSharp.Bus.Configuration.Providers;
using NsqSharp.Tests.Bus.TestFakes;
using NsqSharp.Utils.Extensions;
using NUnit.Framework;
using StructureMap;

namespace NsqSharp.Tests.Bus
{
#if !RUN_INTEGRATION_TESTS
[TestFixture(IgnoreReason = "NSQD Integration Test")]
#else
[TestFixture]
#endif
public class MessageRouterTest
{
[Test]
public void RoutingByProperty()
{
var timestamp = DateTime.Now.UnixNano();
string originalTopicName = string.Format("test_message_router_{0}", timestamp);
string topicName1 = string.Format("{0}_1", originalTopicName);
string topicName2 = string.Format("{0}_2", originalTopicName);
const string channelName = "test_message_router";

var container = new Container();

NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", originalTopicName);
NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", originalTopicName, channelName);

NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName1);
NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName1, channelName);

NsqdHttpApi.CreateTopic("http://127.0.0.1:4161", topicName2);
NsqdHttpApi.CreateChannel("http://127.0.0.1:4161", topicName2, channelName);

try
{
var messageTypeToTopicProvider = new MessageTypeToTopicDictionary(new Dictionary<Type, string> {
{ typeof(MyRoutedMessage), originalTopicName }
});

BusService.Start(new BusConfiguration(
new StructureMapObjectBuilder(container),
new NewtonsoftJsonSerializer(typeof(JsonConverter).Assembly),
new MessageAuditorStub(),
messageTypeToTopicProvider,
new HandlerTypeToChannelDictionary(new Dictionary<Type, string> {
{ typeof(MyRoutedMessageHandler), channelName }
}),
defaultNsqLookupdHttpEndpoints: new[] { "127.0.0.1:4161" },
defaultThreadsPerHandler: 1,
defaultConsumerNsqConfig: new Config
{
MaxRequeueDelay = TimeSpan.Zero,
LookupdPollJitter = 0,
LookupdPollInterval = TimeSpan.FromSeconds(1)
},
preCreateTopicsAndChannels: true,
messageTopicRouter: new MessageTopicRouter(messageTypeToTopicProvider)
));

var bus = container.GetInstance<IBus>();

bus.Send(new MyRoutedMessage());
bus.Send(new MyRoutedMessage { RouteIndex = 1 });
bus.Send(new MyRoutedMessage { RouteIndex = 2 });

var dict = MyRoutedMessageHandler.GetReceived();

Assert.AreEqual(3, dict.Count, "dict.Count");

// get stats from http server
var stats = NsqdHttpApi.Stats("http://127.0.0.1:4151");

foreach (var topicName in new[] { originalTopicName, topicName1, topicName2 })
{
// assert received message topic/message match expectations
var receivedMessage = dict.Single(p => p.Key.Topic == topicName);

int expectedRouteIndex;
if (topicName == topicName1)
expectedRouteIndex = 1;
else if (topicName == topicName2)
expectedRouteIndex = 2;
else
expectedRouteIndex = 0;

Assert.AreEqual(expectedRouteIndex, receivedMessage.Value.RouteIndex, "expectedRouteIndex");

// assert stats from http server
var topic = stats.Topics.Single(p => p.TopicName == topicName);
var channel = topic.Channels.Single(p => p.ChannelName == channelName);

Assert.AreEqual(1, topic.MessageCount, "topic.MessageCount");
Assert.AreEqual(0, topic.Depth, "topic.Depth");
Assert.AreEqual(0, topic.BackendDepth, "topic.BackendDepth");

Assert.AreEqual(1, channel.MessageCount, "channel.MessageCount");
Assert.AreEqual(0, channel.DeferredCount, "channel.DeferredCount");
Assert.AreEqual(0, channel.Depth, "channel.Depth");
Assert.AreEqual(0, channel.BackendDepth, "channel.BackendDepth");
Assert.AreEqual(0, channel.InFlightCount, "channel.InFlightCount");
Assert.AreEqual(0, channel.TimeoutCount, "channel.TimeoutCount");
Assert.AreEqual(0, channel.RequeueCount, "channel.RequeueCount");
}
}
finally
{
BusService.Stop();

NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", originalTopicName);
NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", originalTopicName);

NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName1);
NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName1);

NsqdHttpApi.DeleteTopic("http://127.0.0.1:4151", topicName2);
NsqdHttpApi.DeleteTopic("http://127.0.0.1:4161", topicName2);
}
}

public class MyRoutedMessage : IMessageWithRouteIndex
{
public int RouteIndex { get; set; }
}

public interface IMessageWithRouteIndex
{
int RouteIndex { get; }
}

public class MessageTopicRouter : IMessageTopicRouter
{
private readonly IMessageTypeToTopicProvider _messageTypeToTopicProvider;

public MessageTopicRouter(IMessageTypeToTopicProvider messageTypeToTopicProvider)
{
_messageTypeToTopicProvider = messageTypeToTopicProvider;
}

public string GetMessageTopic<T>(IBus bus, string originalTopic, T sentMessage)
{
var myRoutedMessage = sentMessage as IMessageWithRouteIndex;
if (myRoutedMessage != null)
{
if (myRoutedMessage.RouteIndex == 0)
return originalTopic;
else
return string.Format("{0}_{1}", originalTopic, myRoutedMessage.RouteIndex);
}
else
{
return originalTopic;
}
}

public string[] GetTopics(Type messageType)
{
string originalTopic = _messageTypeToTopicProvider.GetTopic(messageType);

if (messageType.GetInterfaces().Contains(typeof(IMessageWithRouteIndex)))
{
return new[]
{
originalTopic,
string.Format("{0}_1", originalTopic),
string.Format("{0}_2", originalTopic),
};
}
else
{
return new[] { originalTopic };
}
}
}

public class MyRoutedMessageHandler : IHandleMessages<MyRoutedMessage>
{
private static readonly Dictionary<ICurrentMessageInformation, MyRoutedMessage> _received =
new Dictionary<ICurrentMessageInformation, MyRoutedMessage>();
private static readonly object _receivedLocker = new object();
private static readonly AutoResetEvent _wait = new AutoResetEvent(initialState: false);

private readonly IBus _bus;

public MyRoutedMessageHandler(IBus bus)
{
_bus = bus;
}

public void Handle(MyRoutedMessage message)
{
bool done = false;
lock (_receivedLocker)
{
_received.Add(_bus.GetCurrentMessageInformation(), message);
if (_received.Count == 3)
done = true;
}

if (done)
_wait.Set();
}

public static Dictionary<ICurrentMessageInformation, MyRoutedMessage> GetReceived()
{
_wait.WaitOne(TimeSpan.FromSeconds(10));
lock (_receivedLocker)
{
return new Dictionary<ICurrentMessageInformation, MyRoutedMessage>(_received);
}
}
}
}
}
2 changes: 2 additions & 0 deletions NsqSharp.Tests/NsqSharp.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
<ItemGroup>
<Compile Include="Bus\BusCurrentMessageTest.cs" />
<Compile Include="Bus\DeferTest.cs" />
<Compile Include="Bus\MessageMutatorTest.cs" />
<Compile Include="Bus\MessageRouterTest.cs" />
<Compile Include="Bus\MultiImplementIHandleMessagesTest.cs" />
<Compile Include="Bus\TestFakes\MessageAuditorStub.cs" />
<Compile Include="Bus\TouchTest.cs" />
Expand Down
28 changes: 21 additions & 7 deletions NsqSharp/Bus/Configuration/BusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,27 @@ private void AddMessageHandlers(IEnumerable<Type> handlerTypes)
}

Type messageType = handlerMessageTypes[0];
List<string> topics;

string topic;
try
if (_messageTopicRouter == null)
{
topic = _messageTypeToTopicProvider.GetTopic(messageType);
string topic;
try
{
topic = _messageTypeToTopicProvider.GetTopic(messageType);
}
catch (Exception ex)
{
throw new Exception(string.Format(
"Topic for message type '{0}' not registered.", messageType.FullName), ex);
}

topics = new List<string>();
topics.Add(topic);
}
catch (Exception ex)
else
{
throw new Exception(string.Format(
"Topic for message type '{0}' not registered.", messageType.FullName), ex);
topics = new List<string>(_messageTopicRouter.GetTopics(messageType));
}

string channel;
Expand All @@ -172,7 +183,10 @@ private void AddMessageHandlers(IEnumerable<Type> handlerTypes)
"Channel for handler type '{0}' not registered.", handlerType.FullName), ex);
}

AddMessageHandler(handlerType, messageType, topic, channel);
foreach (var topic in topics)
{
AddMessageHandler(handlerType, messageType, topic, channel);
}
}
else
{
Expand Down
12 changes: 11 additions & 1 deletion NsqSharp/Bus/Configuration/IMessageTopicRouter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NsqSharp.Bus.Configuration.Providers;
using System;
using NsqSharp.Bus.Configuration.Providers;

namespace NsqSharp.Bus.Configuration
{
Expand All @@ -17,5 +18,14 @@ public interface IMessageTopicRouter
/// <param name="sentMessage">The message about to be sent.</param>
/// <returns>The topic to send this message on.</returns>
string GetMessageTopic<T>(IBus bus, string originalTopic, T sentMessage);

/// <summary>
/// Gets the topics a specified <paramref name="messageType"/> can be produced/published on based on
/// the implementation of <see cref="GetMessageTopic&lt;T&gt;"/>.
/// </summary>
/// <param name="messageType">The message type. See <see cref="IHandleMessages&lt;T&gt;"/>.</param>
/// <returns>The topics the specified <paramref name="messageType"/> can be produced/published on. based on
/// the implementation of <see cref="GetMessageTopic&lt;T&gt;"/>.</returns>
string[] GetTopics(Type messageType);
}
}
2 changes: 2 additions & 0 deletions NsqSharp/Bus/ICurrentMessageInformation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ public interface ICurrentMessageInformation
string Channel { get; }
/// <summary>The message.</summary>
Message Message { get; }
/// <summary>The deserialized message body (can be <c>null</c>).</summary>
object DeserializedMessageBody { get; }
}
}
2 changes: 1 addition & 1 deletion NsqSharp/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("0.3.4")]
[assembly: AssemblyVersion("0.4.0")]
2 changes: 1 addition & 1 deletion nuget/NsqSharp.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<metadata>
<id>NsqSharp</id>
<title>NsqSharp</title>
<version>0.3.4</version>
<version>0.4.0</version>
<authors>Judson White</authors>
<owners>Judson White</owners>
<licenseUrl>https://raw.githubusercontent.com/judwhite/NsqSharp/master/LICENSE</licenseUrl>
Expand Down

0 comments on commit 311d330

Please sign in to comment.