Skip to content

Commit

Permalink
add IMessageMutator and IMessageTopicRouter configuration interfaces
Browse files Browse the repository at this point in the history
change CurrentMessageInformation implementation to ThreadStatic
  • Loading branch information
judwhite committed Apr 13, 2015
1 parent ec6569c commit 44ae415
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 130 deletions.
15 changes: 13 additions & 2 deletions NsqSharp/Bus/Configuration/BusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class BusConfiguration : IBusConfiguration
private readonly IBusStateChangedHandler _busStateChangedHandler;
private readonly ILogger _nsqLogger;
private readonly bool _preCreateTopicsAndChannels;
private readonly IMessageMutator _messageMutator;
private readonly IMessageTopicRouter _messageTopicRouter;

private NsqBus _bus;

Expand All @@ -53,6 +55,9 @@ public class BusConfiguration : IBusConfiguration
/// <param name="preCreateTopicsAndChannels">Set to <c>true</c> to pre-create all registered topics and channels
/// on the local nsqd instance listening on 127.0.0.1:4151; useful for self-contained clusters (default =
/// <c>false</c>).</param>
/// <param name="messageMutator">The message mutator used to modify a message before it's sent (optional).</param>
/// <param name="messageTopicRouter">The message router used to specify custom message-to-topic routing logic; used
/// to override <paramref name="messageTypeToTopicProvider"/> (optional).</param>
public BusConfiguration(
IObjectBuilder dependencyInjectionContainer,
IMessageSerializer defaultMessageSerializer,
Expand All @@ -64,7 +69,9 @@ public BusConfiguration(
Config defaultConsumerNsqConfig = null,
IBusStateChangedHandler busStateChangedHandler = null,
ILogger nsqLogger = null,
bool preCreateTopicsAndChannels = false
bool preCreateTopicsAndChannels = false,
IMessageMutator messageMutator = null,
IMessageTopicRouter messageTopicRouter = null
)
{
if (dependencyInjectionContainer == null)
Expand Down Expand Up @@ -99,6 +106,8 @@ public BusConfiguration(
_busStateChangedHandler = busStateChangedHandler;
_nsqLogger = nsqLogger ?? new TraceLogger();
_preCreateTopicsAndChannels = preCreateTopicsAndChannels;
_messageMutator = messageMutator;
_messageTopicRouter = messageTopicRouter;

var handlerTypes = _handlerTypeToChannelProvider.GetHandlerTypes();
AddMessageHandlers(handlerTypes);
Expand Down Expand Up @@ -329,7 +338,9 @@ internal void StartBus()
_messageTypeToTopicProvider,
_defaultMessageSerializer,
_defaultNsqdHttpEndpoints,
_nsqLogger
_nsqLogger,
_messageMutator,
_messageTopicRouter
);

_bus.Start();
Expand Down
17 changes: 17 additions & 0 deletions NsqSharp/Bus/Configuration/IMessageMutator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NsqSharp.Bus.Configuration
{
/// <summary>
/// Implement this interface to modify a message before it is sent.
/// </summary>
public interface IMessageMutator
{
/// <summary>
/// Gets a mutated message before it is sent.
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
/// <param name="bus">The bus sending this message.</param>
/// <param name="sentMessage">The message about to be sent.</param>
/// <returns>The mutated message.</returns>
T GetMutatedMessage<T>(IBus bus, T sentMessage);
}
}
21 changes: 21 additions & 0 deletions NsqSharp/Bus/Configuration/IMessageTopicRouter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using NsqSharp.Bus.Configuration.Providers;

namespace NsqSharp.Bus.Configuration
{
/// <summary>
/// Implement this interface to specify custom message-to-topic routing logic based on a message object about to be sent.
/// </summary>
public interface IMessageTopicRouter
{
/// <summary>
/// Gets the topic a message should be sent on.
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
/// <param name="bus">The bus sending this message.</param>
/// <param name="originalTopic">The original topic name as provided by the implementation
/// of <see cref="IMessageTypeToTopicProvider"/> passed to this bus.</param>
/// <param name="sentMessage">The message about to be sent.</param>
/// <returns>The topic to send this message on.</returns>
string GetMessageTopic<T>(IBus bus, string originalTopic, T sentMessage);
}
}
181 changes: 87 additions & 94 deletions NsqSharp/Bus/MessageDistributor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,111 +75,104 @@ MessageHandlerMetadata messageHandlerMetadata

public void HandleMessage(Message message)
{
var messageInformation = new MessageInformation
{
UniqueIdentifier = Guid.NewGuid(),
Topic = _topic,
Channel = _channel,
HandlerType = _handlerType,
MessageType = _messageType,
Message = message,
DeserializedMessageBody = null,
Started = DateTime.UtcNow
};

_bus.SetCurrentMessageInformation(messageInformation);

// Get handler
object handler;
try
{
var messageInformation = new MessageInformation
{
UniqueIdentifier = Guid.NewGuid(),
Topic = _topic,
Channel = _channel,
HandlerType = _handlerType,
MessageType = _messageType,
Message = message,
DeserializedMessageBody = null,
Started = DateTime.UtcNow
};

_bus.AddMessage(messageInformation);

// Get handler
object handler;
try
{
handler = _objectBuilder.GetInstance(_handlerType);
messageInformation.HandlerType = handler.GetType();
}
catch (Exception ex)
{
messageInformation.Finished = DateTime.UtcNow;

_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
FailedMessageQueueAction.Finish,
FailedMessageReason.HandlerConstructor,
ex
)
);
handler = _objectBuilder.GetInstance(_handlerType);
messageInformation.HandlerType = handler.GetType();
}
catch (Exception ex)
{
messageInformation.Finished = DateTime.UtcNow;

message.Finish();
return;
}
_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
FailedMessageQueueAction.Finish,
FailedMessageReason.HandlerConstructor,
ex
)
);

message.Finish();
return;
}

// Get deserialized value
object value;
try
{
value = _serializer.Deserialize(_concreteMessageType, message.Body);
}
catch (Exception ex)
{
messageInformation.Finished = DateTime.UtcNow;

_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
FailedMessageQueueAction.Finish,
FailedMessageReason.MessageDeserialization,
ex
)
);
// Get deserialized value
object value;
try
{
value = _serializer.Deserialize(_concreteMessageType, message.Body);
}
catch (Exception ex)
{
messageInformation.Finished = DateTime.UtcNow;

message.Finish();
return;
}
_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
FailedMessageQueueAction.Finish,
FailedMessageReason.MessageDeserialization,
ex
)
);

message.Finish();
return;
}

// Handle message
messageInformation.DeserializedMessageBody = value;
_messageAuditor.TryOnReceived(_logger, _bus, messageInformation);
// Handle message
messageInformation.DeserializedMessageBody = value;
_messageAuditor.TryOnReceived(_logger, _bus, messageInformation);

try
{
_handleMethod.Invoke(handler, new[] { value });
}
catch (Exception ex)
{
bool requeue = (message.Attempts < message.MaxAttempts);

messageInformation.Finished = DateTime.UtcNow;

if (requeue)
message.Requeue();
else
message.Finish();

_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
requeue ? FailedMessageQueueAction.Requeue : FailedMessageQueueAction.Finish,
requeue ? FailedMessageReason.HandlerException : FailedMessageReason.MaxAttemptsExceeded,
ex
)
);

return;
}
try
{
_handleMethod.Invoke(handler, new[] { value });
}
catch (Exception ex)
{
bool requeue = (message.Attempts < message.MaxAttempts);

messageInformation.Finished = DateTime.UtcNow;

_messageAuditor.TryOnSucceeded(_logger, _bus, messageInformation);
}
finally
{
_bus.RemoveMessage();
if (requeue)
message.Requeue();
else
message.Finish();

_messageAuditor.TryOnFailed(_logger, _bus,
new FailedMessageInformation
(
messageInformation,
requeue ? FailedMessageQueueAction.Requeue : FailedMessageQueueAction.Finish,
requeue ? FailedMessageReason.HandlerException : FailedMessageReason.MaxAttemptsExceeded,
ex
)
);

return;
}

messageInformation.Finished = DateTime.UtcNow;

_messageAuditor.TryOnSucceeded(_logger, _bus, messageInformation);
}

public void LogFailedMessage(Message message)
Expand Down
Loading

0 comments on commit 44ae415

Please sign in to comment.