From 11e22d5000c6e397519c6e9c0d0a5a9d5172b974 Mon Sep 17 00:00:00 2001 From: "Reuben J. Sonnenberg" Date: Thu, 8 Jun 2023 11:15:13 -0800 Subject: [PATCH] Add Protocol Buffer serialization support --- Directory.Packages.props | 1 + MassTransit.sln | 59 +++-- .../ProtobufConfigurationExtensions.cs | 28 +++ .../MassTransit.Protobuf.csproj | 32 +++ .../MassTransit.Protobuf.csproj.DotSettings | 2 + .../NullableAttributes.cs | 24 ++ .../Serialization/ProtoMessageEnvelope.cs | 49 +++++ .../ProtobufBodyMessageSerializer.cs | 42 ++++ .../Serialization/ProtobufHostInfo.cs | 34 +++ .../Serialization/ProtobufMessageBody.cs | 63 ++++++ .../ProtobufMessageDeserializer.cs | 60 +++++ .../Serialization/ProtobufMessageEnvelope.cs | 208 ++++++++++++++++++ .../ProtobufMessageSerializer.cs | 27 +++ .../ProtobufObjectDeserializer.cs | 82 +++++++ .../ProtobufSerializerContext.cs | 128 +++++++++++ .../ProtobufSerializerFactory.cs | 36 +++ 16 files changed, 853 insertions(+), 22 deletions(-) create mode 100644 src/MassTransit.Protobuf/Configuration/ProtobufConfigurationExtensions.cs create mode 100644 src/MassTransit.Protobuf/MassTransit.Protobuf.csproj create mode 100644 src/MassTransit.Protobuf/MassTransit.Protobuf.csproj.DotSettings create mode 100644 src/MassTransit.Protobuf/NullableAttributes.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtoMessageEnvelope.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufBodyMessageSerializer.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufHostInfo.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufMessageBody.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufMessageDeserializer.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufMessageEnvelope.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufMessageSerializer.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufObjectDeserializer.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufSerializerContext.cs create mode 100644 src/MassTransit.Protobuf/Serialization/ProtobufSerializerFactory.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 1eaae0ad485..68b3579e632 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -70,6 +70,7 @@ + diff --git a/MassTransit.sln b/MassTransit.sln index e369a77db02..4e896e61b08 100644 --- a/MassTransit.sln +++ b/MassTransit.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29613.14 +# Visual Studio Version 17 +VisualStudioVersion = 17.6.33723.286 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{56F516D7-BC3C-49E1-A639-83C5F14953F8}" EndProject @@ -25,12 +25,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Containers.Test EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".solution", ".solution", "{2C8A15FA-A445-4916-AAC2-3BE53AA247A7}" ProjectSection(SolutionItems) = preProject + .github\workflows\build.yml = .github\workflows\build.yml Directory.Build.props = Directory.Build.props + Directory.Packages.props = Directory.Packages.props + NuGet.README.md = NuGet.README.md README.md = README.md signing.props = signing.props - .github\workflows\build.yml = .github\workflows\build.yml - NuGet.README.md = NuGet.README.md - Directory.Packages.props = Directory.Packages.props EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Scheduling", "Scheduling", "{FDC1A760-D4E5-44F4-B0D9-C19F2E14253A}" @@ -121,37 +121,39 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.EventHubIntegra EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.EventHubIntegration.Tests", "tests\MassTransit.EventHubIntegration.Tests\MassTransit.EventHubIntegration.Tests.csproj", "{24CC1B47-AE01-4871-A939-2BF6EB789860}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Azure.Table", "src\Persistence\MassTransit.Azure.Table\MassTransit.Azure.Table.csproj", "{338E2B7E-4301-4547-9172-4415F739C029}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Azure.Table", "src\Persistence\MassTransit.Azure.Table\MassTransit.Azure.Table.csproj", "{338E2B7E-4301-4547-9172-4415F739C029}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Azure.Table.Tests", "tests\MassTransit.Azure.Table.Tests\MassTransit.Azure.Table.Tests.csproj", "{17E04A9B-5DB5-4857-99CD-E9AF22DA00E4}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Azure.Table.Tests", "tests\MassTransit.Azure.Table.Tests\MassTransit.Azure.Table.Tests.csproj", "{17E04A9B-5DB5-4857-99CD-E9AF22DA00E4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.GrpcTransport", "src\Transports\MassTransit.GrpcTransport\MassTransit.GrpcTransport.csproj", "{AF56509B-0B95-4ACE-9CFC-F9A79756C357}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.GrpcTransport", "src\Transports\MassTransit.GrpcTransport\MassTransit.GrpcTransport.csproj", "{AF56509B-0B95-4ACE-9CFC-F9A79756C357}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.GrpcTransport.Tests", "tests\MassTransit.GrpcTransport.Tests\MassTransit.GrpcTransport.Tests.csproj", "{082488F6-6322-462A-85D7-9E893A67B8CD}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.GrpcTransport.Tests", "tests\MassTransit.GrpcTransport.Tests\MassTransit.GrpcTransport.Tests.csproj", "{082488F6-6322-462A-85D7-9E893A67B8CD}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Transports.Tests", "tests\MassTransit.Transports.Tests\MassTransit.Transports.Tests.csproj", "{7632EDAF-29A1-4E69-952F-C75D3BF34B3B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Transports.Tests", "tests\MassTransit.Transports.Tests\MassTransit.Transports.Tests.csproj", "{7632EDAF-29A1-4E69-952F-C75D3BF34B3B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Abstractions", "src\MassTransit.Abstractions\MassTransit.Abstractions.csproj", "{2CAF0C51-2F64-4EB0-8E27-AE3E0085CA75}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Abstractions", "src\MassTransit.Abstractions\MassTransit.Abstractions.csproj", "{2CAF0C51-2F64-4EB0-8E27-AE3E0085CA75}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.StateMachineVisualizer", "src\MassTransit.StateMachineVisualizer\MassTransit.StateMachineVisualizer.csproj", "{8CD4298E-962B-4D04-821D-274343099E83}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.StateMachineVisualizer", "src\MassTransit.StateMachineVisualizer\MassTransit.StateMachineVisualizer.csproj", "{8CD4298E-962B-4D04-821D-274343099E83}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Abstractions.Tests", "tests\MassTransit.Abstractions.Tests\MassTransit.Abstractions.Tests.csproj", "{AB188378-1BAC-4ECB-98A7-91E12C861381}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Abstractions.Tests", "tests\MassTransit.Abstractions.Tests\MassTransit.Abstractions.Tests.csproj", "{AB188378-1BAC-4ECB-98A7-91E12C861381}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.BenchmarkConsole", "tests\MassTransit.BenchmarkConsole\MassTransit.BenchmarkConsole.csproj", "{2B6ACCF2-0CAF-4152-901F-0048390971E4}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.BenchmarkConsole", "tests\MassTransit.BenchmarkConsole\MassTransit.BenchmarkConsole.csproj", "{2B6ACCF2-0CAF-4152-901F-0048390971E4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Benchmark", "tests\MassTransit.Benchmark\MassTransit.Benchmark.csproj", "{667E52D5-E1D9-49EE-B364-3CB7E43EE160}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Benchmark", "tests\MassTransit.Benchmark\MassTransit.Benchmark.csproj", "{667E52D5-E1D9-49EE-B364-3CB7E43EE160}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.Newtonsoft", "src\MassTransit.Newtonsoft\MassTransit.Newtonsoft.csproj", "{388085C8-1BC8-48F8-8EA2-3698FCB385E5}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Newtonsoft", "src\MassTransit.Newtonsoft\MassTransit.Newtonsoft.csproj", "{388085C8-1BC8-48F8-8EA2-3698FCB385E5}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarking", "Benchmarking", "{BF384860-70ED-47F0-B276-13D2DA9ECD87}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.DynamoDbIntegration", "src\Persistence\MassTransit.DynamoDbIntegration\MassTransit.DynamoDbIntegration.csproj", "{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.DynamoDbIntegration", "src\Persistence\MassTransit.DynamoDbIntegration\MassTransit.DynamoDbIntegration.csproj", "{186D6491-ECCD-49EB-8E99-AFD7AD6037D2}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.DynamoDbIntegration.Tests", "tests\MassTransit.DynamoDbIntegration.Tests\MassTransit.DynamoDbIntegration.Tests.csproj", "{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.DynamoDbIntegration.Tests", "tests\MassTransit.DynamoDbIntegration.Tests\MassTransit.DynamoDbIntegration.Tests.csproj", "{694E06CF-2842-4E71-8CD2-81FA7C1B3D13}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.AmazonS3", "src\Persistence\MassTransit.AmazonS3\MassTransit.AmazonS3.csproj", "{86905425-64C4-4EB0-8884-F2BD27782310}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.AmazonS3", "src\Persistence\MassTransit.AmazonS3\MassTransit.AmazonS3.csproj", "{86905425-64C4-4EB0-8884-F2BD27782310}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MassTransit.AmazonS3.Tests", "tests\MassTransit.AmazonS3.Tests\MassTransit.AmazonS3.Tests.csproj", "{6E090598-2FED-41B9-9D4C-E27732985C61}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.AmazonS3.Tests", "tests\MassTransit.AmazonS3.Tests\MassTransit.AmazonS3.Tests.csproj", "{6E090598-2FED-41B9-9D4C-E27732985C61}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.Protobuf", "src\MassTransit.Protobuf\MassTransit.Protobuf.csproj", "{A8A192A6-283B-4808-B08B-DF7769B5950F}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -852,6 +854,18 @@ Global {6E090598-2FED-41B9-9D4C-E27732985C61}.ReleaseUnsigned|Any CPU.Build.0 = Debug|Any CPU {6E090598-2FED-41B9-9D4C-E27732985C61}.ReleaseUnsigned|x86.ActiveCfg = Debug|Any CPU {6E090598-2FED-41B9-9D4C-E27732985C61}.ReleaseUnsigned|x86.Build.0 = Debug|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Debug|x86.ActiveCfg = Debug|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Debug|x86.Build.0 = Debug|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Release|Any CPU.Build.0 = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Release|x86.ActiveCfg = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.Release|x86.Build.0 = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.ReleaseUnsigned|Any CPU.ActiveCfg = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.ReleaseUnsigned|Any CPU.Build.0 = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.ReleaseUnsigned|x86.ActiveCfg = Release|Any CPU + {A8A192A6-283B-4808-B08B-DF7769B5950F}.ReleaseUnsigned|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -905,13 +919,14 @@ Global {AF56509B-0B95-4ACE-9CFC-F9A79756C357} = {0006D6BB-1382-4B32-AD32-CA037F5CD4F6} {082488F6-6322-462A-85D7-9E893A67B8CD} = {0006D6BB-1382-4B32-AD32-CA037F5CD4F6} {7632EDAF-29A1-4E69-952F-C75D3BF34B3B} = {0006D6BB-1382-4B32-AD32-CA037F5CD4F6} - {388085C8-1BC8-48F8-8EA2-3698FCB385E5} = {4F40E08B-7C24-4D2A-8476-B7F93D0A2910} - {667E52D5-E1D9-49EE-B364-3CB7E43EE160} = {BF384860-70ED-47F0-B276-13D2DA9ECD87} {2B6ACCF2-0CAF-4152-901F-0048390971E4} = {BF384860-70ED-47F0-B276-13D2DA9ECD87} + {667E52D5-E1D9-49EE-B364-3CB7E43EE160} = {BF384860-70ED-47F0-B276-13D2DA9ECD87} + {388085C8-1BC8-48F8-8EA2-3698FCB385E5} = {4F40E08B-7C24-4D2A-8476-B7F93D0A2910} {186D6491-ECCD-49EB-8E99-AFD7AD6037D2} = {56F516D7-BC3C-49E1-A639-83C5F14953F8} {694E06CF-2842-4E71-8CD2-81FA7C1B3D13} = {56F516D7-BC3C-49E1-A639-83C5F14953F8} {86905425-64C4-4EB0-8884-F2BD27782310} = {56F516D7-BC3C-49E1-A639-83C5F14953F8} {6E090598-2FED-41B9-9D4C-E27732985C61} = {56F516D7-BC3C-49E1-A639-83C5F14953F8} + {A8A192A6-283B-4808-B08B-DF7769B5950F} = {4F40E08B-7C24-4D2A-8476-B7F93D0A2910} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {43D3A7D5-0945-435E-8D03-1E631E5CDBA8} diff --git a/src/MassTransit.Protobuf/Configuration/ProtobufConfigurationExtensions.cs b/src/MassTransit.Protobuf/Configuration/ProtobufConfigurationExtensions.cs new file mode 100644 index 00000000000..f4ccc06fc49 --- /dev/null +++ b/src/MassTransit.Protobuf/Configuration/ProtobufConfigurationExtensions.cs @@ -0,0 +1,28 @@ +namespace MassTransit.Configuration +{ + using MassTransit.Serialization; + + public static class ProtobufConfigurationExtensions + { + /// + /// Registers the Protobuf serializer with the bus, using the default Protobuf message contract. + /// + public static void UseProtobufSerializer(this IBusFactoryConfigurator configurator) + { + var factory = new ProtobufSerializerFactory(); + + configurator.AddSerializer(factory); + } + + /// + /// Register the Protobuf deserializer for a specific message type on the receive endpoint. + /// + public static void UseProtobufDeserializer(this IReceiveEndpointConfigurator configurator, bool isDefault = true) + where TProtoMessage : class + { + var factory = new ProtobufSerializerFactory(); + + configurator.AddDeserializer(factory, isDefault); + } + } +} diff --git a/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj b/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj new file mode 100644 index 00000000000..035bb543e77 --- /dev/null +++ b/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj @@ -0,0 +1,32 @@ + + + + + netstandard2.0;net6.0 + MassTransit + + + + $(TargetFrameworks);net462 + + + + MassTransit.Protobuf + MassTransit.Protobuf + MassTransit;Protobuf + MassTransit Protobuf support; $(Description) + 1.0.0-alpha.38 + Copyright 2023 - Reuben Sonnenberg + True + snupkg + + + + + + + + + + + diff --git a/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj.DotSettings b/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj.DotSettings new file mode 100644 index 00000000000..6840e166805 --- /dev/null +++ b/src/MassTransit.Protobuf/MassTransit.Protobuf.csproj.DotSettings @@ -0,0 +1,2 @@ + + True \ No newline at end of file diff --git a/src/MassTransit.Protobuf/NullableAttributes.cs b/src/MassTransit.Protobuf/NullableAttributes.cs new file mode 100644 index 00000000000..3f38561b675 --- /dev/null +++ b/src/MassTransit.Protobuf/NullableAttributes.cs @@ -0,0 +1,24 @@ +#if !NET6_0_OR_GREATER && !NETSTANDARD2_1 +namespace System.Diagnostics.CodeAnalysis +{ + using System; + + + [AttributeUsage(AttributeTargets.Parameter)] + sealed class NotNullWhenAttribute : + Attribute + { + /// Initializes the attribute with the specified return value condition. + /// + /// The return value condition. If the method returns this value, the associated parameter will not be null. + /// + public NotNullWhenAttribute(bool returnValue) + { + ReturnValue = returnValue; + } + + /// Gets the return value condition. + public bool ReturnValue { get; } + } +} +#endif diff --git a/src/MassTransit.Protobuf/Serialization/ProtoMessageEnvelope.cs b/src/MassTransit.Protobuf/Serialization/ProtoMessageEnvelope.cs new file mode 100644 index 00000000000..63d0aadc41c --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtoMessageEnvelope.cs @@ -0,0 +1,49 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System; + using System.Collections.Generic; + using System.Linq; + using MassTransit; + using Metadata; + using ProtoBuf; + + [Serializable] + public class ProtoMessageEnvelope : MessageEnvelope + where TProtoMessage : class + { + public string? MessageId { get; set; } + public string? RequestId { get; set; } + public string? CorrelationId { get; set; } + public string? ConversationId { get; set; } + public string? InitiatorId { get; set; } + public string? SourceAddress { get; set; } + public string? DestinationAddress { get; set; } + public string? ResponseAddress { get; set; } + public string? FaultAddress { get; set; } + public string[]? MessageType { get; set; } + public object? Message { get; set; } + public DateTime? ExpirationTime { get; set; } + public DateTime? SentTime { get; set; } + public HostInfo? Host { get; set; } + public Dictionary? Headers { get; } + + public ProtoMessageEnvelope(ProtobufMessageEnvelope envelope) + { + MessageId = envelope.MessageId; + RequestId = envelope.RequestId; + CorrelationId = envelope.CorrelationId; + ConversationId = envelope.ConversationId; + InitiatorId = envelope.InitiatorId; + SourceAddress = envelope.SourceAddress; + DestinationAddress = envelope.DestinationAddress; + ResponseAddress = envelope.ResponseAddress; + FaultAddress = envelope.FaultAddress; + MessageType = envelope.MessageType; + Message = envelope.Message; + ExpirationTime = envelope.ExpirationTime; + SentTime = envelope.SentTime; + Headers = envelope.Headers; + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufBodyMessageSerializer.cs b/src/MassTransit.Protobuf/Serialization/ProtobufBodyMessageSerializer.cs new file mode 100644 index 00000000000..47a0683866e --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufBodyMessageSerializer.cs @@ -0,0 +1,42 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System.IO; + using System.Net.Mime; + using ProtoBuf.Meta; + + public class ProtobufBodyMessageSerializer : IMessageSerializer + { + private readonly ContentType _contentType; + private readonly ProtobufMessageEnvelope _envelope; + private readonly RuntimeTypeModel _typeModel; + + public ProtobufBodyMessageSerializer(MessageEnvelope envelope, ContentType contentType, RuntimeTypeModel typeModel) + { + _envelope = new ProtobufMessageEnvelope(envelope); + _contentType = contentType; + _typeModel = typeModel; + } + + public ContentType ContentType => _contentType; + + public MessageBody GetMessageBody(SendContext context) + where T : class + { + _envelope.Update(context); + + return new ProtobufMessageBody(context, _typeModel, _envelope as ProtobufMessageEnvelope); + } + + public void Overlay(object message) + { + using (var stream = new MemoryStream()) + { + _typeModel.Serialize(stream, message); + stream.Position = 0; + var overlayMessage = _typeModel.Deserialize(_envelope.Message?.GetType(), stream); + _envelope.Message = overlayMessage; + } + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufHostInfo.cs b/src/MassTransit.Protobuf/Serialization/ProtobufHostInfo.cs new file mode 100644 index 00000000000..fd37913ef07 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufHostInfo.cs @@ -0,0 +1,34 @@ +using ProtoBuf; + +namespace MassTransit.Serialization +{ +#nullable enable + [ProtoContract] + [ProtoInclude(100, typeof(HostInfo))] + public class ProtobufHostInfo : HostInfo + { + [ProtoMember(1)] + public string? MachineName { set; get; } + + [ProtoMember(2)] + public string? ProcessName { set; get; } + + [ProtoMember(3)] + public int ProcessId { set; get; } + + [ProtoMember(4)] + public string? Assembly { set; get; } + + [ProtoMember(5)] + public string? AssemblyVersion { set; get; } + + [ProtoMember(6)] + public string? FrameworkVersion { set; get; } + + [ProtoMember(7)] + public string? MassTransitVersion { set; get; } + + [ProtoMember(8)] + public string? OperatingSystemVersion { set; get; } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufMessageBody.cs b/src/MassTransit.Protobuf/Serialization/ProtobufMessageBody.cs new file mode 100644 index 00000000000..41b56f2a882 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufMessageBody.cs @@ -0,0 +1,63 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using ProtoBuf.Meta; + using System; + using System.IO; + using System.Runtime.Serialization; + using System.Text; + public class ProtobufMessageBody : MessageBody where TProtoMessage : class + { + readonly SendContext _context; + byte[]? _bytes; + ProtobufMessageEnvelope? _envelope; + string? _string; + readonly RuntimeTypeModel _typeModel; + + public ProtobufMessageBody(SendContext context, RuntimeTypeModel typeModel, ProtobufMessageEnvelope? envelope = null) + { + _context = context; + _typeModel = typeModel; + _envelope = envelope; + } + + public long? Length => _bytes?.Length; + + public byte[] GetBytes() + { + if (_bytes != null) + return _bytes; + + try + { + var envelope = _envelope ??= new ProtobufMessageEnvelope(_context, _context.Message, MessageTypeCache.MessageTypeNames); + using var stream = new MemoryStream(); + _typeModel.Serialize(stream, envelope); + _bytes = stream.ToArray(); + return _bytes; + } + catch (SerializationException) + { + throw; + } + catch (Exception ex) + { + throw new SerializationException("Failed to serialize the message", ex); + } + } + + public Stream GetStream() + { + return new MemoryStream(GetBytes(), false); + } + + public string GetString() + { + if (_string != null) + return _string; + + _string = Encoding.UTF8.GetString(GetBytes()); + return _string; + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufMessageDeserializer.cs b/src/MassTransit.Protobuf/Serialization/ProtobufMessageDeserializer.cs new file mode 100644 index 00000000000..23af66cdc37 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufMessageDeserializer.cs @@ -0,0 +1,60 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System; + using System.Net.Mime; + using System.Runtime.Serialization; + using ProtoBuf.Meta; + + public class ProtobufMessageDeserializer : IMessageDeserializer + where TProtoMessage : class + { + private readonly RuntimeTypeModel _typeModel; + private readonly IObjectDeserializer _objectDeserializer; + + public ProtobufMessageDeserializer(RuntimeTypeModel typeModel) + { + _typeModel = typeModel; + _objectDeserializer = new ProtobufObjectDeserializer(typeModel); + } + + public void Probe(ProbeContext context) + { + var scope = context.CreateScope("protobuf"); + scope.Add("contentType", ContentType.MediaType); + } + + public ContentType ContentType => new ContentType("application/vnd.masstransit+pbuf"); + + public ConsumeContext Deserialize(ReceiveContext receiveContext) + { + return new BodyConsumeContext(receiveContext, Deserialize(receiveContext.Body, receiveContext.TransportHeaders, receiveContext.InputAddress)); + } + + public SerializerContext Deserialize(MessageBody body, Headers headers, Uri? destinationAddress = null) + { + try + { + using var stream = body.GetStream(); + var envelope = _typeModel.Deserialize>(stream); + + return envelope == null + ? throw new SerializationException("The message envelope was not found.") + : (SerializerContext)new ProtobufSerializerContext(_typeModel, _objectDeserializer, new ProtoMessageEnvelope(envelope), ContentType); + } + catch (SerializationException) + { + throw; + } + catch (Exception ex) + { + throw new SerializationException("An exception occurred while deserializing the message envelope", ex); + } + } + + public MessageBody GetMessageBody(string text) + { + throw new NotSupportedException("ProtobufMessageDeserializer does not support deserializing from text."); + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufMessageEnvelope.cs b/src/MassTransit.Protobuf/Serialization/ProtobufMessageEnvelope.cs new file mode 100644 index 00000000000..5ddb04056a6 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufMessageEnvelope.cs @@ -0,0 +1,208 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System; + using System.Collections.Generic; + using System.Linq; + using MassTransit; + using Metadata; + using ProtoBuf; + + [Serializable] + [ProtoContract] + [ProtoInclude(100, typeof(HostInfo))] + [ProtoInclude(101, typeof(ProtobufHostInfo))] + public class ProtobufMessageEnvelope + where TProtoMessage : class + { + [ProtoMember(1)] + public string? MessageId { get; set; } + [ProtoMember(2)] + public string? RequestId { get; set; } + [ProtoMember(3)] + public string? CorrelationId { get; set; } + [ProtoMember(4)] + public string? ConversationId { get; set; } + [ProtoMember(5)] + public string? InitiatorId { get; set; } + [ProtoMember(6)] + public string? SourceAddress { get; set; } + [ProtoMember(7)] + public string? DestinationAddress { get; set; } + [ProtoMember(8)] + public string? ResponseAddress { get; set; } + [ProtoMember(9)] + public string? FaultAddress { get; set; } + [ProtoMember(10)] + public string[]? MessageType { get; set; } + [ProtoMember(11)] + public TProtoMessage? _message; + [ProtoMember(12)] + public DateTime? ExpirationTime { get; set; } + [ProtoMember(13)] + public DateTime? SentTime { get; set; } + [ProtoMember(14)] + public ProtobufHostInfo? _host; + + [ProtoMember(15)] + Dictionary? _headers { get; set; } + + public HostInfo? Host + { + get => _host; + set => _host = ToProtobufHostInfo(value!); + } + + public object? Message + { + get => _message as object; + set => _message = (TProtoMessage?)value; + } + + public Dictionary? Headers + { + get => _headers?.ToDictionary(kv => kv.Key, kv => kv.Value as object); + set => _headers = value?.ToDictionary(kv => kv.Key, kv => kv.Value as TProtoMessage); + } + + private ProtobufHostInfo ToProtobufHostInfo(HostInfo hostInfo) + { + return new ProtobufHostInfo() + { + MachineName = hostInfo.MachineName, + ProcessName = hostInfo.ProcessName, + ProcessId = hostInfo.ProcessId, + Assembly = hostInfo.Assembly, + AssemblyVersion = hostInfo.AssemblyVersion, + FrameworkVersion = hostInfo.FrameworkVersion, + MassTransitVersion = hostInfo.MassTransitVersion, + OperatingSystemVersion = hostInfo.OperatingSystemVersion + }; + } + + public ProtobufMessageEnvelope() + { + } + + public ProtobufMessageEnvelope(SendContext context, TProtoMessage message, string[] messageTypeNames) + { + if (context.MessageId.HasValue) + MessageId = context.MessageId.Value.ToString(); + if (context.RequestId.HasValue) + RequestId = context.RequestId.Value.ToString(); + if (context.CorrelationId.HasValue) + CorrelationId = context.CorrelationId.Value.ToString(); + if (context.ConversationId.HasValue) + ConversationId = context.ConversationId.Value.ToString(); + if (context.InitiatorId.HasValue) + InitiatorId = context.InitiatorId.Value.ToString(); + if (context.SourceAddress != null) + SourceAddress = context.SourceAddress.ToString(); + if (context.DestinationAddress != null) + DestinationAddress = context.DestinationAddress.ToString(); + if (context.ResponseAddress != null) + ResponseAddress = context.ResponseAddress.ToString(); + if (context.FaultAddress != null) + FaultAddress = context.FaultAddress.ToString(); + MessageType = messageTypeNames; + Message = message as TProtoMessage; + if (context.TimeToLive.HasValue) + ExpirationTime = DateTime.UtcNow + context.TimeToLive; + SentTime = context.SentTime ?? DateTime.UtcNow; + Headers = new Dictionary(); + foreach (KeyValuePair header in context.Headers.GetAll()) + Headers[header.Key] = header.Value as TProtoMessage; + Host = HostMetadataCache.Host; + } + + public ProtobufMessageEnvelope(MessageContext context, TProtoMessage message, string[] messageTypeNames) + { + if (context.MessageId.HasValue) + MessageId = context.MessageId.Value.ToString(); + if (context.RequestId.HasValue) + RequestId = context.RequestId.Value.ToString(); + if (context.CorrelationId.HasValue) + CorrelationId = context.CorrelationId.Value.ToString(); + if (context.ConversationId.HasValue) + ConversationId = context.ConversationId.Value.ToString(); + if (context.InitiatorId.HasValue) + InitiatorId = context.InitiatorId.Value.ToString(); + if (context.SourceAddress != null) + SourceAddress = context.SourceAddress.ToString(); + if (context.DestinationAddress != null) + DestinationAddress = context.DestinationAddress.ToString(); + if (context.ResponseAddress != null) + ResponseAddress = context.ResponseAddress.ToString(); + if (context.FaultAddress != null) + FaultAddress = context.FaultAddress.ToString(); + MessageType = messageTypeNames; + Message = message as TProtoMessage; + if (context.ExpirationTime.HasValue) + ExpirationTime = context.ExpirationTime; + SentTime = context.SentTime ?? DateTime.UtcNow; + Headers = new Dictionary(); + foreach (KeyValuePair header in context.Headers.GetAll()) + Headers[header.Key] = header.Value as TProtoMessage; + Host = HostMetadataCache.Host; + } + + public ProtobufMessageEnvelope(MessageEnvelope envelope) + { + MessageId = envelope.MessageId; + RequestId = envelope.RequestId; + CorrelationId = envelope.CorrelationId; + ConversationId = envelope.ConversationId; + InitiatorId = envelope.InitiatorId; + SourceAddress = envelope.SourceAddress; + DestinationAddress = envelope.DestinationAddress; + ResponseAddress = envelope.ResponseAddress; + FaultAddress = envelope.FaultAddress; + MessageType = envelope.MessageType; + Message = envelope.Message as TProtoMessage; + ExpirationTime = envelope.ExpirationTime; + SentTime = envelope.SentTime ?? DateTime.UtcNow; + Headers = envelope.Headers != null + ? new Dictionary(envelope.Headers, StringComparer.OrdinalIgnoreCase) as Dictionary + : new Dictionary(StringComparer.OrdinalIgnoreCase); + Host = envelope.Host ?? HostMetadataCache.Host; + } + + public void Update(SendContext context) + { + DestinationAddress = context.DestinationAddress?.ToString(); + + if (context.SourceAddress != null) + SourceAddress = context.SourceAddress.ToString(); + + if (context.ResponseAddress != null) + ResponseAddress = context.ResponseAddress.ToString(); + + if (context.FaultAddress != null) + FaultAddress = context.FaultAddress.ToString(); + + if (context.MessageId.HasValue) + MessageId = context.MessageId.ToString(); + + if (context.RequestId.HasValue) + RequestId = context.RequestId.ToString(); + + if (context.ConversationId.HasValue) + ConversationId = context.ConversationId.ToString(); + + if (context.CorrelationId.HasValue) + CorrelationId = context.CorrelationId.ToString(); + + if (context.InitiatorId.HasValue) + InitiatorId = context.InitiatorId.ToString(); + + if (context.TimeToLive.HasValue) + ExpirationTime = DateTime.UtcNow + (context.TimeToLive > TimeSpan.Zero ? context.TimeToLive : TimeSpan.FromSeconds(1)); + + foreach (KeyValuePair header in context.Headers.GetAll()) + if (Headers != null) + Headers[header.Key] = header.Value as TProtoMessage; + else + Headers = new Dictionary { { header.Key, header.Value as TProtoMessage } }; + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufMessageSerializer.cs b/src/MassTransit.Protobuf/Serialization/ProtobufMessageSerializer.cs new file mode 100644 index 00000000000..51a1bd63784 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufMessageSerializer.cs @@ -0,0 +1,27 @@ +namespace MassTransit.Serialization +{ + using System; + using System.Net.Mime; + using ProtoBuf.Meta; + + public class ProtobufMessageSerializer : IMessageSerializer + { + // might be application/vnd.google.protobuf or application/x-protobuf or application/x-google-protobuf or application/protobuf + public const string ContentTypeHeaderValue = "application/vnd.masstransit+pbuf"; + public static readonly ContentType ProtobufContentType = new ContentType(ContentTypeHeaderValue); + private static RuntimeTypeModel _typeModel; + + public ContentType ContentType => ProtobufContentType; + + public ProtobufMessageSerializer(RuntimeTypeModel typeModel) + { + _typeModel = typeModel; + } + + public MessageBody GetMessageBody(SendContext context) where T : class + { + return new ProtobufMessageBody(context, _typeModel); + } + + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufObjectDeserializer.cs b/src/MassTransit.Protobuf/Serialization/ProtobufObjectDeserializer.cs new file mode 100644 index 00000000000..5b8cba7b2f3 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufObjectDeserializer.cs @@ -0,0 +1,82 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System; + using System.IO; + using Initializers; + using Initializers.TypeConverters; + using MassTransit.Metadata; + using ProtoBuf.Meta; + + public class ProtobufObjectDeserializer : IObjectDeserializer + { + private readonly RuntimeTypeModel _typeModel; + + public ProtobufObjectDeserializer(RuntimeTypeModel typeModel) + { + _typeModel = typeModel; + } + + public T? DeserializeObject(object? value, T? defaultValue = default) + where T : class + { + switch (value) + { + case null: + return defaultValue; + case T headerValue: + return headerValue; + case string text when string.IsNullOrWhiteSpace(text): + return defaultValue; + case string json when typeof(T).IsInterface && TypeMetadataCache.IsValidMessageType: + using (var stream = new MemoryStream()) + { + var bytes = Convert.FromBase64String(json); + stream.Write(bytes, 0, bytes.Length); + stream.Position = 0; + return _typeModel.Deserialize(stream); + } + case string text when TypeConverterCache.TryGetTypeConverter(out ITypeConverter? typeConverter) + && typeConverter.TryConvert(text, out var result): + return result; + } + + throw new InvalidOperationException($"Unsupported deserialization type: {typeof(T)}"); + } + + public T? DeserializeObject(object? value, T? defaultValue = null) + where T : struct + { + switch (value) + { + case null: + return defaultValue; + case T headerValue: + return headerValue; + case string text when string.IsNullOrWhiteSpace(text): + return defaultValue; + case string text when TypeConverterCache.TryGetTypeConverter(out ITypeConverter? typeConverter) + && typeConverter.TryConvert(text, out var result): + return result; + } + + throw new InvalidOperationException($"Unsupported deserialization type: {typeof(T)}"); + } + + public MessageBody SerializeObject(object? value) + { + if (value == null) + return new EmptyMessageBody(); + + using (var stream = new MemoryStream()) + { + _typeModel.Serialize(stream, value); + stream.Position = 0; + var bytes = stream.ToArray(); + var base64String = Convert.ToBase64String(bytes); + return new StringMessageBody(base64String); + } + } + } + +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufSerializerContext.cs b/src/MassTransit.Protobuf/Serialization/ProtobufSerializerContext.cs new file mode 100644 index 00000000000..1f7f518c0e5 --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufSerializerContext.cs @@ -0,0 +1,128 @@ +#nullable enable +namespace MassTransit.Serialization +{ + using System; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Net.Mime; + using ProtoBuf; + using ProtoBuf.Meta; + using MassTransit; + using System.IO; + + public abstract class ProtobufBodySerializerContext : + BaseSerializerContext + { + readonly RuntimeTypeModel _typeModel; + private object? _message; + + protected ProtobufBodySerializerContext(RuntimeTypeModel typeModel, IObjectDeserializer objectDeserializer, MessageContext messageContext, + object? message, string[] supportedMessageTypes) + : base(objectDeserializer, messageContext, supportedMessageTypes) + { + _typeModel = typeModel; + _message = message; + } + + public override bool TryGetMessage(out T? message) + where T : class + { + if (_typeModel.CanSerialize(typeof(T))) + { + if (_message is T messageOfT) + { + message = messageOfT; + return true; + } + + using (var stream = new System.IO.MemoryStream()) + { + _typeModel.Serialize(stream, _message); + stream.Position = 0; + + message = (T)_typeModel.Deserialize(stream, null, typeof(T)); + return true; + } + } + + message = null; + return false; + } + + public override bool TryGetMessage(Type messageType, [NotNullWhen(true)] out object? message) + { + if (_message != null && messageType.IsInstanceOfType(_message)) + { + message = _message; + return true; + } + + if (_typeModel.CanSerialize(messageType)) + { + using (var stream = new System.IO.MemoryStream()) + { + _typeModel.Serialize(stream, _message); + stream.Position = 0; + + message = _typeModel.Deserialize(stream, null, messageType); + return true; + } + } + + message = null; + return false; + } + + public override Dictionary ToDictionary(T? message) + where T : class + { + if (message == null) + return new Dictionary(StringComparer.OrdinalIgnoreCase); + + using var stream = new MemoryStream(); + _typeModel.Serialize(stream, message); + stream.Position = 0; + return Serializer.Merge(stream, new Dictionary())!; + } + } + +#nullable enable + public class ProtobufSerializerContext : + ProtobufBodySerializerContext + { + readonly ContentType _contentType; + readonly MessageEnvelope _envelope; + readonly RuntimeTypeModel _typeModel; + + public ProtobufSerializerContext(RuntimeTypeModel typeModel, IObjectDeserializer objectDeserializer, MessageEnvelope envelope, + ContentType contentType) + : base(typeModel, objectDeserializer, new EnvelopeMessageContext(envelope, objectDeserializer), envelope.Message, + envelope.MessageType ?? Array.Empty()) + { + _contentType = contentType; + _envelope = envelope; + _typeModel = typeModel; + } + + public override IMessageSerializer GetMessageSerializer() + { + return new ProtobufBodyMessageSerializer(_envelope, _contentType, _typeModel); + } + + public override IMessageSerializer GetMessageSerializer(MessageEnvelope envelope, T message) + { + var serializer = new ProtobufBodyMessageSerializer(envelope, _contentType, _typeModel); + + serializer.Overlay(message); + + return serializer; + } + + public override IMessageSerializer GetMessageSerializer(object message, string[] messageTypes) + { + var envelope = new ProtobufMessageEnvelope(this, message, messageTypes); + + return new ProtobufBodyMessageSerializer((MessageEnvelope)envelope, _contentType, _typeModel); + } + } +} diff --git a/src/MassTransit.Protobuf/Serialization/ProtobufSerializerFactory.cs b/src/MassTransit.Protobuf/Serialization/ProtobufSerializerFactory.cs new file mode 100644 index 00000000000..2a35f29ea9d --- /dev/null +++ b/src/MassTransit.Protobuf/Serialization/ProtobufSerializerFactory.cs @@ -0,0 +1,36 @@ +namespace MassTransit.Serialization +{ + using MassTransit.Events; + using ProtoBuf.Meta; + using System.Net.Mime; + public class ProtobufSerializerFactory : ISerializerFactory + where TProtoMessage : class + { + private readonly RuntimeTypeModel _typeModel; + + public ContentType ContentType => ProtobufMessageSerializer.ProtobufContentType; + + public ProtobufSerializerFactory() + { + _typeModel = RuntimeTypeModel.Create(); + + _typeModel.UseImplicitZeroDefaults = false; + _typeModel.AutoAddMissingTypes = true; + _typeModel.AllowParseableTypes = true; + _typeModel.AutoCompile = true; + + _typeModel.Add(true); + var receiveFaultModel = _typeModel.Add(true); + receiveFaultModel.AddSubType(100, typeof(ReceiveFaultEvent)); + } + + public IMessageSerializer CreateSerializer() + { + return new ProtobufMessageSerializer(_typeModel); + } + public IMessageDeserializer CreateDeserializer() + { + return new ProtobufMessageDeserializer(_typeModel); + } + } +}