Skip to content

Commit

Permalink
Fixed MassTransit#4420 - consistent application of supported message …
Browse files Browse the repository at this point in the history
…types across Quartz/HangFire and Outbox
  • Loading branch information
phatboyg committed Jun 6, 2023
1 parent 0800562 commit 964a8d9
Show file tree
Hide file tree
Showing 36 changed files with 386 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public ISerialization Serialization
set => _context.Serialization = value;
}

public string[] SupportedMessageTypes
{
get => _context.SupportedMessageTypes;
set => _context.SupportedMessageTypes = value;
}

public long? BodyLength => _context.BodyLength;

public SendContext<T> CreateProxy<T>(T message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public ISerialization Serialization
set => _context.Serialization = value;
}

public string[] SupportedMessageTypes
{
get => _context.SupportedMessageTypes;
set => _context.SupportedMessageTypes = value;
}

public long? BodyLength => _context.BodyLength;

public SendContext<T> CreateProxy<T>(T message)
Expand Down
5 changes: 5 additions & 0 deletions src/MassTransit.Abstractions/Contexts/SendContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public interface SendContext :
/// </summary>
ISerialization Serialization { get; set; }

/// <summary>
/// The supported message types for the message being sent/published. For internal use only.
/// </summary>
string[] SupportedMessageTypes { get; set; }

/// <summary>
/// After serialization, should return the length of the message body
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public byte[] GetBytes()

try
{
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache<TMessage>.MessageTypeNames);
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message);

using var stream = new MemoryStream();
using var jsonWriter = new BsonDataWriter(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public byte[] GetBytes()

try
{
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache<TMessage>.MessageTypeNames);
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message);

using var stream = new MemoryStream();
using var writer = new StreamWriter(stream, MessageDefaults.Encoding, 1024, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public byte[] GetBytes()

try
{
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache<TMessage>.MessageTypeNames);
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message);

using var stream = new MemoryStream();

Expand Down
4 changes: 4 additions & 0 deletions src/MassTransit/Contexts/Context/MessageSendContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public MessageSendContext(TMessage message, CancellationToken cancellationToken
MessageId = messageId.ToGuid();
SentTime = messageId.Timestamp;

SupportedMessageTypes = MessageTypeCache<TMessage>.MessageTypeNames;

_body = new Lazy<MessageBody>(() => GetMessageBody());
}

Expand Down Expand Up @@ -84,6 +86,8 @@ public IMessageSerializer Serializer

public ISerialization Serialization { get; set; }

public string[] SupportedMessageTypes { get; set; }

public long? BodyLength => _body.IsValueCreated ? _body.Value.Length : default;

public SendContext<T> CreateProxy<T>(T message)
Expand Down
2 changes: 1 addition & 1 deletion src/MassTransit/Courier/RoutingSlipBuilderSendEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public RoutingSlipSendContext(T message, CancellationToken cancellationToken, Ur

public MessageEnvelope GetMessageEnvelope()
{
var envelope = new JsonMessageEnvelope(this, Message, MessageTypeCache<T>.MessageTypeNames);
var envelope = new JsonMessageEnvelope(this, Message);

return envelope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ protected async Task Move(ReceiveContext context, Action<InMemoryTransportMessag

var body = context.GetBody();

var messageType = "Unknown";
if (context.TryGetPayload(out InMemoryTransportMessage receivedMessage))
messageType = receivedMessage.MessageType;

var transportMessage = new InMemoryTransportMessage(messageId, body, context.ContentType?.MediaType, messageType);
var transportMessage = new InMemoryTransportMessage(messageId, body, context.ContentType?.MediaType);

transportMessage.Headers.SetHostHeaders();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Task Send<T>(PipeContext transportContext, SendContext<T> sendContext)

var messageId = context.MessageId ?? NewId.NextGuid();

var transportMessage = new InMemoryTransportMessage(messageId, context.Body.GetBytes(), context.ContentType.ToString(), TypeCache<T>.ShortName)
var transportMessage = new InMemoryTransportMessage(messageId, context.Body.GetBytes(), context.ContentType.ToString())
{
Delay = context.Delay,
RoutingKey = context.RoutingKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@ namespace MassTransit.InMemoryTransport

public class InMemoryTransportMessage
{
public InMemoryTransportMessage(Guid messageId, byte[] body, string contentType, string messageType)
public InMemoryTransportMessage(Guid messageId, byte[] body, string contentType)
{
Headers = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
MessageId = messageId;
Body = body;
MessageType = messageType;

Headers[MessageHeaders.MessageId] = messageId.ToString();
Headers[MessageHeaders.ContentType] = contentType;
}

public string MessageType { get; }

public Guid MessageId { get; }

public byte[] Body { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static class LogContextActivityExtensions
if (context.DestinationAddress != null)
activity.SetTag(DiagnosticHeaders.DestinationAddress, context.DestinationAddress.ToString());

activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", MessageTypeCache<T>.MessageTypeNames));
activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", context.SupportedMessageTypes));

for (var i = 0; i < tags.Length; i++)
{
Expand Down
23 changes: 22 additions & 1 deletion src/MassTransit/Middleware/OutboxMessageSendPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Task Send(SendContext context)
context.SourceAddress = serializerContext.SourceAddress;
context.ResponseAddress = serializerContext.ResponseAddress;
context.FaultAddress = serializerContext.FaultAddress;
context.SupportedMessageTypes = serializerContext.SupportedMessageTypes;

if (serializerContext.ExpirationTime.HasValue)
context.TimeToLive = serializerContext.ExpirationTime.Value.ToUniversalTime() - DateTime.UtcNow;
Expand Down Expand Up @@ -81,6 +82,20 @@ public IEnumerable<KeyValuePair<string, object>> GetAll()

if (!string.IsNullOrWhiteSpace(_message.ContentType))
yield return new KeyValuePair<string, object>(MessageHeaders.ContentType, _message.ContentType!);

foreach (KeyValuePair<string, object> header in _message.Headers.GetAll())
{
switch (header.Key)
{
case MessageHeaders.MessageId:
case MessageHeaders.ContentType:
continue;

default:
yield return header;
break;
}
}
}

public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value)
Expand All @@ -94,7 +109,13 @@ public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value)
if (MessageHeaders.ContentType.Equals(key, StringComparison.OrdinalIgnoreCase))
{
value = _message.ContentType;
return value != null;
return true;
}

if (_message.Headers.TryGetHeader(key, out var headerValue))
{
value = headerValue;
return true;
}

value = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.Serialization
namespace MassTransit
{
using System;

Expand Down
4 changes: 2 additions & 2 deletions src/MassTransit/Serialization/JsonMessageEnvelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public JsonMessageEnvelope()
{
}

public JsonMessageEnvelope(SendContext context, object message, string[] messageTypeNames)
public JsonMessageEnvelope(SendContext context, object message)
{
if (context.MessageId.HasValue)
MessageId = context.MessageId.Value.ToString();
Expand Down Expand Up @@ -45,7 +45,7 @@ public JsonMessageEnvelope(SendContext context, object message, string[] message
if (context.FaultAddress != null)
FaultAddress = context.FaultAddress.ToString();

MessageType = messageTypeNames;
MessageType = context.SupportedMessageTypes;

Message = message;

Expand Down
3 changes: 2 additions & 1 deletion src/MassTransit/Serialization/RawMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ protected virtual void SetRawMessageHeaders<T>(SendContext context)
if (context.RequestId.HasValue)
context.Headers.Set(MessageHeaders.RequestId, context.RequestId.Value.ToString());

context.Headers.Set(MessageHeaders.MessageType, string.Join(";", MessageTypeCache<T>.MessageTypeNames));
if (context.SupportedMessageTypes?.Length > 0)
context.Headers.Set(MessageHeaders.MessageType, string.Join(";", context.SupportedMessageTypes));

if (context.ResponseAddress != null)
context.Headers.Set(MessageHeaders.ResponseAddress, context.ResponseAddress);
Expand Down
4 changes: 2 additions & 2 deletions src/MassTransit/Serialization/SystemTextJsonMessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public byte[] GetBytes()

try
{
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache<TMessage>.MessageTypeNames);
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message);

_bytes = JsonSerializer.SerializeToUtf8Bytes(envelope, _options);

Expand All @@ -70,7 +70,7 @@ public string GetString()

try
{
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache<TMessage>.MessageTypeNames);
var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message);

_string = JsonSerializer.Serialize(envelope, _options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public override bool IsSupportedMessageType(Type messageType)
|| SupportedMessageTypes.Length == 0
|| SupportedMessageTypes.Any(x => typeUrn.Equals(x, StringComparison.OrdinalIgnoreCase));
}

public override IMessageSerializer GetMessageSerializer(object message, string[] messageTypes)
{
if (message == null)
throw new ArgumentNullException(nameof(message));

return new SystemTextJsonBodyMessageSerializer(message, ContentType, Options, _rawOptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,15 @@ async Task<int> DeliverOutboxMessages(TDbContext dbContext, OutboxState outboxSt

if (messageIndex == messages.Count && messages.Count < messageLimit)
{
outboxState.Delivered = DateTime.UtcNow;

if (hasLastSequenceNumber == false)
{
dbContext.Remove(outboxState);
dbContext.RemoveRange(messages);
}
else
{
outboxState.Delivered = DateTime.UtcNow;
dbContext.Update(outboxState);
}

saveChanges = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static HangfireRecurringScheduledMessageData Create(ConsumeContext<Schedu
var messageBody = context.SerializerContext.GetMessageSerializer(context.Message.Payload, context.Message.PayloadType)
.GetMessageBody(new MessageSendContext<ScheduleRecurringMessage>(context.Message));

SetBaseProperties(data, context, context.Message.Destination, messageBody);
SetBaseProperties(data, context, context.Message.Destination, messageBody, context.Message.PayloadType);

return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@ public class HangfireScheduledMessageData
public string? TokenId { get; set; }
public string? HeadersAsJson { get; set; }
public string? TransportProperties { get; set; }
public string? MessageType { get; set; }

public Uri Destination => new Uri(DestinationAddress!);

protected static void SetBaseProperties(HangfireScheduledMessageData data, ConsumeContext context, Uri destination, MessageBody messageBody,
Guid? tokenId = default)
string[] supportedMessageTypes, Guid? tokenId = default)
{
data.DestinationAddress = destination?.ToString() ?? "";
data.Body = messageBody.GetString();
data.ContentType = context.ReceiveContext.ContentType.ToString();
data.FaultAddress = context.FaultAddress?.ToString() ?? "";
data.ResponseAddress = context.ResponseAddress?.ToString() ?? "";

data.MessageType = string.Join(";", supportedMessageTypes);

if (context.MessageId.HasValue)
data.MessageId = context.MessageId.Value.ToString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static HashedHangfireScheduledMessageData Create(ConsumeContext<ScheduleM
var messageBody = context.SerializerContext.GetMessageSerializer(context.Message.Payload, context.Message.PayloadType)
.GetMessageBody(new MessageSendContext<ScheduleMessage>(context.Message));

SetBaseProperties(message, context, context.Message.Destination, messageBody, context.Message.CorrelationId);
SetBaseProperties(message, context, context.Message.Destination, messageBody, context.Message.PayloadType, context.Message.CorrelationId);

return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace MassTransit.HangfireIntegration
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Linq;
using System.Text.Json;
using Metadata;
using Serialization;
Expand Down Expand Up @@ -41,7 +42,8 @@ public IReadOnlyDictionary<string, object>? TransportProperties
get
{
return !string.IsNullOrWhiteSpace(_messageData.TransportProperties)
? JsonSerializer.Deserialize<IReadOnlyDictionary<string, object>>(_messageData.TransportProperties!, SystemTextJsonMessageSerializer.Options)
? JsonSerializer.Deserialize<IReadOnlyDictionary<string, object>>(_messageData.TransportProperties!,
SystemTextJsonMessageSerializer.Options)
: null;
}
}
Expand Down Expand Up @@ -120,6 +122,8 @@ public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value)
public Headers Headers => _headers ??= GetHeaders();
public HostInfo Host => _hostInfo ??= HostMetadataCache.Host;

public string[] SupportedMessageTypes => _messageData.MessageType?.Split(';').ToArray() ?? Array.Empty<string>();

Headers GetHeaders()
{
var headers = new DictionarySendHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public Task Send(SendContext context)
context.SourceAddress = _messageContext.SourceAddress;
context.ResponseAddress = _messageContext.ResponseAddress;
context.FaultAddress = _messageContext.FaultAddress;
context.SupportedMessageTypes = _messageContext.SupportedMessageTypes;

if (_messageContext.ExpirationTime.HasValue)
context.TimeToLive = _messageContext.ExpirationTime.Value.ToUniversalTime() - DateTime.UtcNow;
Expand All @@ -118,7 +119,6 @@ public Task Send(SendContext context)
if (transportProperties != null && context is TransportSendContext transportSendContext)
transportSendContext.ReadPropertiesFrom(transportProperties);


context.Serializer = serializerContext.GetMessageSerializer();

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Task Consume(ConsumeContext<ScheduleRecurringMessage> context)
jobKey,
x => x.SendMessage(message, null!),
context.Message.Schedule.CronExpression,
tz);
new RecurringJobOptions { TimeZone = tz });

LogContext.Debug?.Log("Scheduled: {Key}", jobKey);
return Task.CompletedTask;
Expand Down
Loading

0 comments on commit 964a8d9

Please sign in to comment.