diff --git a/src/MassTransit.Abstractions/Contexts/Context/PublishContextProxy.cs b/src/MassTransit.Abstractions/Contexts/Context/PublishContextProxy.cs index 919ebf55e5b..6465ed634bb 100644 --- a/src/MassTransit.Abstractions/Contexts/Context/PublishContextProxy.cs +++ b/src/MassTransit.Abstractions/Contexts/Context/PublishContextProxy.cs @@ -132,6 +132,12 @@ public ISerialization Serialization set => _context.Serialization = value; } + public string[] SupportedMessageTypes + { + get => _context.SupportedMessageTypes; + set => _context.SupportedMessageTypes = value; + } + public long? BodyLength => _context.BodyLength; public SendContext CreateProxy(T message) diff --git a/src/MassTransit.Abstractions/Contexts/Context/SendContextProxy.cs b/src/MassTransit.Abstractions/Contexts/Context/SendContextProxy.cs index 2cfca1b34de..aec04c019a6 100644 --- a/src/MassTransit.Abstractions/Contexts/Context/SendContextProxy.cs +++ b/src/MassTransit.Abstractions/Contexts/Context/SendContextProxy.cs @@ -131,6 +131,12 @@ public ISerialization Serialization set => _context.Serialization = value; } + public string[] SupportedMessageTypes + { + get => _context.SupportedMessageTypes; + set => _context.SupportedMessageTypes = value; + } + public long? BodyLength => _context.BodyLength; public SendContext CreateProxy(T message) diff --git a/src/MassTransit.Abstractions/Contexts/SendContext.cs b/src/MassTransit.Abstractions/Contexts/SendContext.cs index 0cdbb690727..1762a40aa46 100644 --- a/src/MassTransit.Abstractions/Contexts/SendContext.cs +++ b/src/MassTransit.Abstractions/Contexts/SendContext.cs @@ -69,6 +69,11 @@ public interface SendContext : /// ISerialization Serialization { get; set; } + /// + /// The supported message types for the message being sent/published. For internal use only. + /// + string[] SupportedMessageTypes { get; set; } + /// /// After serialization, should return the length of the message body /// diff --git a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftBsonMessageBody.cs b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftBsonMessageBody.cs index a7f2cd3469d..17345a81c16 100644 --- a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftBsonMessageBody.cs +++ b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftBsonMessageBody.cs @@ -35,7 +35,7 @@ public byte[] GetBytes() try { - var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message); using var stream = new MemoryStream(); using var jsonWriter = new BsonDataWriter(stream); diff --git a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftJsonMessageBody.cs b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftJsonMessageBody.cs index b8c289f483d..eae15888e05 100644 --- a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftJsonMessageBody.cs +++ b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftJsonMessageBody.cs @@ -37,7 +37,7 @@ public byte[] GetBytes() try { - var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message); using var stream = new MemoryStream(); using var writer = new StreamWriter(stream, MessageDefaults.Encoding, 1024, true); diff --git a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftXmlMessageBody.cs b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftXmlMessageBody.cs index f7fae214ba6..b7572eca30e 100644 --- a/src/MassTransit.Newtonsoft/Serialization/NewtonsoftXmlMessageBody.cs +++ b/src/MassTransit.Newtonsoft/Serialization/NewtonsoftXmlMessageBody.cs @@ -36,7 +36,7 @@ public byte[] GetBytes() try { - var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message); using var stream = new MemoryStream(); diff --git a/src/MassTransit/Contexts/Context/MessageSendContext.cs b/src/MassTransit/Contexts/Context/MessageSendContext.cs index 6cb5e9a6cb3..71dd82e68c0 100644 --- a/src/MassTransit/Contexts/Context/MessageSendContext.cs +++ b/src/MassTransit/Contexts/Context/MessageSendContext.cs @@ -35,6 +35,8 @@ public MessageSendContext(TMessage message, CancellationToken cancellationToken MessageId = messageId.ToGuid(); SentTime = messageId.Timestamp; + SupportedMessageTypes = MessageTypeCache.MessageTypeNames; + _body = new Lazy(() => GetMessageBody()); } @@ -84,6 +86,8 @@ public IMessageSerializer Serializer public ISerialization Serialization { get; set; } + public string[] SupportedMessageTypes { get; set; } + public long? BodyLength => _body.IsValueCreated ? _body.Value.Length : default; public SendContext CreateProxy(T message) diff --git a/src/MassTransit/Courier/RoutingSlipBuilderSendEndpoint.cs b/src/MassTransit/Courier/RoutingSlipBuilderSendEndpoint.cs index 4d370322c4a..22c08b84284 100644 --- a/src/MassTransit/Courier/RoutingSlipBuilderSendEndpoint.cs +++ b/src/MassTransit/Courier/RoutingSlipBuilderSendEndpoint.cs @@ -169,7 +169,7 @@ public RoutingSlipSendContext(T message, CancellationToken cancellationToken, Ur public MessageEnvelope GetMessageEnvelope() { - var envelope = new JsonMessageEnvelope(this, Message, MessageTypeCache.MessageTypeNames); + var envelope = new JsonMessageEnvelope(this, Message); return envelope; } diff --git a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryMessageMoveTransport.cs b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryMessageMoveTransport.cs index a5524504ed7..57cc5e28610 100644 --- a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryMessageMoveTransport.cs +++ b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryMessageMoveTransport.cs @@ -23,11 +23,7 @@ protected async Task Move(ReceiveContext context, Action(PipeContext transportContext, SendContext sendContext) var messageId = context.MessageId ?? NewId.NextGuid(); - var transportMessage = new InMemoryTransportMessage(messageId, context.Body.GetBytes(), context.ContentType.ToString(), TypeCache.ShortName) + var transportMessage = new InMemoryTransportMessage(messageId, context.Body.GetBytes(), context.ContentType.ToString()) { Delay = context.Delay, RoutingKey = context.RoutingKey diff --git a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryTransportMessage.cs b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryTransportMessage.cs index 92a7664ef62..16bf878fc22 100644 --- a/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryTransportMessage.cs +++ b/src/MassTransit/InMemoryTransport/InMemoryTransport/InMemoryTransportMessage.cs @@ -7,19 +7,16 @@ namespace MassTransit.InMemoryTransport public class InMemoryTransportMessage { - public InMemoryTransportMessage(Guid messageId, byte[] body, string contentType, string messageType) + public InMemoryTransportMessage(Guid messageId, byte[] body, string contentType) { Headers = new Dictionary(StringComparer.OrdinalIgnoreCase); MessageId = messageId; Body = body; - MessageType = messageType; Headers[MessageHeaders.MessageId] = messageId.ToString(); Headers[MessageHeaders.ContentType] = contentType; } - public string MessageType { get; } - public Guid MessageId { get; } public byte[] Body { get; } diff --git a/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs b/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs index d0dacc9cea1..a14d5b1e523 100644 --- a/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs +++ b/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs @@ -186,7 +186,7 @@ public static class LogContextActivityExtensions if (context.DestinationAddress != null) activity.SetTag(DiagnosticHeaders.DestinationAddress, context.DestinationAddress.ToString()); - activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", MessageTypeCache.MessageTypeNames)); + activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", context.SupportedMessageTypes)); for (var i = 0; i < tags.Length; i++) { diff --git a/src/MassTransit/Middleware/OutboxMessageSendPipe.cs b/src/MassTransit/Middleware/OutboxMessageSendPipe.cs index fd979765231..b3a92e7987e 100644 --- a/src/MassTransit/Middleware/OutboxMessageSendPipe.cs +++ b/src/MassTransit/Middleware/OutboxMessageSendPipe.cs @@ -45,6 +45,7 @@ public Task Send(SendContext context) context.SourceAddress = serializerContext.SourceAddress; context.ResponseAddress = serializerContext.ResponseAddress; context.FaultAddress = serializerContext.FaultAddress; + context.SupportedMessageTypes = serializerContext.SupportedMessageTypes; if (serializerContext.ExpirationTime.HasValue) context.TimeToLive = serializerContext.ExpirationTime.Value.ToUniversalTime() - DateTime.UtcNow; @@ -81,6 +82,20 @@ public IEnumerable> GetAll() if (!string.IsNullOrWhiteSpace(_message.ContentType)) yield return new KeyValuePair(MessageHeaders.ContentType, _message.ContentType!); + + foreach (KeyValuePair header in _message.Headers.GetAll()) + { + switch (header.Key) + { + case MessageHeaders.MessageId: + case MessageHeaders.ContentType: + continue; + + default: + yield return header; + break; + } + } } public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value) @@ -94,7 +109,13 @@ public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value) if (MessageHeaders.ContentType.Equals(key, StringComparison.OrdinalIgnoreCase)) { value = _message.ContentType; - return value != null; + return true; + } + + if (_message.Headers.TryGetHeader(key, out var headerValue)) + { + value = headerValue; + return true; } value = null; diff --git a/src/MassTransit/Serialization/RawSerializerOptions.cs b/src/MassTransit/RawSerializerOptions.cs similarity index 94% rename from src/MassTransit/Serialization/RawSerializerOptions.cs rename to src/MassTransit/RawSerializerOptions.cs index 96213e8e2b0..26c0eaae8e9 100644 --- a/src/MassTransit/Serialization/RawSerializerOptions.cs +++ b/src/MassTransit/RawSerializerOptions.cs @@ -1,4 +1,4 @@ -namespace MassTransit.Serialization +namespace MassTransit { using System; diff --git a/src/MassTransit/Serialization/JsonMessageEnvelope.cs b/src/MassTransit/Serialization/JsonMessageEnvelope.cs index e06ffee8244..2c94d2aa148 100644 --- a/src/MassTransit/Serialization/JsonMessageEnvelope.cs +++ b/src/MassTransit/Serialization/JsonMessageEnvelope.cs @@ -16,7 +16,7 @@ public JsonMessageEnvelope() { } - public JsonMessageEnvelope(SendContext context, object message, string[] messageTypeNames) + public JsonMessageEnvelope(SendContext context, object message) { if (context.MessageId.HasValue) MessageId = context.MessageId.Value.ToString(); @@ -45,7 +45,7 @@ public JsonMessageEnvelope(SendContext context, object message, string[] message if (context.FaultAddress != null) FaultAddress = context.FaultAddress.ToString(); - MessageType = messageTypeNames; + MessageType = context.SupportedMessageTypes; Message = message; diff --git a/src/MassTransit/Serialization/RawMessageSerializer.cs b/src/MassTransit/Serialization/RawMessageSerializer.cs index ea31ed2b980..e84c5f87a36 100644 --- a/src/MassTransit/Serialization/RawMessageSerializer.cs +++ b/src/MassTransit/Serialization/RawMessageSerializer.cs @@ -25,7 +25,8 @@ protected virtual void SetRawMessageHeaders(SendContext context) if (context.RequestId.HasValue) context.Headers.Set(MessageHeaders.RequestId, context.RequestId.Value.ToString()); - context.Headers.Set(MessageHeaders.MessageType, string.Join(";", MessageTypeCache.MessageTypeNames)); + if (context.SupportedMessageTypes?.Length > 0) + context.Headers.Set(MessageHeaders.MessageType, string.Join(";", context.SupportedMessageTypes)); if (context.ResponseAddress != null) context.Headers.Set(MessageHeaders.ResponseAddress, context.ResponseAddress); diff --git a/src/MassTransit/Serialization/SystemTextJsonMessageBody.cs b/src/MassTransit/Serialization/SystemTextJsonMessageBody.cs index 7f4f7ddf4e1..e9a3471b8f0 100644 --- a/src/MassTransit/Serialization/SystemTextJsonMessageBody.cs +++ b/src/MassTransit/Serialization/SystemTextJsonMessageBody.cs @@ -45,7 +45,7 @@ public byte[] GetBytes() try { - var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message); _bytes = JsonSerializer.SerializeToUtf8Bytes(envelope, _options); @@ -70,7 +70,7 @@ public string GetString() try { - var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + var envelope = _envelope ??= new JsonMessageEnvelope(_context, _context.Message); _string = JsonSerializer.Serialize(envelope, _options); diff --git a/src/MassTransit/Serialization/SystemTextJsonRawSerializerContext.cs b/src/MassTransit/Serialization/SystemTextJsonRawSerializerContext.cs index a9e510288ea..74ed76124fd 100644 --- a/src/MassTransit/Serialization/SystemTextJsonRawSerializerContext.cs +++ b/src/MassTransit/Serialization/SystemTextJsonRawSerializerContext.cs @@ -41,5 +41,13 @@ public override bool IsSupportedMessageType(Type messageType) || SupportedMessageTypes.Length == 0 || SupportedMessageTypes.Any(x => typeUrn.Equals(x, StringComparison.OrdinalIgnoreCase)); } + + public override IMessageSerializer GetMessageSerializer(object message, string[] messageTypes) + { + if (message == null) + throw new ArgumentNullException(nameof(message)); + + return new SystemTextJsonBodyMessageSerializer(message, ContentType, Options, _rawOptions); + } } } diff --git a/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs b/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs index 2de74f274bf..557f6c0faef 100644 --- a/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs +++ b/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs @@ -304,16 +304,15 @@ async Task DeliverOutboxMessages(TDbContext dbContext, OutboxState outboxSt if (messageIndex == messages.Count && messages.Count < messageLimit) { + outboxState.Delivered = DateTime.UtcNow; + if (hasLastSequenceNumber == false) { dbContext.Remove(outboxState); dbContext.RemoveRange(messages); } else - { - outboxState.Delivered = DateTime.UtcNow; dbContext.Update(outboxState); - } saveChanges = true; diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireRecurringSceduledMessageData.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireRecurringSceduledMessageData.cs index 98f06561a49..883112d3f4e 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireRecurringSceduledMessageData.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireRecurringSceduledMessageData.cs @@ -24,7 +24,7 @@ public static HangfireRecurringScheduledMessageData Create(ConsumeContext(context.Message)); - SetBaseProperties(data, context, context.Message.Destination, messageBody); + SetBaseProperties(data, context, context.Message.Destination, messageBody, context.Message.PayloadType); return data; } diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireScheduledMessageData.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireScheduledMessageData.cs index cd8792ca847..c9f0871e37f 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireScheduledMessageData.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HangfireScheduledMessageData.cs @@ -25,11 +25,12 @@ public class HangfireScheduledMessageData public string? TokenId { get; set; } public string? HeadersAsJson { get; set; } public string? TransportProperties { get; set; } + public string? MessageType { get; set; } public Uri Destination => new Uri(DestinationAddress!); protected static void SetBaseProperties(HangfireScheduledMessageData data, ConsumeContext context, Uri destination, MessageBody messageBody, - Guid? tokenId = default) + string[] supportedMessageTypes, Guid? tokenId = default) { data.DestinationAddress = destination?.ToString() ?? ""; data.Body = messageBody.GetString(); @@ -37,6 +38,8 @@ protected static void SetBaseProperties(HangfireScheduledMessageData data, Consu data.FaultAddress = context.FaultAddress?.ToString() ?? ""; data.ResponseAddress = context.ResponseAddress?.ToString() ?? ""; + data.MessageType = string.Join(";", supportedMessageTypes); + if (context.MessageId.HasValue) data.MessageId = context.MessageId.Value.ToString(); diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HashedHangfireScheduledMessageData.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HashedHangfireScheduledMessageData.cs index 6c6890ab90c..db710cc6711 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HashedHangfireScheduledMessageData.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/HashedHangfireScheduledMessageData.cs @@ -17,7 +17,7 @@ public static HashedHangfireScheduledMessageData Create(ConsumeContext(context.Message)); - SetBaseProperties(message, context, context.Message.Destination, messageBody, context.Message.CorrelationId); + SetBaseProperties(message, context, context.Message.Destination, messageBody, context.Message.PayloadType, context.Message.CorrelationId); return message; } diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/MessageDataMessageContext.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/MessageDataMessageContext.cs index 426407ec7d7..116cffd8939 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/MessageDataMessageContext.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/MessageDataMessageContext.cs @@ -5,6 +5,7 @@ namespace MassTransit.HangfireIntegration using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Globalization; + using System.Linq; using System.Text.Json; using Metadata; using Serialization; @@ -41,7 +42,8 @@ public IReadOnlyDictionary? TransportProperties get { return !string.IsNullOrWhiteSpace(_messageData.TransportProperties) - ? JsonSerializer.Deserialize>(_messageData.TransportProperties!, SystemTextJsonMessageSerializer.Options) + ? JsonSerializer.Deserialize>(_messageData.TransportProperties!, + SystemTextJsonMessageSerializer.Options) : null; } } @@ -120,6 +122,8 @@ public bool TryGetHeader(string key, [NotNullWhen(true)] out object? value) public Headers Headers => _headers ??= GetHeaders(); public HostInfo Host => _hostInfo ??= HostMetadataCache.Host; + public string[] SupportedMessageTypes => _messageData.MessageType?.Split(';').ToArray() ?? Array.Empty(); + Headers GetHeaders() { var headers = new DictionarySendHeaders(); diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleJob.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleJob.cs index 642238c38bc..49fba951b52 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleJob.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleJob.cs @@ -107,6 +107,7 @@ public Task Send(SendContext context) context.SourceAddress = _messageContext.SourceAddress; context.ResponseAddress = _messageContext.ResponseAddress; context.FaultAddress = _messageContext.FaultAddress; + context.SupportedMessageTypes = _messageContext.SupportedMessageTypes; if (_messageContext.ExpirationTime.HasValue) context.TimeToLive = _messageContext.ExpirationTime.Value.ToUniversalTime() - DateTime.UtcNow; @@ -118,7 +119,6 @@ public Task Send(SendContext context) if (transportProperties != null && context is TransportSendContext transportSendContext) transportSendContext.ReadPropertiesFrom(transportProperties); - context.Serializer = serializerContext.GetMessageSerializer(); return Task.CompletedTask; diff --git a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleRecurringMessageConsumer.cs b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleRecurringMessageConsumer.cs index 4e0deaec1ce..487991ea35e 100644 --- a/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleRecurringMessageConsumer.cs +++ b/src/Scheduling/MassTransit.HangfireIntegration/HangfireIntegration/ScheduleRecurringMessageConsumer.cs @@ -41,7 +41,7 @@ public Task Consume(ConsumeContext context) jobKey, x => x.SendMessage(message, null!), context.Message.Schedule.CronExpression, - tz); + new RecurringJobOptions { TimeZone = tz }); LogContext.Debug?.Log("Scheduled: {Key}", jobKey); return Task.CompletedTask; diff --git a/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduleMessageConsumer.cs b/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduleMessageConsumer.cs index 0be4101411d..45a1664f20b 100644 --- a/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduleMessageConsumer.cs +++ b/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduleMessageConsumer.cs @@ -43,7 +43,8 @@ public async Task Consume(ConsumeContext context) .WithSchedule(SimpleScheduleBuilder.Create().WithMisfireHandlingInstructionFireNow()) .WithIdentity(triggerKey); - var trigger = PopulateTrigger(context, builder, messageBody, context.Message.Destination, context.MessageId, context.Message.CorrelationId); + var trigger = PopulateTrigger(context, builder, messageBody, context.Message.Destination, context.Message.PayloadType, messageId: context.MessageId, + tokenId: context.Message.CorrelationId); var scheduler = await _schedulerFactory.GetScheduler(context.CancellationToken).ConfigureAwait(false); @@ -92,8 +93,8 @@ public async Task Consume(ConsumeContext context) if (schedule.EndTime.HasValue) triggerBuilder.EndAt(schedule.EndTime); - var trigger = PopulateTrigger(context, triggerBuilder, messageBody, context.Message.Destination, context.MessageId, - context.Message.CorrelationId); + var trigger = PopulateTrigger(context, triggerBuilder, messageBody, context.Message.Destination, context.Message.PayloadType, + messageId: context.MessageId, tokenId: context.Message.CorrelationId); var scheduler = await _schedulerFactory.GetScheduler(context.CancellationToken).ConfigureAwait(false); @@ -106,12 +107,13 @@ public async Task Consume(ConsumeContext context) } static ITrigger PopulateTrigger(ConsumeContext context, TriggerBuilder builder, MessageBody messageBody, Uri destination, - Guid? messageId = default, Guid? tokenId = default) + string[] messageTypes, Guid? messageId = default, Guid? tokenId = default) { builder = builder .UsingJobData("Destination", ToString(destination)) .UsingJobData("Body", messageBody.GetString()) - .UsingJobData("ContentType", context.ReceiveContext.ContentType.ToString()); + .UsingJobData("ContentType", context.ReceiveContext.ContentType.ToString()) + .UsingJobData("MessageType", string.Join(";", messageTypes)); if (messageId.HasValue) builder = builder.UsingJobData("MessageId", messageId.Value.ToString()); diff --git a/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduledMessageJob.cs b/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduledMessageJob.cs index 7d7981e6b86..da23cbfb07e 100644 --- a/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduledMessageJob.cs +++ b/src/Scheduling/MassTransit.QuartzIntegration/QuartzIntegration/ScheduledMessageJob.cs @@ -29,11 +29,11 @@ public async Task Execute(IJobExecutionContext context) var contentType = new ContentType(jobData.GetString("ContentType")!); var destinationAddress = new Uri(jobData.GetString("Destination")!); var body = jobData.GetString("Body") ?? string.Empty; - var messageType = jobData.GetString("MessageType")?.Split(';')?.ToArray() ?? Array.Empty(); + var supportedMessageTypes = jobData.GetString("MessageType")?.Split(';').ToArray() ?? Array.Empty(); try { - var pipe = new ForwardScheduledMessagePipe(contentType, messageContext, body, destinationAddress); + var pipe = new ForwardScheduledMessagePipe(contentType, messageContext, body, destinationAddress, supportedMessageTypes); var endpoint = await _bus.GetSendEndpoint(destinationAddress).ConfigureAwait(false); @@ -43,7 +43,7 @@ public async Task Execute(IJobExecutionContext context) } catch (Exception ex) { - LogContext.Error?.Log(ex, "Failed to send scheduled message: {MessageType} {DestinationAddress}", messageType, destinationAddress); + LogContext.Error?.Log(ex, "Failed to send scheduled message: {MessageType} {DestinationAddress}", supportedMessageTypes, destinationAddress); throw new JobExecutionException(ex, context.RefireCount < 5); } @@ -56,14 +56,17 @@ class ForwardScheduledMessagePipe : readonly string _body; readonly ContentType? _contentType; readonly Uri? _destinationAddress; + readonly string[] _supportedMessageTypes; readonly JobDataMessageContext _messageContext; - public ForwardScheduledMessagePipe(ContentType? contentType, JobDataMessageContext messageContext, string body, Uri? destinationAddress) + public ForwardScheduledMessagePipe(ContentType? contentType, JobDataMessageContext messageContext, string body, Uri? destinationAddress, + string[] supportedMessageTypes) { _contentType = contentType; _messageContext = messageContext; _body = body; _destinationAddress = destinationAddress; + _supportedMessageTypes = supportedMessageTypes; } public Task Send(SendContext context) @@ -84,6 +87,7 @@ public Task Send(SendContext context) context.SourceAddress = _messageContext.SourceAddress; context.ResponseAddress = _messageContext.ResponseAddress; context.FaultAddress = _messageContext.FaultAddress; + context.SupportedMessageTypes = _supportedMessageTypes; if (_messageContext.ExpirationTime.HasValue) context.TimeToLive = _messageContext.ExpirationTime.Value.ToUniversalTime() - DateTime.UtcNow; diff --git a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcSendTransportContext.cs b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcSendTransportContext.cs index 4291b0c15f1..f8623cf8d80 100644 --- a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcSendTransportContext.cs +++ b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcSendTransportContext.cs @@ -94,7 +94,7 @@ public Task Send(PipeContext transportContext, SendContext sendContext) } }; - transportMessage.Deliver.Envelope.MessageType.AddRange(context.MessageTypes); + transportMessage.Deliver.Envelope.MessageType.AddRange(sendContext.SupportedMessageTypes); SetHeaders(transportMessage.Deliver.Envelope.Headers, context.Headers); diff --git a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcTransportPropertyNames.cs b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcTransportPropertyNames.cs index 8c24a637ad4..38215c30b3d 100644 --- a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcTransportPropertyNames.cs +++ b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/GrpcTransportPropertyNames.cs @@ -3,7 +3,6 @@ namespace MassTransit.GrpcTransport static class GrpcTransportPropertyNames { public const string Exchange = "GRPC-Exchange"; - public const string MessageTypes = "GRPC-MessageTypes"; public const string RoutingKey = "RoutingKey"; } } diff --git a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/TransportGrpcSendContext.cs b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/TransportGrpcSendContext.cs index 094c6e9aa0a..fad379d3e6f 100644 --- a/src/Transports/MassTransit.GrpcTransport/GrpcTransport/TransportGrpcSendContext.cs +++ b/src/Transports/MassTransit.GrpcTransport/GrpcTransport/TransportGrpcSendContext.cs @@ -15,20 +15,16 @@ public TransportGrpcSendContext(string exchange, T message, CancellationToken ca { Exchange = exchange; RoutingKey = default; - - MessageTypes = MessageTypeCache.MessageTypeNames; } public string Exchange { get; private set; } public string RoutingKey { get; set; } - public string[] MessageTypes { get; set; } public override void ReadPropertiesFrom(IReadOnlyDictionary properties) { base.ReadPropertiesFrom(properties); Exchange = ReadString(properties, GrpcTransportPropertyNames.Exchange, Exchange); - MessageTypes = ReadStringArray(properties, GrpcTransportPropertyNames.MessageTypes); RoutingKey = ReadString(properties, GrpcTransportPropertyNames.RoutingKey); } @@ -38,8 +34,6 @@ public override void WritePropertiesTo(IDictionary properties) if (!string.IsNullOrWhiteSpace(Exchange)) properties[GrpcTransportPropertyNames.Exchange] = Exchange; - if (MessageTypes != null && MessageTypes.Length > 0) - properties[GrpcTransportPropertyNames.MessageTypes] = string.Join(";", MessageTypes); if (!string.IsNullOrWhiteSpace(RoutingKey)) properties[GrpcTransportPropertyNames.RoutingKey] = RoutingKey; } diff --git a/tests/MassTransit.EntityFrameworkCoreIntegration.Tests/ReliableMessaging/BusOutbox_Specs.cs b/tests/MassTransit.EntityFrameworkCoreIntegration.Tests/ReliableMessaging/BusOutbox_Specs.cs index dba318e35ce..f22f24bca16 100644 --- a/tests/MassTransit.EntityFrameworkCoreIntegration.Tests/ReliableMessaging/BusOutbox_Specs.cs +++ b/tests/MassTransit.EntityFrameworkCoreIntegration.Tests/ReliableMessaging/BusOutbox_Specs.cs @@ -75,6 +75,75 @@ public async Task Should_support_the_test_harness() } } + [Test] + public async Task Should_include_headers_when_using_raw_json() + { + using var tracerProvider = TraceConfig.CreateTraceProvider("ef-core-tests"); + + await using var provider = new ServiceCollection() + .AddBusOutboxServices() + .AddTelemetryListener() + .AddMassTransitTestHarness(x => + { + x.AddEntityFrameworkOutbox(o => + { + o.QueryDelay = TimeSpan.FromSeconds(1); + + o.UseBusOutbox(bo => + { + bo.MessageDeliveryLimit = 10; + }); + }); + + x.AddConsumer(); + + x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(10)); + + x.UsingInMemory((context, cfg) => + { + cfg.UseRawJsonSerializer(RawSerializerOptions.CopyHeaders | RawSerializerOptions.AddTransportHeaders); + cfg.ConfigureEndpoints(context); + }); + }) + .BuildServiceProvider(true); + + var harness = provider.GetTestHarness(); + harness.TestInactivityTimeout = TimeSpan.FromSeconds(5); + + await harness.Start(); + + IConsumerTestHarness consumerHarness = harness.GetConsumerHarness(); + + try + { + { + await using var dbContext = harness.Scope.ServiceProvider.GetRequiredService(); + + var publishEndpoint = harness.Scope.ServiceProvider.GetRequiredService(); + + var activity = TraceConfig.Source.StartActivity(ActivityKind.Client); + + await publishEndpoint.Publish(new PingMessage(), x => x.Headers.Set("Test-Header", "Test-Value")); + + await dbContext.SaveChangesAsync(harness.CancellationToken); + + activity.Stop(); + } + + Assert.That(await consumerHarness.Consumed.Any(), Is.True); + + IReceivedMessage context = await consumerHarness.Consumed.SelectAsync().FirstOrDefault(); + + Assert.That(context.Context.Headers.TryGetHeader("Test-Header", out var header), Is.True); + + Assert.That(header, Is.EqualTo("Test-Value")); + } + finally + { + await harness.Stop(); + } + } + [Test] public async Task Should_support_baggage_in_telemetry() { diff --git a/tests/MassTransit.HangfireIntegration.Tests/ScheduleMessage_Specs.cs b/tests/MassTransit.HangfireIntegration.Tests/ScheduleMessage_Specs.cs index 08ade9d68ea..24a8c940970 100644 --- a/tests/MassTransit.HangfireIntegration.Tests/ScheduleMessage_Specs.cs +++ b/tests/MassTransit.HangfireIntegration.Tests/ScheduleMessage_Specs.cs @@ -53,6 +53,112 @@ public class SecondMessage } } + [TestFixture] + public class ScheduleMessageUsingJson_Specs : + HangfireInMemoryTestFixture + { + [Test] + public async Task Should_get_both_messages() + { + await Scheduler.ScheduleSend(InputQueueAddress, DateTime.Now, new FirstMessage()); + + await _first; + + await _second; + + if (_secondActivityId != null && _firstActivityId != null) + Assert.That(_secondActivityId.StartsWith(_firstActivityId), Is.True); + } + + Task> _second; + Task> _first; + string _firstActivityId; + string _secondActivityId; + + protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) + { + configurator.UseRawJsonSerializer(RawSerializerOptions.CopyHeaders | RawSerializerOptions.AddTransportHeaders); + base.ConfigureInMemoryBus(configurator); + } + + protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator) + { + _first = Handler(configurator, async context => + { + _firstActivityId = Activity.Current?.Id; + await context.ScheduleSend(TimeSpan.FromSeconds(5), new SecondMessage()); + }); + + _second = Handler(configurator, async context => + { + _secondActivityId = Activity.Current?.Id; + }); + } + + + public class FirstMessage + { + } + + + public class SecondMessage + { + } + } + + [TestFixture] + public class ScheduleMessageUsingNewtonsoftJson_Specs : + HangfireInMemoryTestFixture + { + [Test] + public async Task Should_get_both_messages() + { + await Scheduler.ScheduleSend(InputQueueAddress, DateTime.Now, new FirstMessage()); + + await _first; + + await _second; + + if (_secondActivityId != null && _firstActivityId != null) + Assert.That(_secondActivityId.StartsWith(_firstActivityId), Is.True); + } + + Task> _second; + Task> _first; + string _firstActivityId; + string _secondActivityId; + + protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) + { + configurator.UseNewtonsoftRawJsonSerializer(RawSerializerOptions.CopyHeaders | RawSerializerOptions.AddTransportHeaders); + base.ConfigureInMemoryBus(configurator); + } + + protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator) + { + _first = Handler(configurator, async context => + { + _firstActivityId = Activity.Current?.Id; + await context.ScheduleSend(TimeSpan.FromSeconds(5), new SecondMessage()); + }); + + _second = Handler(configurator, async context => + { + _secondActivityId = Activity.Current?.Id; + }); + } + + + public class FirstMessage + { + } + + + public class SecondMessage + { + } + } + [TestFixture] public class ScheduleMessageBson_Specs : diff --git a/tests/MassTransit.QuartzIntegration.Tests/ScheduleMessage_Specs.cs b/tests/MassTransit.QuartzIntegration.Tests/ScheduleMessage_Specs.cs index 9d445f4c720..84d62b45aad 100644 --- a/tests/MassTransit.QuartzIntegration.Tests/ScheduleMessage_Specs.cs +++ b/tests/MassTransit.QuartzIntegration.Tests/ScheduleMessage_Specs.cs @@ -58,6 +58,117 @@ public class SecondMessage } + [TestFixture] + public class ScheduleMessageUsingRawJson_Specs : + QuartzInMemoryTestFixture + { + [Test] + public async Task Should_get_both_messages() + { + await Scheduler.ScheduleSend(InputQueueAddress, DateTime.Now, new FirstMessage()); + + await _first; + + await AdvanceTime(TimeSpan.FromSeconds(10)); + + await _second; + + if (_secondActivityId != null && _firstActivityId != null) + Assert.That(_secondActivityId.StartsWith(_firstActivityId), Is.True); + } + + Task> _second; + Task> _first; + string _firstActivityId; + string _secondActivityId; + + protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) + { + configurator.UseRawJsonSerializer(RawSerializerOptions.CopyHeaders | RawSerializerOptions.AddTransportHeaders); + base.ConfigureInMemoryBus(configurator); + } + + protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator) + { + _first = Handler(configurator, async context => + { + _firstActivityId = Activity.Current?.Id; + await context.ScheduleSend(TimeSpan.FromSeconds(10), new SecondMessage()); + }); + + _second = Handler(configurator, async context => + { + _secondActivityId = Activity.Current?.Id; + }); + } + + + public class FirstMessage + { + } + + + public class SecondMessage + { + } + } + + [TestFixture] + public class ScheduleMessageUsingNewtonsoftRawJson_Specs : + QuartzInMemoryTestFixture + { + [Test] + public async Task Should_get_both_messages() + { + await Scheduler.ScheduleSend(InputQueueAddress, DateTime.Now, new FirstMessage()); + + await _first; + + await AdvanceTime(TimeSpan.FromSeconds(10)); + + await _second; + + if (_secondActivityId != null && _firstActivityId != null) + Assert.That(_secondActivityId.StartsWith(_firstActivityId), Is.True); + } + + Task> _second; + Task> _first; + string _firstActivityId; + string _secondActivityId; + + protected override void ConfigureInMemoryBus(IInMemoryBusFactoryConfigurator configurator) + { + configurator.UseNewtonsoftRawJsonSerializer(RawSerializerOptions.CopyHeaders | RawSerializerOptions.AddTransportHeaders); + base.ConfigureInMemoryBus(configurator); + } + + protected override void ConfigureInMemoryReceiveEndpoint(IInMemoryReceiveEndpointConfigurator configurator) + { + _first = Handler(configurator, async context => + { + _firstActivityId = Activity.Current?.Id; + await context.ScheduleSend(TimeSpan.FromSeconds(10), new SecondMessage()); + }); + + _second = Handler(configurator, async context => + { + _secondActivityId = Activity.Current?.Id; + }); + } + + + public class FirstMessage + { + } + + + public class SecondMessage + { + } + } + + [TestFixture] public class ScheduleMessageUsingBson_Specs : QuartzInMemoryTestFixture diff --git a/tests/MassTransit.Tests/Serialization/JobDeserialization_Specs.cs b/tests/MassTransit.Tests/Serialization/JobDeserialization_Specs.cs index 6a5f36cbd12..59afcf30441 100644 --- a/tests/MassTransit.Tests/Serialization/JobDeserialization_Specs.cs +++ b/tests/MassTransit.Tests/Serialization/JobDeserialization_Specs.cs @@ -71,7 +71,7 @@ protected async Task> GetConsumeContext(object values) { var bytes = Serialize((await MessageInitializerCache.Initialize(values)).Message); - var message = new InMemoryTransportMessage(NewId.NextGuid(), bytes, Serializer.ContentType.MediaType, TypeCache.ShortName); + var message = new InMemoryTransportMessage(NewId.NextGuid(), bytes, Serializer.ContentType.MediaType); var receiveContext = new InMemoryReceiveContext(message, TestConsumeContext.GetContext()); var consumeContext = Deserializer.Deserialize(receiveContext); diff --git a/tests/MassTransit.Tests/Serialization/Performance_Specs.cs b/tests/MassTransit.Tests/Serialization/Performance_Specs.cs index b4b862dcf1c..a32c928194d 100644 --- a/tests/MassTransit.Tests/Serialization/Performance_Specs.cs +++ b/tests/MassTransit.Tests/Serialization/Performance_Specs.cs @@ -44,8 +44,7 @@ public void Just_how_fast_are_you() { byte[] data = Serialize(sendContext); - var transportMessage = new InMemoryTransportMessage(Guid.NewGuid(), data, Serializer.ContentType.MediaType, - TypeCache.ShortName); + var transportMessage = new InMemoryTransportMessage(Guid.NewGuid(), data, Serializer.ContentType.MediaType); receiveContext = new InMemoryReceiveContext(transportMessage, TestConsumeContext.GetContext()); Deserialize(receiveContext); diff --git a/tests/MassTransit.Tests/Serialization/SerializationTest.cs b/tests/MassTransit.Tests/Serialization/SerializationTest.cs index 668c55d409c..706b0a58863 100644 --- a/tests/MassTransit.Tests/Serialization/SerializationTest.cs +++ b/tests/MassTransit.Tests/Serialization/SerializationTest.cs @@ -132,7 +132,7 @@ protected byte[] Serialize(T obj) protected T Return(byte[] serializedMessageData) where T : class { - var message = new InMemoryTransportMessage(Guid.NewGuid(), serializedMessageData, Serializer.ContentType.MediaType, TypeCache.ShortName); + var message = new InMemoryTransportMessage(Guid.NewGuid(), serializedMessageData, Serializer.ContentType.MediaType); var receiveContext = new InMemoryReceiveContext(message, TestConsumeContext.GetContext()); var consumeContext = Deserializer.Deserialize(receiveContext);