From 6f0a00a984d5eb6db9a49256dbe4736fdb9ca460 Mon Sep 17 00:00:00 2001 From: "denys.kozhevnikov" Date: Sun, 9 Apr 2023 14:40:20 +0100 Subject: [PATCH] Refactoring telemetry --- Directory.Packages.props | 1 + ...pendencyInjectionRegistrationExtensions.cs | 9 + .../InstrumentationConfigurationExtensions.cs | 24 +- .../Courier/CompensateActivityHost.cs | 9 +- .../Courier/ExecuteActivityHost.cs | 9 +- src/MassTransit/LogContext.cs | 18 +- src/MassTransit/Logging/BusLogContext.cs | 271 +----------- .../DiagnosticActivityExtensions.cs | 0 .../{ => Diagnostics}/DiagnosticHeaders.cs | 0 .../LogContextActivityExtensions.cs | 269 ++++++++++++ .../{ => Diagnostics}/StartedActivity.cs | 0 src/MassTransit/Logging/ILogContext.cs | 40 +- .../LogContextInstrumentationExtensions.cs} | 403 ++++++++++-------- .../Logging/Monitoring/StartedInstrument.cs | 28 ++ .../Middleware/ConsumerMessageFilter.cs | 9 +- .../Middleware/HandlerMessageFilter.cs | 15 +- ...tiatedByOrOrchestratesSagaMessageFilter.cs | 3 + .../InitiatedBySagaMessageFilter.cs | 3 + .../Middleware/InstanceMessageFilter.cs | 17 +- .../InstrumentCompensateActivityFilter.cs | 24 -- .../Middleware/InstrumentConsumerFilter.cs | 24 -- .../InstrumentExecuteActivityFilter.cs | 24 -- .../Middleware/InstrumentHandlerFilter.cs | 23 - .../Middleware/InstrumentReceiveFilter.cs | 22 - .../Middleware/InstrumentSagaFilter.cs | 24 -- .../Middleware/ObservesSagaMessageFilter.cs | 3 + .../OrchestratesSagaMessageFilter.cs | 3 + .../Middleware/Outbox/OutboxSendEndpoint.cs | 3 + .../Middleware/OutboxMessagePipe.cs | 3 + .../StateMachineSagaMessageFilter.cs | 6 + ...InstrumentActivityConfigurationObserver.cs | 49 --- ...strumentCompensateActivitySpecification.cs | 24 -- ...InstrumentConsumerConfigurationObserver.cs | 43 -- .../InstrumentConsumerSpecification.cs | 24 -- .../InstrumentExecuteActivitySpecification.cs | 24 -- .../InstrumentHandlerConfigurationObserver.cs | 14 - .../InstrumentHandlerSpecification.cs | 23 - ...mentReceiveEndpointConfiguratorObserver.cs | 14 - .../InstrumentReceiveSpecification.cs | 22 - .../InstrumentSagaConfigurationObserver.cs | 25 -- .../InstrumentSagaSpecification.cs | 24 -- .../ConfigureDefaultInstrumentationOptions.cs | 57 +++ .../Monitoring/InstrumentActivityObserver.cs | 60 --- .../Monitoring/InstrumentBusObserver.cs | 51 --- .../Monitoring/InstrumentPublishObserver.cs | 32 -- .../Monitoring/InstrumentReceiveObserver.cs | 45 -- .../Monitoring/InstrumentSendObserver.cs | 32 -- .../Monitoring/InstrumentationOptions.cs | 65 ++- .../Transports/ReceivePipeDispatcher.cs | 8 + src/MassTransit/Transports/SendTransport.cs | 3 + .../BusOutboxDeliveryService.cs | 9 + .../BusOutboxDeliveryService.cs | 3 + .../EventHubIntegration/EventHubProducer.cs | 4 + .../KafkaIntegration/TopicProducer.cs | 3 + .../MassTransit.Benchmark.csproj | 1 + tests/MassTransit.Benchmark/Program.cs | 29 ++ .../MassTransit.Benchmark/ProgramOptionSet.cs | 5 + .../otel-collector/otelcol-config-extras.yml | 2 + .../configs/otel-collector/otelcol-config.yml | 26 ++ .../MassTransit.Benchmark/docker-compose.yml | 9 + 60 files changed, 795 insertions(+), 1222 deletions(-) rename src/MassTransit/Logging/{ => Diagnostics}/DiagnosticActivityExtensions.cs (100%) rename src/MassTransit/Logging/{ => Diagnostics}/DiagnosticHeaders.cs (100%) create mode 100644 src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs rename src/MassTransit/Logging/{ => Diagnostics}/StartedActivity.cs (100%) rename src/MassTransit/{Monitoring/Instrumentation.cs => Logging/Monitoring/LogContextInstrumentationExtensions.cs} (53%) create mode 100644 src/MassTransit/Logging/Monitoring/StartedInstrument.cs delete mode 100644 src/MassTransit/Middleware/InstrumentCompensateActivityFilter.cs delete mode 100644 src/MassTransit/Middleware/InstrumentConsumerFilter.cs delete mode 100644 src/MassTransit/Middleware/InstrumentExecuteActivityFilter.cs delete mode 100644 src/MassTransit/Middleware/InstrumentHandlerFilter.cs delete mode 100644 src/MassTransit/Middleware/InstrumentReceiveFilter.cs delete mode 100644 src/MassTransit/Middleware/InstrumentSagaFilter.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentActivityConfigurationObserver.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentCompensateActivitySpecification.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentConsumerConfigurationObserver.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentConsumerSpecification.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentExecuteActivitySpecification.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentHandlerConfigurationObserver.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentHandlerSpecification.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentReceiveEndpointConfiguratorObserver.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentReceiveSpecification.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentSagaConfigurationObserver.cs delete mode 100644 src/MassTransit/Monitoring/Configuration/InstrumentSagaSpecification.cs create mode 100644 src/MassTransit/Monitoring/ConfigureDefaultInstrumentationOptions.cs delete mode 100644 src/MassTransit/Monitoring/InstrumentActivityObserver.cs delete mode 100644 src/MassTransit/Monitoring/InstrumentBusObserver.cs delete mode 100644 src/MassTransit/Monitoring/InstrumentPublishObserver.cs delete mode 100644 src/MassTransit/Monitoring/InstrumentReceiveObserver.cs delete mode 100644 src/MassTransit/Monitoring/InstrumentSendObserver.cs create mode 100644 tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config-extras.yml create mode 100644 tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config.yml diff --git a/Directory.Packages.props b/Directory.Packages.props index b4c5e827ea3..1e6fd4bea67 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -65,6 +65,7 @@ + diff --git a/src/MassTransit/Configuration/DependencyInjection/DependencyInjectionRegistrationExtensions.cs b/src/MassTransit/Configuration/DependencyInjection/DependencyInjectionRegistrationExtensions.cs index 0b5481d648d..8a6a50b37d5 100644 --- a/src/MassTransit/Configuration/DependencyInjection/DependencyInjectionRegistrationExtensions.cs +++ b/src/MassTransit/Configuration/DependencyInjection/DependencyInjectionRegistrationExtensions.cs @@ -34,6 +34,7 @@ public static IServiceCollection AddMassTransit(this IServiceCollection collecti } AddHostedService(collection); + AddInstrumentation(collection); var configurator = new ServiceCollectionBusConfigurator(collection); @@ -85,6 +86,7 @@ public static IServiceCollection AddMassTransit(this IServic } AddHostedService(collection); + AddInstrumentation(collection); var configurator = new ServiceCollectionBusConfigurator(collection); @@ -109,6 +111,7 @@ public static IServiceCollection AddMassTransit(this IServiceCollection co throw new ArgumentNullException(nameof(configure)); AddHostedService(collection); + AddInstrumentation(collection); var doIt = new Callback(collection, configure); @@ -155,6 +158,12 @@ public static void ReplaceScoped(this IServiceCollect services.Replace(new ServiceDescriptor(typeof(TService), typeof(TImplementation), ServiceLifetime.Scoped)); } + static void AddInstrumentation(IServiceCollection collection) + { + collection.AddOptions(); + collection.AddSingleton, ConfigureDefaultInstrumentationOptions>(); + } + static void AddHostedService(IServiceCollection collection) { collection.AddOptions(); diff --git a/src/MassTransit/Configuration/InstrumentationConfigurationExtensions.cs b/src/MassTransit/Configuration/InstrumentationConfigurationExtensions.cs index 34255a0a5f0..6c188965799 100644 --- a/src/MassTransit/Configuration/InstrumentationConfigurationExtensions.cs +++ b/src/MassTransit/Configuration/InstrumentationConfigurationExtensions.cs @@ -1,9 +1,8 @@ namespace MassTransit { using System; - using Metadata; + using Logging; using Monitoring; - using Monitoring.Configuration; public static class InstrumentationConfigurationExtensions @@ -20,25 +19,16 @@ public static class InstrumentationConfigurationExtensions public static void UseInstrumentation(this IBusFactoryConfigurator configurator, Action configureOptions = null, string serviceName = default) { - var options = InstrumentationOptions.CreateDefault(); + var options = new InstrumentationOptions(); + var configureDefault = new ConfigureDefaultInstrumentationOptions(); + configureDefault.Configure(options); configureOptions?.Invoke(options); - Instrumentation.TryConfigure(GetServiceName(serviceName), options); + if (!string.IsNullOrWhiteSpace(serviceName)) + options.ServiceName = serviceName; - configurator.ConnectConsumerConfigurationObserver(new InstrumentConsumerConfigurationObserver()); - configurator.ConnectHandlerConfigurationObserver(new InstrumentHandlerConfigurationObserver()); - configurator.ConnectSagaConfigurationObserver(new InstrumentSagaConfigurationObserver()); - configurator.ConnectActivityConfigurationObserver(new InstrumentActivityConfigurationObserver()); - configurator.ConnectEndpointConfigurationObserver(new InstrumentReceiveEndpointConfiguratorObserver()); - configurator.ConnectBusObserver(new InstrumentBusObserver()); - } - - static string GetServiceName(string serviceName) - { - return string.IsNullOrWhiteSpace(serviceName) - ? HostMetadataCache.Host.ProcessName - : serviceName; + LogContextInstrumentationExtensions.TryConfigure(options); } } } diff --git a/src/MassTransit/Courier/CompensateActivityHost.cs b/src/MassTransit/Courier/CompensateActivityHost.cs index 39889b85cce..5f73b9e237b 100644 --- a/src/MassTransit/Courier/CompensateActivityHost.cs +++ b/src/MassTransit/Courier/CompensateActivityHost.cs @@ -21,9 +21,11 @@ public CompensateActivityHost(IPipe> compensatePipe) public async Task Send(ConsumeContext context, IPipe> next) { + var timer = Stopwatch.StartNew(); + StartedActivity? activity = LogContext.Current?.StartCompensateActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartActivityCompensateInstrument(context, timer); - var timer = Stopwatch.StartNew(); try { CompensateContext compensateContext = new HostCompensateContext(context); @@ -55,6 +57,8 @@ public async Task Send(ConsumeContext context, IPipe.ShortName}"); } catch (Exception exception) @@ -63,11 +67,14 @@ public async Task Send(ConsumeContext context, IPipe> executePipe, Uri co public async Task Send(ConsumeContext context, IPipe> next) { + var timer = Stopwatch.StartNew(); + StartedActivity? activity = LogContext.Current?.StartExecuteActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartActivityExecuteInstrument(context, timer); - var timer = Stopwatch.StartNew(); try { ExecuteContext executeContext = new HostExecuteContext(_compensateAddress, context); @@ -60,6 +62,8 @@ public async Task Send(ConsumeContext context, IPipe.ShortName}"); } catch (Exception exception) @@ -68,11 +72,14 @@ public async Task Send(ConsumeContext context, IPipe @@ -47,7 +47,7 @@ public static void ConfigureCurrentLogContext(ILoggerFactory loggerFactory = nul /// An existing logger public static void ConfigureCurrentLogContext(ILogger logger) { - Current = new BusLogContext(new SingleLoggerFactory(logger), Cached.Source.Value); + Current = new BusLogContext(new SingleLoggerFactory(logger)); } public static ILogContext CreateLogContext(string categoryName) @@ -64,6 +64,8 @@ public static ILogContext CreateLogContext(string categoryName) /// public static void ConfigureCurrentLogContextIfNull(IServiceProvider provider) { + LogContextInstrumentationExtensions.TryConfigure(provider); + if (Current == null || Current.Logger is NullLogger) { var loggerFactory = provider.GetService(); @@ -207,17 +209,9 @@ void Log(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, Exception exception) static ILogContext CreateDefaultLogContext() { - var source = Cached.Source.Value; - var loggerFactory = NullLoggerFactory.Instance; - return new BusLogContext(loggerFactory, source); - } - - - static class Cached - { - internal static readonly Lazy Source = new Lazy(() => new ActivitySource(DiagnosticHeaders.DefaultListenerName)); + return new BusLogContext(loggerFactory); } } } diff --git a/src/MassTransit/Logging/BusLogContext.cs b/src/MassTransit/Logging/BusLogContext.cs index e1579b97875..8599ff986d2 100644 --- a/src/MassTransit/Logging/BusLogContext.cs +++ b/src/MassTransit/Logging/BusLogContext.cs @@ -1,14 +1,7 @@ #nullable enable namespace MassTransit.Logging { - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Diagnostics; - using Courier.Contracts; using Microsoft.Extensions.Logging; - using Middleware; - using Transports; public class BusLogContext : @@ -16,29 +9,24 @@ public class BusLogContext : { readonly ILoggerFactory _loggerFactory; readonly ILogContext _messageLogger; - readonly ActivitySource _source; - public BusLogContext(ILoggerFactory loggerFactory, ActivitySource source) + public BusLogContext(ILoggerFactory loggerFactory) { - _source = source; _loggerFactory = loggerFactory; Logger = loggerFactory.CreateLogger(LogCategoryName.MassTransit); - - _messageLogger = new BusLogContext(source, loggerFactory, loggerFactory.CreateLogger("MassTransit.Messages")); + _messageLogger = new BusLogContext(loggerFactory, loggerFactory.CreateLogger("MassTransit.Messages")); } - BusLogContext(ActivitySource source, ILoggerFactory loggerFactory, ILogContext messageLogger, ILogger logger) + BusLogContext(ILoggerFactory loggerFactory, ILogContext messageLogger, ILogger logger) { - _source = source; _loggerFactory = loggerFactory; _messageLogger = messageLogger; Logger = logger; } - BusLogContext(ActivitySource source, ILoggerFactory loggerFactory, ILogger logger) + BusLogContext(ILoggerFactory loggerFactory, ILogger logger) { - _source = source; _loggerFactory = loggerFactory; Logger = logger; @@ -51,7 +39,7 @@ public ILogContext CreateLogContext(string categoryName) { var logger = _loggerFactory.CreateLogger(categoryName); - return new BusLogContext(_source, _loggerFactory, _messageLogger, logger); + return new BusLogContext(_loggerFactory, _messageLogger, logger); } public ILogger Logger { get; } @@ -67,254 +55,5 @@ public ILogContext CreateLogContext(string categoryName) public EnabledLogger? Trace => Logger.IsEnabled(LogLevel.Trace) ? new EnabledLogger(Logger, LogLevel.Trace) : default(EnabledLogger?); public EnabledLogger? Warning => Logger.IsEnabled(LogLevel.Warning) ? new EnabledLogger(Logger, LogLevel.Warning) : default(EnabledLogger?); - - public StartedActivity? StartSendActivity(SendTransportContext transportContext, SendContext context, params (string Key, object? Value)[] tags) - where T : class - { - var activity = _source.CreateActivity(transportContext.ActivityName, ActivityKind.Producer); - if (activity == null) - return null; - - activity.SetTag(DiagnosticHeaders.Messaging.System, transportContext.ActivitySystem); - activity.SetTag(DiagnosticHeaders.Messaging.DestinationName, transportContext.ActivityDestination); - activity.SetTag(DiagnosticHeaders.Messaging.Operation, "send"); - - return PopulateSendActivity(context, activity, tags); - } - - public StartedActivity? StartOutboxSendActivity(SendContext context) - where T : class - { - var parentActivityContext = System.Diagnostics.Activity.Current?.Context ?? default; - - var activity = _source.CreateActivity("outbox send", ActivityKind.Producer, parentActivityContext); - if (activity == null) - return null; - - activity.SetTag(DiagnosticHeaders.Messaging.Operation, "send"); - - return PopulateSendActivity(context, activity); - } - - public StartedActivity? StartOutboxDeliverActivity(OutboxMessageContext context) - { - var parentActivityContext = GetParentActivityContext(context.Headers); - - var activity = _source.CreateActivity("outbox process", ActivityKind.Client, parentActivityContext); - if (activity == null) - return null; - - activity.Start(); - - return new StartedActivity(activity); - } - - public StartedActivity? StartReceiveActivity(string name, string inputAddress, string endpointName, ReceiveContext context) - { - var parentActivityContext = GetParentActivityContext(context.TransportHeaders); - - var activity = _source.CreateActivity(name, ActivityKind.Consumer, parentActivityContext); - if (activity == null) - return null; - - if (activity.IsAllDataRequested) - { - activity.SetTag(DiagnosticHeaders.Messaging.DestinationName, endpointName); - activity.SetTag(DiagnosticHeaders.Messaging.Operation, "receive"); - activity.SetTag(DiagnosticHeaders.InputAddress, inputAddress); - - if ((context.TransportHeaders.TryGetHeader(MessageHeaders.TransportMessageId, out var messageIdHeader) - || context.TransportHeaders.TryGetHeader(MessageHeaders.MessageId, out messageIdHeader)) - && messageIdHeader is string text) - activity.SetTag(DiagnosticHeaders.Messaging.TransportMessageId, text); - } - - activity.Start(); - - return new StartedActivity(activity); - } - - public StartedActivity? StartConsumerActivity(ConsumeContext context) - where T : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartHandlerActivity(ConsumeContext context) - where T : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.ConsumerType, "Handler"); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartSagaActivity(SagaConsumeContext context) - where TSaga : class, ISaga - where T : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.SagaId, context.Saga.CorrelationId.ToString("D")); - activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartSagaStateMachineActivity(BehaviorContext context) - where TSaga : class, ISaga - where T : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.SagaId, context.Saga.CorrelationId.ToString("D")); - activity.SetTag(DiagnosticHeaders.ConsumerType, context.StateMachine.Name); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartExecuteActivity(ConsumeContext context) - where TActivity : IExecuteActivity - where TArguments : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.TrackingNumber, context.Message.TrackingNumber.ToString("D")); - activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartCompensateActivity(ConsumeContext context) - where TActivity : ICompensateActivity - where TLog : class - { - return StartActivity(activity => - { - activity.SetTag(DiagnosticHeaders.TrackingNumber, context.Message.TrackingNumber.ToString("D")); - activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); - activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); - }); - } - - public StartedActivity? StartGenericActivity(string operationName) - { - var activity = _source.CreateActivity(operationName, ActivityKind.Client); - if (activity == null) - return null; - - activity.Start(); - - return new StartedActivity(activity); - } - - static StartedActivity? PopulateSendActivity(SendContext context, System.Diagnostics.Activity activity, params (string Key, object? Value)[] tags) - where T : class - { - var conversationId = context.ConversationId?.ToString("D"); - - if (context.CorrelationId.HasValue) - activity.SetBaggage(DiagnosticHeaders.CorrelationId, context.CorrelationId.Value.ToString("D")); - if (conversationId != null) - activity.SetBaggage(DiagnosticHeaders.Messaging.ConversationId, conversationId); - - if (activity.IsAllDataRequested) - { - if (context.MessageId.HasValue) - activity.SetTag(DiagnosticHeaders.MessageId, context.MessageId.Value.ToString("D")); - if (conversationId != null) - activity.SetTag(DiagnosticHeaders.Messaging.ConversationId, conversationId); - if (context.CorrelationId.HasValue) - activity.SetTag(DiagnosticHeaders.CorrelationId, context.CorrelationId.Value.ToString("D")); - if (context.RequestId.HasValue) - activity.SetTag(DiagnosticHeaders.RequestId, context.RequestId.Value.ToString("D")); - if (context.InitiatorId.HasValue) - activity.SetTag(DiagnosticHeaders.InitiatorId, context.InitiatorId.Value.ToString("D")); - if (context.SourceAddress != null) - activity.SetTag(DiagnosticHeaders.SourceAddress, context.SourceAddress.ToString()); - if (context.DestinationAddress != null) - activity.SetTag(DiagnosticHeaders.DestinationAddress, context.DestinationAddress.ToString()); - - activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", MessageTypeCache.MessageTypeNames)); - - for (var i = 0; i < tags.Length; i++) - { - if (tags[i].Value != null) - activity.SetTag(tags[i].Key, tags[i].Value?.ToString()); - } - } - - activity.Start(); - - IList>? baggage = null; - foreach (KeyValuePair pair in activity.Baggage) - { - if (pair.Key.Equals(DiagnosticHeaders.Messaging.ConversationId) || pair.Key.Equals(DiagnosticHeaders.CorrelationId)) - continue; - - if (string.IsNullOrWhiteSpace(pair.Value)) - continue; - - baggage ??= new List>(); - baggage.Add(pair); - } - - if (activity.Id != null) - context.Headers.Set(DiagnosticHeaders.ActivityId, activity.Id); - - if (baggage != null) - context.Headers.Set(DiagnosticHeaders.ActivityCorrelationContext, baggage); - - return new StartedActivity(activity); - } - - StartedActivity? StartActivity(Action started) - { - var currentActivity = System.Diagnostics.Activity.Current; - if (currentActivity == null) - return null; - - var operationName = Cached.OperationNames.GetOrAdd(currentActivity.OperationName, add => - { - if (add.EndsWith(" receive")) - return add.Substring(0, add.Length - 8) + " process"; - if (add.EndsWith(" process")) - return add; - - return currentActivity.OperationName; - }); - - var activity = _source.CreateActivity(operationName, ActivityKind.Consumer); - if (activity == null) - return null; - - activity.SetTag(DiagnosticHeaders.Messaging.Operation, "process"); - - if (activity.IsAllDataRequested) - started(activity); - - activity.Start(); - - return new StartedActivity(activity); - } - - static ActivityContext GetParentActivityContext(Headers headers) - { - return headers.TryGetHeader(DiagnosticHeaders.ActivityId, out var headerValue) && headerValue is string activityId - && ActivityContext.TryParse(activityId, null, out var activityContext) - ? activityContext - : default; - } - - - static class Cached - { - internal static readonly ConcurrentDictionary OperationNames = new ConcurrentDictionary(); - } } } diff --git a/src/MassTransit/Logging/DiagnosticActivityExtensions.cs b/src/MassTransit/Logging/Diagnostics/DiagnosticActivityExtensions.cs similarity index 100% rename from src/MassTransit/Logging/DiagnosticActivityExtensions.cs rename to src/MassTransit/Logging/Diagnostics/DiagnosticActivityExtensions.cs diff --git a/src/MassTransit/Logging/DiagnosticHeaders.cs b/src/MassTransit/Logging/Diagnostics/DiagnosticHeaders.cs similarity index 100% rename from src/MassTransit/Logging/DiagnosticHeaders.cs rename to src/MassTransit/Logging/Diagnostics/DiagnosticHeaders.cs diff --git a/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs b/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs new file mode 100644 index 00000000000..458f244b904 --- /dev/null +++ b/src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs @@ -0,0 +1,269 @@ +#nullable enable +// ReSharper disable once CheckNamespace +namespace MassTransit.Logging +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics; + using Courier.Contracts; + using Middleware; + using Transports; + + + public static class LogContextActivityExtensions + { + public static StartedActivity? StartSendActivity(this ILogContext logContext, SendTransportContext transportContext, SendContext context, + params (string Key, object? Value)[] tags) + where T : class + { + var activity = Cached.Source.Value.CreateActivity(transportContext.ActivityName, ActivityKind.Producer); + if (activity == null) + return null; + + activity.SetTag(DiagnosticHeaders.Messaging.Operation, "send"); + activity.SetTag(DiagnosticHeaders.Messaging.System, transportContext.ActivitySystem); + activity.SetTag(DiagnosticHeaders.Messaging.DestinationName, transportContext.ActivityDestination); + + return PopulateSendActivity(context, activity, tags); + } + + public static StartedActivity? StartOutboxSendActivity(this ILogContext logContext, SendContext context) + where T : class + { + var parentActivityContext = System.Diagnostics.Activity.Current?.Context ?? default; + + var activity = Cached.Source.Value.CreateActivity("outbox send", ActivityKind.Producer, parentActivityContext); + if (activity == null) + return null; + + activity.SetTag(DiagnosticHeaders.Messaging.Operation, "send"); + + return PopulateSendActivity(context, activity); + } + + public static StartedActivity? StartOutboxDeliverActivity(this ILogContext logContext, OutboxMessageContext context) + { + var parentActivityContext = GetParentActivityContext(context.Headers); + + var activity = Cached.Source.Value.CreateActivity("outbox process", ActivityKind.Client, parentActivityContext); + if (activity == null) + return null; + + activity.Start(); + + return new StartedActivity(activity); + } + + public static StartedActivity? StartReceiveActivity(this ILogContext logContext, string name, string inputAddress, string endpointName, + ReceiveContext context) + { + var parentActivityContext = GetParentActivityContext(context.TransportHeaders); + + var activity = Cached.Source.Value.CreateActivity(name, ActivityKind.Consumer, parentActivityContext); + if (activity == null) + return null; + + activity.SetTag(DiagnosticHeaders.Messaging.Operation, "receive"); + activity.SetTag(DiagnosticHeaders.Messaging.DestinationName, endpointName); + + if (activity.IsAllDataRequested) + { + activity.SetTag(DiagnosticHeaders.InputAddress, inputAddress); + + if ((context.TransportHeaders.TryGetHeader(MessageHeaders.TransportMessageId, out var messageIdHeader) + || context.TransportHeaders.TryGetHeader(MessageHeaders.MessageId, out messageIdHeader)) + && messageIdHeader is string text) + activity.SetTag(DiagnosticHeaders.Messaging.TransportMessageId, text); + } + + activity.Start(); + + return new StartedActivity(activity); + } + + public static StartedActivity? StartConsumerActivity(this ILogContext logContext, ConsumeContext context) + where T : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartHandlerActivity(this ILogContext logContext, ConsumeContext context) + where T : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.ConsumerType, "Handler"); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartSagaActivity(this ILogContext logContext, SagaConsumeContext context) + where TSaga : class, ISaga + where T : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.SagaId, context.Saga.CorrelationId.ToString("D")); + activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartSagaStateMachineActivity(this ILogContext logContext, BehaviorContext context) + where TSaga : class, ISaga + where T : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.SagaId, context.Saga.CorrelationId.ToString("D")); + activity.SetTag(DiagnosticHeaders.ConsumerType, context.StateMachine.Name); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartExecuteActivity(this ILogContext logContext, ConsumeContext context) + where TActivity : IExecuteActivity + where TArguments : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.TrackingNumber, context.Message.TrackingNumber.ToString("D")); + activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartCompensateActivity(this ILogContext logContext, ConsumeContext context) + where TActivity : ICompensateActivity + where TLog : class + { + return StartActivity(activity => + { + activity.SetTag(DiagnosticHeaders.TrackingNumber, context.Message.TrackingNumber.ToString("D")); + activity.SetTag(DiagnosticHeaders.ConsumerType, TypeCache.ShortName); + activity.SetTag(DiagnosticHeaders.PeerAddress, MessageTypeCache.DiagnosticAddress); + }); + } + + public static StartedActivity? StartGenericActivity(this ILogContext logContext, string operationName) + { + var activity = Cached.Source.Value.CreateActivity(operationName, ActivityKind.Client); + if (activity == null) + return null; + + activity.Start(); + + return new StartedActivity(activity); + } + + static StartedActivity? PopulateSendActivity(SendContext context, System.Diagnostics.Activity activity, params (string Key, object? Value)[] tags) + where T : class + { + var conversationId = context.ConversationId?.ToString("D"); + + if (context.CorrelationId.HasValue) + activity.SetBaggage(DiagnosticHeaders.CorrelationId, context.CorrelationId.Value.ToString("D")); + if (conversationId != null) + activity.SetBaggage(DiagnosticHeaders.Messaging.ConversationId, conversationId); + + if (activity.IsAllDataRequested) + { + if (context.MessageId.HasValue) + activity.SetTag(DiagnosticHeaders.MessageId, context.MessageId.Value.ToString("D")); + if (conversationId != null) + activity.SetTag(DiagnosticHeaders.Messaging.ConversationId, conversationId); + if (context.CorrelationId.HasValue) + activity.SetTag(DiagnosticHeaders.CorrelationId, context.CorrelationId.Value.ToString("D")); + if (context.RequestId.HasValue) + activity.SetTag(DiagnosticHeaders.RequestId, context.RequestId.Value.ToString("D")); + if (context.InitiatorId.HasValue) + activity.SetTag(DiagnosticHeaders.InitiatorId, context.InitiatorId.Value.ToString("D")); + if (context.SourceAddress != null) + activity.SetTag(DiagnosticHeaders.SourceAddress, context.SourceAddress.ToString()); + if (context.DestinationAddress != null) + activity.SetTag(DiagnosticHeaders.DestinationAddress, context.DestinationAddress.ToString()); + + activity.SetTag(DiagnosticHeaders.MessageTypes, string.Join(",", MessageTypeCache.MessageTypeNames)); + + for (var i = 0; i < tags.Length; i++) + { + if (tags[i].Value != null) + activity.SetTag(tags[i].Key, tags[i].Value?.ToString()); + } + } + + activity.Start(); + + IList>? baggage = null; + foreach (KeyValuePair pair in activity.Baggage) + { + if (pair.Key.Equals(DiagnosticHeaders.Messaging.ConversationId) || pair.Key.Equals(DiagnosticHeaders.CorrelationId)) + continue; + + if (string.IsNullOrWhiteSpace(pair.Value)) + continue; + + baggage ??= new List>(); + baggage.Add(pair); + } + + if (activity.Id != null) + context.Headers.Set(DiagnosticHeaders.ActivityId, activity.Id); + + if (baggage != null) + context.Headers.Set(DiagnosticHeaders.ActivityCorrelationContext, baggage); + + return new StartedActivity(activity); + } + + static ActivityContext GetParentActivityContext(Headers headers) + { + return headers.TryGetHeader(DiagnosticHeaders.ActivityId, out var headerValue) && headerValue is string activityId + && ActivityContext.TryParse(activityId, null, out var activityContext) + ? activityContext + : default; + } + + static StartedActivity? StartActivity(Action started) + { + var currentActivity = System.Diagnostics.Activity.Current; + if (currentActivity == null) + return null; + + var operationName = Cached.OperationNames.GetOrAdd(currentActivity.OperationName, add => + { + if (add.EndsWith(" receive")) + return add.Substring(0, add.Length - 8) + " process"; + if (add.EndsWith(" process")) + return add; + + return currentActivity.OperationName; + }); + + var activity = Cached.Source.Value.CreateActivity(operationName, ActivityKind.Consumer); + if (activity == null) + return null; + + activity.SetTag(DiagnosticHeaders.Messaging.Operation, "process"); + + if (activity.IsAllDataRequested) + started(activity); + + activity.Start(); + + return new StartedActivity(activity); + } + + + static class Cached + { + internal static readonly Lazy Source = new Lazy(() => new ActivitySource(DiagnosticHeaders.DefaultListenerName)); + internal static readonly ConcurrentDictionary OperationNames = new ConcurrentDictionary(); + } + } +} diff --git a/src/MassTransit/Logging/StartedActivity.cs b/src/MassTransit/Logging/Diagnostics/StartedActivity.cs similarity index 100% rename from src/MassTransit/Logging/StartedActivity.cs rename to src/MassTransit/Logging/Diagnostics/StartedActivity.cs diff --git a/src/MassTransit/Logging/ILogContext.cs b/src/MassTransit/Logging/ILogContext.cs index fcaba678b0a..f586de37cf4 100644 --- a/src/MassTransit/Logging/ILogContext.cs +++ b/src/MassTransit/Logging/ILogContext.cs @@ -1,10 +1,7 @@ #nullable enable namespace MassTransit.Logging { - using Courier.Contracts; using Microsoft.Extensions.Logging; - using Middleware; - using Transports; /// @@ -12,12 +9,13 @@ namespace MassTransit.Logging /// public interface ILogContext { + ILogger Logger { get; } + /// /// The log context for all message movement, sent, received, etc. /// ILogContext Messages { get; } - ILogger Logger { get; } EnabledLogger? Critical { get; } EnabledLogger? Debug { get; } EnabledLogger? Error { get; } @@ -31,39 +29,5 @@ public interface ILogContext /// The category name for messages produced by the logger. /// The . ILogContext CreateLogContext(string categoryName); - - StartedActivity? StartSendActivity(SendTransportContext transportContext, SendContext context, params (string Key, object? Value)[] tags) - where T : class; - - StartedActivity? StartOutboxSendActivity(SendContext context) - where T : class; - - StartedActivity? StartOutboxDeliverActivity(OutboxMessageContext context); - - StartedActivity? StartReceiveActivity(string name, string inputAddress, string endpointName, ReceiveContext context); - - StartedActivity? StartConsumerActivity(ConsumeContext context) - where T : class; - - StartedActivity? StartHandlerActivity(ConsumeContext context) - where T : class; - - StartedActivity? StartSagaActivity(SagaConsumeContext context) - where TSaga : class, ISaga - where T : class; - - StartedActivity? StartSagaStateMachineActivity(BehaviorContext context) - where TSaga : class, ISaga - where T : class; - - StartedActivity? StartExecuteActivity(ConsumeContext context) - where TActivity : IExecuteActivity - where TArguments : class; - - StartedActivity? StartCompensateActivity(ConsumeContext context) - where TActivity : ICompensateActivity - where TLog : class; - - StartedActivity? StartGenericActivity(string operationName); } } diff --git a/src/MassTransit/Monitoring/Instrumentation.cs b/src/MassTransit/Logging/Monitoring/LogContextInstrumentationExtensions.cs similarity index 53% rename from src/MassTransit/Monitoring/Instrumentation.cs rename to src/MassTransit/Logging/Monitoring/LogContextInstrumentationExtensions.cs index d5ef5b53de2..b1026df3893 100644 --- a/src/MassTransit/Monitoring/Instrumentation.cs +++ b/src/MassTransit/Logging/Monitoring/LogContextInstrumentationExtensions.cs @@ -1,4 +1,4 @@ -namespace MassTransit.Monitoring +namespace MassTransit.Logging { using System; using System.Collections.Concurrent; @@ -7,38 +7,49 @@ namespace MassTransit.Monitoring using System.Linq; using System.Reflection; using System.Text; + using Courier.Contracts; using Metadata; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Options; + using Middleware; + using Monitoring; + using Transports; - public static class Instrumentation + public static class LogContextInstrumentationExtensions { - public const string MeterName = "MassTransit"; - static readonly ConcurrentDictionary _labelCache = new ConcurrentDictionary(); static bool _isConfigured; - static string _serviceName; static Counter _receiveTotal; static Counter _receiveFaultTotal; static Counter _receiveInProgress; - static Histogram _receiveDuration; static Counter _consumeTotal; static Counter _consumeFaultTotal; static Counter _consumeRetryTotal; - static Counter _publishTotal; - static Counter _publishFaultTotal; + static Counter _sagaTotal; + static Counter _sagaFaultTotal; static Counter _sendTotal; static Counter _sendFaultTotal; static Counter _executeTotal; static Counter _executeFaultTotal; static Counter _compensateTotal; - static Counter _compensateFailureTotal; + static Counter _compensateFaultTotal; static Counter _consumerInProgress; + static Counter _handlerTotal; + static Counter _handlerFaultTotal; static Counter _handlerInProgress; static Counter _sagaInProgress; static Counter _executeInProgress; static Counter _compensateInProgress; + static Counter _outboxSendTotal; + static Counter _outboxSendFaultTotal; + static Counter _outboxDeliveryTotal; + static Counter _outboxDeliveryFaultTotal; + static Histogram _receiveDuration; static Histogram _consumeDuration; + static Histogram _handlerDuration; + static Histogram _sagaDuration; static Histogram _deliveryDuration; static Histogram _executeDuration; static Histogram _compensateDuration; @@ -48,270 +59,293 @@ public static class Instrumentation static Meter _meter; static InstrumentationOptions _options; - public static void MeasureReceived(ReceiveContext context, Exception exception = default) + public static StartedInstrument? StartReceiveInstrument(this ILogContext logContext, ReceiveContext context) { - if (!_receiveTotal.Enabled) - return; + if (!_isConfigured || !_receiveTotal.Enabled) + return null; var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.EndpointLabel, GetEndpointLabel(context.InputAddress) }, + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.EndpointLabel, GetEndpointLabel(context.InputAddress) } }; _receiveTotal.Add(1, tagList); - _receiveDuration.Record(context.ElapsedTime.TotalMilliseconds, tagList); + _receiveInProgress.Add(1, tagList); - if (exception != null) + return new StartedInstrument(exception => { tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - _receiveFaultTotal.Add(1, tagList); - } + }, () => + { + _receiveInProgress.Add(-1, tagList); + _receiveDuration.Record(context.ElapsedTime.TotalMilliseconds, tagList); + }); } - public static void MeasureConsume(ConsumeContext context, TimeSpan duration, string consumerType, Exception exception = default) - where T : class + public static StartedInstrument? StartHandlerInstrument(this ILogContext logContext, ConsumeContext context, + Stopwatch stopwatch) + where TMessage : class { - if (!_consumeTotal.Enabled) - return; + if (!_isConfigured || !_handlerTotal.Enabled) + return null; - var messageTypeLabel = GetMessageTypeLabel(); + var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, + { _options.ServiceNameLabel, _options.ServiceName }, { _options.MessageTypeLabel, messageTypeLabel }, - { _options.ConsumerTypeLabel, GetConsumerTypeLabel(consumerType, TypeCache.ShortName, messageTypeLabel) } + { _options.ConsumerTypeLabel, GetConsumerTypeLabel, TMessage>(messageTypeLabel) } }; - _consumeTotal.Add(1, tagList); - _consumeDuration.Record(duration.TotalMilliseconds, tagList); - - var retryAttempt = context.GetRetryAttempt(); - if (retryAttempt > 0) - _consumeRetryTotal.Add(1, tagList); - - if (context.SentTime.HasValue) - { - var deliveryDuration = DateTime.UtcNow - context.SentTime.Value; - if (deliveryDuration < TimeSpan.Zero) - deliveryDuration = TimeSpan.Zero; + _handlerTotal.Add(1, tagList); + _handlerInProgress.Add(1, tagList); - _deliveryDuration.Record(deliveryDuration.TotalMilliseconds, tagList); - } - - if (exception != null) + return new StartedInstrument(exception => { tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - - _consumeFaultTotal.Add(1, tagList); - } + _handlerFaultTotal.Add(1, tagList); + }, () => + { + _handlerInProgress.Add(-1, tagList); + _handlerDuration.Record(stopwatch.ElapsedMilliseconds, tagList); + }); } - public static void MeasureExecute(ExecuteActivityContext context, Exception exception = default) - where TActivity : class, IExecuteActivity - where TArguments : class + public static StartedInstrument? StartSagaInstrument(this ILogContext logContext, SagaConsumeContext context) + where T : class + where TSaga : class, ISaga { - if (!_executeTotal.Enabled) - return; + if (!_isConfigured || !_sagaTotal.Enabled) + return null; + var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.ActivityNameLabel, context.ActivityName }, - { _options.ArgumentTypeLabel, GetArgumentTypeLabel() } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.MessageTypeLabel, messageTypeLabel }, + { _options.ConsumerTypeLabel, GetConsumerTypeLabel(messageTypeLabel) } }; - _executeTotal.Add(1, tagList); - _executeDuration.Record(context.Elapsed.TotalMilliseconds, tagList); + _sagaTotal.Add(1, tagList); + _sagaInProgress.Add(1, tagList); - if (exception != null) + return new StartedInstrument(exception => { tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - - _executeFaultTotal.Add(1, tagList); - } + _sagaFaultTotal.Add(1, tagList); + }, () => + { + _sagaInProgress.Add(-1, tagList); + _sagaDuration.Record(context.ReceiveContext.ElapsedTime.TotalMilliseconds, tagList); + }); } - public static void MeasureCompensate(CompensateActivityContext context, Exception exception = default) - where TActivity : class, ICompensateActivity - where TLog : class + public static StartedInstrument? StartSagaStateMachineInstrument(this ILogContext logContext, BehaviorContext context) + where T : class + where TSaga : class, ISaga { - if (!_compensateTotal.Enabled) - return; + if (!_isConfigured || !_sagaTotal.Enabled) + return null; + var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.ActivityNameLabel, context.ActivityName }, - { _options.LogTypeLabel, GetLogTypeLabel() } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.MessageTypeLabel, messageTypeLabel }, + { _options.ConsumerTypeLabel, GetConsumerTypeLabel(messageTypeLabel) } }; - _compensateTotal.Add(1, tagList); - _compensateDuration.Record(context.Elapsed.TotalMilliseconds, tagList); + _sagaTotal.Add(1, tagList); + _sagaInProgress.Add(1, tagList); - if (exception != null) + return new StartedInstrument(exception => { tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - - _compensateFailureTotal.Add(1, tagList); - } + _sagaFaultTotal.Add(1, tagList); + }, () => + { + _sagaInProgress.Add(-1, tagList); + _sagaDuration.Record(context.ReceiveContext.ElapsedTime.TotalMilliseconds, tagList); + }); } - public static void MeasurePublish(Exception exception = default) + public static StartedInstrument? StartConsumeInstrument(this ILogContext logContext, ConsumeContext context, Stopwatch timer) where T : class { - if (!_publishTotal.Enabled) - return; + if (!_isConfigured || !_consumeTotal.Enabled) + return null; + + var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.MessageTypeLabel, GetMessageTypeLabel() }, + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.MessageTypeLabel, messageTypeLabel }, + { _options.ConsumerTypeLabel, GetConsumerTypeLabel(messageTypeLabel) } }; - _publishTotal.Add(1, tagList); - - if (exception != null) - { - tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - _publishFaultTotal.Add(1, tagList); - } - } + _consumeTotal.Add(1, tagList); + _consumerInProgress.Add(1, tagList); - public static void MeasureSend(Exception exception = default) - where T : class - { - if (!_sendTotal.Enabled) - return; + var retryAttempt = context.GetRetryAttempt(); + if (retryAttempt > 0) + _consumeRetryTotal.Add(1, tagList); - var tagList = new TagList + if (context.SentTime.HasValue) { - { _options.ServiceNameLabel, _serviceName }, - { _options.MessageTypeLabel, GetMessageTypeLabel() }, - }; + var deliveryDuration = DateTime.UtcNow - context.SentTime.Value; + if (deliveryDuration < TimeSpan.Zero) + deliveryDuration = TimeSpan.Zero; - _sendTotal.Add(1, tagList); + _deliveryDuration.Record(deliveryDuration.TotalMilliseconds, tagList); + } - if (exception != null) + return new StartedInstrument(exception => { tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); - _sendFaultTotal.Add(1, tagList); - } + _consumeFaultTotal.Add(1, tagList); + }, () => + { + _consumerInProgress.Add(-1, tagList); + _consumeDuration.Record(timer.ElapsedMilliseconds, tagList); + }); } - public static IDisposable TrackReceiveInProgress(ReceiveContext context) + public static StartedInstrument? StartActivityExecuteInstrument(this ILogContext logContext, + ConsumeContext context, Stopwatch timer) + where TActivity : class, IExecuteActivity + where TArguments : class { - if (!_receiveTotal.Enabled) + if (!_isConfigured || !_executeTotal.Enabled) return null; var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.EndpointLabel, GetEndpointLabel(context.InputAddress) }, + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.ActivityNameLabel, GetActivityTypeLabel() }, + { _options.ArgumentTypeLabel, GetArgumentTypeLabel() } }; - return TrackInProgress(_receiveInProgress, tagList); + _executeTotal.Add(1, tagList); + _executeInProgress.Add(1, tagList); + + return new StartedInstrument(exception => + { + tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); + _executeFaultTotal.Add(1, tagList); + }, () => + { + _executeInProgress.Add(-1, tagList); + _executeDuration.Record(timer.ElapsedMilliseconds, tagList); + }); } - public static IDisposable TrackConsumerInProgress() - where TConsumer : class - where TMessage : class + public static StartedInstrument? StartActivityCompensateInstrument(this ILogContext logContext, + ConsumeContext context, Stopwatch timer) + where TActivity : class, ICompensateActivity + where TLog : class { - if (!_consumeTotal.Enabled) + if (!_isConfigured || !_compensateTotal.Enabled) return null; - var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.MessageTypeLabel, messageTypeLabel }, - { _options.ConsumerTypeLabel, GetConsumerTypeLabel(TypeCache.ShortName, TypeCache.ShortName, messageTypeLabel) } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.ActivityNameLabel, GetActivityTypeLabel() }, + { _options.LogTypeLabel, GetLogTypeLabel() } }; - return TrackInProgress(_consumerInProgress, tagList); + _compensateTotal.Add(1, tagList); + _compensateInProgress.Add(1, tagList); + + return new StartedInstrument(exception => + { + tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); + _compensateFaultTotal.Add(1, tagList); + }, () => + { + _compensateInProgress.Add(-1, tagList); + _compensateDuration.Record(timer.ElapsedMilliseconds, tagList); + }); } - public static IDisposable TrackSagaInProgress() - where TSaga : class, ISaga - where TMessage : class + public static StartedInstrument? StartSendInstrument(this ILogContext logContext, SendTransportContext transportContext, SendContext context) + where T : class { - if (!_sagaInProgress.Enabled) + if (!_isConfigured || !_sendTotal.Enabled) return null; - var messageTypeLabel = GetMessageTypeLabel(); var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.MessageTypeLabel, messageTypeLabel }, - { _options.ConsumerTypeLabel, GetConsumerTypeLabel(TypeCache.ShortName, TypeCache.ShortName, messageTypeLabel) } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.MessageTypeLabel, GetMessageTypeLabel() } }; + _sendTotal.Add(1, tagList); - return TrackInProgress(_sagaInProgress, tagList); + return new StartedInstrument(exception => + { + tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); + _sendFaultTotal.Add(1, tagList); + }); } - public static IDisposable TrackExecuteActivityInProgress(ExecuteActivityContext context) - where TActivity : class, IExecuteActivity - where TArguments : class + public static StartedInstrument? StartOutboxSendInstrument(this ILogContext logContext, SendContext context) + where T : class { - if (!_executeTotal.Enabled) + if (!_isConfigured || !_outboxSendTotal.Enabled) return null; var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.ActivityNameLabel, context.ActivityName }, - { _options.ArgumentTypeLabel, GetArgumentTypeLabel() } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.MessageTypeLabel, GetMessageTypeLabel() } }; - return TrackInProgress(_executeInProgress, tagList); + _outboxSendTotal.Add(1, tagList); + + return new StartedInstrument(exception => + { + tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); + _outboxSendFaultTotal.Add(1, tagList); + }); } - public static IDisposable TrackCompensateActivityInProgress(CompensateActivityContext context) - where TActivity : class, ICompensateActivity - where TLog : class + public static StartedInstrument? StartOutboxDeliveryInstrument(this ILogContext logContext, OutboxMessageContext context) { - if (!_compensateTotal.Enabled) + if (!_isConfigured || !_outboxDeliveryTotal.Enabled) return null; var tagList = new TagList { - { _options.ServiceNameLabel, _serviceName }, - { _options.ActivityNameLabel, context.ActivityName }, - { _options.LogTypeLabel, GetLogTypeLabel() } + { _options.ServiceNameLabel, _options.ServiceName }, + { _options.EndpointLabel, GetEndpointLabel(context.DestinationAddress) } }; - return TrackInProgress(_compensateInProgress, tagList); - } + _outboxDeliveryTotal.Add(1, tagList); - public static IDisposable TrackHandlerInProgress() - where TMessage : class - { - var tagList = new TagList + return new StartedInstrument(exception => { - { _options.ServiceNameLabel, _serviceName }, - { _options.MessageTypeLabel, GetMessageTypeLabel() }, - }; - - return TrackInProgress(_handlerInProgress, tagList); + tagList.Add(_options.ExceptionTypeLabel, exception.GetType().Name); + _outboxDeliveryFaultTotal.Add(1, tagList); + }); } - static IDisposable TrackInProgress(Counter counter, TagList tagList) + public static void TryConfigure(IServiceProvider provider) { - counter.Add(1, tagList); + if (_isConfigured) + return; - return new InProgressTracker(counter, tagList); + TryConfigure(provider.GetRequiredService>().Value); } - public static void TryConfigure(string serviceName, InstrumentationOptions options) + public static void TryConfigure(InstrumentationOptions options) { if (_isConfigured) return; - _meter = new Meter(MeterName, HostMetadataCache.Host.MassTransitVersion); - - _serviceName = serviceName; - _options = options; + _meter = new Meter(InstrumentationOptions.MeterName, HostMetadataCache.Host.MassTransitVersion); // Counters @@ -322,17 +356,26 @@ public static void TryConfigure(string serviceName, InstrumentationOptions optio _consumeFaultTotal = _meter.CreateCounter(options.ConsumeFaultTotal, "ea", "Number of message consume faults"); _consumeRetryTotal = _meter.CreateCounter(options.ConsumeRetryTotal, "ea", "Number of message consume faults"); - _publishTotal = _meter.CreateCounter(options.PublishTotal, "ea", "Number of messages published"); - _publishFaultTotal = _meter.CreateCounter(options.PublishFaultTotal, "ea", "Number of message publish faults"); + _sagaTotal = _meter.CreateCounter(options.SagaTotal, "ea", "Number of sagas executed"); + _sagaFaultTotal = _meter.CreateCounter(options.SagaFaultTotal, "ea", "Number of sagas faults"); + + _handlerTotal = _meter.CreateCounter(options.HandlerTotal, "ea", "Number of messages handled"); + _handlerFaultTotal = _meter.CreateCounter(options.HandlerFaultTotal, "ea", "Number of message handler faults"); _sendTotal = _meter.CreateCounter(options.SendTotal, "ea", "Number of messages sent"); _sendFaultTotal = _meter.CreateCounter(options.SendFaultTotal, "ea", "Number of message send faults"); + _outboxSendTotal = _meter.CreateCounter(options.OutboxSendTotal, "ea", "Number of messages sent to outbox"); + _outboxSendFaultTotal = _meter.CreateCounter(options.OutboxSendFaultTotal, "ea", "Number of message send to outbox faults"); + _executeTotal = _meter.CreateCounter(options.ActivityExecuteTotal, "ea", "Number of activities executed"); _executeFaultTotal = _meter.CreateCounter(options.ActivityExecuteFaultTotal, "ea", "Number of activity execution faults"); _compensateTotal = _meter.CreateCounter(options.ActivityCompensateTotal, "ea", "Number of activities compensated"); - _compensateFailureTotal = _meter.CreateCounter(options.ActivityCompensateFailureTotal, "ea", "Number of activity compensation failures"); + _compensateFaultTotal = _meter.CreateCounter(options.ActivityCompensateFailureTotal, "ea", "Number of activity compensation failures"); + + _outboxDeliveryTotal = _meter.CreateCounter(options.OutboxDeliveryTotal, "ea", "Number of outbox delivery messages executed"); + _outboxDeliveryFaultTotal = _meter.CreateCounter(options.OutboxDeliveryFaultTotal, "ea", "Number of outbox delivery message failures"); // Gauges @@ -350,29 +393,33 @@ public static void TryConfigure(string serviceName, InstrumentationOptions optio // Histograms - _receiveDuration = _meter.CreateHistogram(options.ReceiveDuration, "ms", "Elapsed time spent receiving a message, in seconds"); + _receiveDuration = _meter.CreateHistogram(options.ReceiveDuration, "ms", "Elapsed time spent receiving a message, in millis"); - _consumeDuration = _meter.CreateHistogram(options.ConsumeDuration, "ms", "Elapsed time spent consuming a message, in seconds"); + _consumeDuration = _meter.CreateHistogram(options.ConsumeDuration, "ms", "Elapsed time spent consuming a message, in millis"); + + _sagaDuration = _meter.CreateHistogram(options.SagaDuration, "ms", "Elapsed time spent saga processing a message, in millis"); + + _handlerDuration = _meter.CreateHistogram(options.HandlerDuration, "ms", "Elapsed time spent handler processing a message, in millis"); _deliveryDuration = _meter.CreateHistogram(options.DeliveryDuration, "ms", - "Elapsed time between when the message was sent and when it was consumed, in seconds."); + "Elapsed time between when the message was sent and when it was consumed, in millis."); - _executeDuration = _meter.CreateHistogram(options.ActivityExecuteDuration, "ms", "Elapsed time spent executing an activity, in seconds"); + _executeDuration = _meter.CreateHistogram(options.ActivityExecuteDuration, "ms", "Elapsed time spent executing an activity, in millis"); _compensateDuration = _meter.CreateHistogram(options.ActivityCompensateDuration, "ms", - "Elapsed time spent compensating an activity, in seconds"); + "Elapsed time spent compensating an activity, in millis"); _isConfigured = true; } - static string GetConsumerTypeLabel(string consumerType, string messageType, string messageLabel) + static string GetConsumerTypeLabel(string messageLabel) { - return _labelCache.GetOrAdd(consumerType, type => + return _labelCache.GetOrAdd(TypeCache.ShortName, type => { if (type.StartsWith("MassTransit.MessageHandler<")) return "Handler"; - var genericMessageType = "<" + messageType + ">"; + var genericMessageType = "<" + TypeCache.ShortName + ">"; if (type.IndexOf(genericMessageType, StringComparison.Ordinal) >= 0) type = type.Replace(genericMessageType, "_" + messageLabel); @@ -413,6 +460,11 @@ static string GetLogTypeLabel() return _labelCache.GetOrAdd(TypeCache.ShortName, type => FormatTypeName(new StringBuilder(), typeof(TLog)).Replace("Log", "")); } + static string GetActivityTypeLabel() + { + return _labelCache.GetOrAdd(TypeCache.ShortName, type => FormatTypeName(new StringBuilder(), typeof(TActivity)).Replace("Activity", "")); + } + static string GetEndpointLabel(Uri inputAddress) { return inputAddress?.AbsolutePath.Split('/').LastOrDefault()?.Replace(".", "_").Replace("/", "_"); @@ -453,24 +505,5 @@ static string FormatTypeName(StringBuilder sb, Type type) return sb.ToString(); } - - - class InProgressTracker : - IDisposable - { - readonly Counter _counter; - readonly TagList _tagList; - - public InProgressTracker(Counter counter, TagList tagList) - { - _counter = counter; - _tagList = tagList; - } - - public void Dispose() - { - _counter.Add(-1, _tagList); - } - } } } diff --git a/src/MassTransit/Logging/Monitoring/StartedInstrument.cs b/src/MassTransit/Logging/Monitoring/StartedInstrument.cs new file mode 100644 index 00000000000..4e111af95d2 --- /dev/null +++ b/src/MassTransit/Logging/Monitoring/StartedInstrument.cs @@ -0,0 +1,28 @@ +#nullable enable +namespace MassTransit.Logging +{ + using System; + + + public readonly struct StartedInstrument + { + readonly Action _onFault; + readonly Action? _onStop; + + public StartedInstrument(Action onFault, Action? onStop = default) + { + _onFault = onFault; + _onStop = onStop; + } + + public void AddException(Exception exception) + { + _onFault(exception); + } + + public void Stop() + { + _onStop?.Invoke(); + } + } +} diff --git a/src/MassTransit/Middleware/ConsumerMessageFilter.cs b/src/MassTransit/Middleware/ConsumerMessageFilter.cs index c04b4728908..244929b3edc 100644 --- a/src/MassTransit/Middleware/ConsumerMessageFilter.cs +++ b/src/MassTransit/Middleware/ConsumerMessageFilter.cs @@ -38,9 +38,11 @@ void IProbeSite.Probe(ProbeContext context) [DebuggerNonUserCode] async Task IFilter>.Send(ConsumeContext context, IPipe> next) { + var timer = Stopwatch.StartNew(); + StartedActivity? activity = LogContext.Current?.StartConsumerActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartConsumeInstrument(context, timer); - var timer = Stopwatch.StartNew(); try { await _consumerFactory.Send(context, _consumerPipe).ConfigureAwait(false); @@ -55,6 +57,8 @@ async Task IFilter>.Send(ConsumeContext conte activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); + throw new ConsumerCanceledException($"The operation was canceled by the consumer: {TypeCache.ShortName}"); } catch (Exception exception) @@ -63,11 +67,14 @@ async Task IFilter>.Send(ConsumeContext conte activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); + throw; } finally { activity?.Stop(); + instrument?.Stop(); } } } diff --git a/src/MassTransit/Middleware/HandlerMessageFilter.cs b/src/MassTransit/Middleware/HandlerMessageFilter.cs index c34979fb06f..aa49ec8638f 100644 --- a/src/MassTransit/Middleware/HandlerMessageFilter.cs +++ b/src/MassTransit/Middleware/HandlerMessageFilter.cs @@ -22,10 +22,7 @@ public class HandlerMessageFilter : // TODO this needs a pipe like instance and consumer, to handle things like retry, etc. public HandlerMessageFilter(MessageHandler handler) { - if (handler == null) - throw new ArgumentNullException(nameof(handler)); - - _handler = handler; + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); } void IProbeSite.Probe(ProbeContext context) @@ -38,9 +35,10 @@ void IProbeSite.Probe(ProbeContext context) [DebuggerNonUserCode] async Task IFilter>.Send(ConsumeContext context, IPipe> next) { + var timer = Stopwatch.StartNew(); StartedActivity? activity = LogContext.Current?.StartHandlerActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartHandlerInstrument(context, timer); - var timer = Stopwatch.StartNew(); try { await _handler(context).ConfigureAwait(false); @@ -55,6 +53,9 @@ async Task IFilter>.Send(ConsumeContext conte { await context.NotifyFaulted(timer.Elapsed, TypeCache>.ShortName, exception).ConfigureAwait(false); + activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); + if (exception.CancellationToken == context.CancellationToken) throw; @@ -64,12 +65,16 @@ async Task IFilter>.Send(ConsumeContext conte { await context.NotifyFaulted(timer.Elapsed, TypeCache>.ShortName, ex).ConfigureAwait(false); + activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); + Interlocked.Increment(ref _faulted); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } } diff --git a/src/MassTransit/Middleware/InitiatedByOrOrchestratesSagaMessageFilter.cs b/src/MassTransit/Middleware/InitiatedByOrOrchestratesSagaMessageFilter.cs index 62e8205883b..252241b98eb 100644 --- a/src/MassTransit/Middleware/InitiatedByOrOrchestratesSagaMessageFilter.cs +++ b/src/MassTransit/Middleware/InitiatedByOrOrchestratesSagaMessageFilter.cs @@ -24,6 +24,7 @@ void IProbeSite.Probe(ProbeContext context) public async Task Send(SagaConsumeContext context, IPipe> next) { StartedActivity? activity = LogContext.Current?.StartSagaActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartSagaInstrument(context); try { await context.Saga.Consume(context).ConfigureAwait(false); @@ -33,12 +34,14 @@ public async Task Send(SagaConsumeContext context, IPipe context, IPipe> next) { StartedActivity? activity = LogContext.Current?.StartSagaActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartSagaInstrument(context); try { await context.Saga.Consume(context).ConfigureAwait(false); @@ -33,12 +34,14 @@ public async Task Send(SagaConsumeContext context, IPipe : public InstanceMessageFilter(TConsumer instance, IPipe> instancePipe) { - if (instance == null) - throw new ArgumentNullException(nameof(instance)); - - if (instancePipe == null) - throw new ArgumentNullException(nameof(instancePipe)); - - _instance = instance; - _instancePipe = instancePipe; + _instance = instance ?? throw new ArgumentNullException(nameof(instance)); + _instancePipe = instancePipe ?? throw new ArgumentNullException(nameof(instancePipe)); } void IProbeSite.Probe(ProbeContext context) @@ -43,9 +37,11 @@ void IProbeSite.Probe(ProbeContext context) [DebuggerNonUserCode] async Task IFilter>.Send(ConsumeContext context, IPipe> next) { + var timer = Stopwatch.StartNew(); + StartedActivity? activity = LogContext.Current?.StartConsumerActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartConsumeInstrument(context, timer); - var timer = Stopwatch.StartNew(); try { await _instancePipe.Send(new ConsumerConsumeContextScope(context, _instance)).ConfigureAwait(false); @@ -59,6 +55,7 @@ async Task IFilter>.Send(ConsumeContext conte await context.NotifyFaulted(timer.Elapsed, TypeCache.ShortName, exception).ConfigureAwait(false); activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw new ConsumerCanceledException($"The operation was canceled by the consumer: {TypeCache.ShortName}"); } @@ -67,12 +64,14 @@ async Task IFilter>.Send(ConsumeContext conte await context.NotifyFaulted(timer.Elapsed, TypeCache.ShortName, exception).ConfigureAwait(false); activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } } diff --git a/src/MassTransit/Middleware/InstrumentCompensateActivityFilter.cs b/src/MassTransit/Middleware/InstrumentCompensateActivityFilter.cs deleted file mode 100644 index b3660370b3c..00000000000 --- a/src/MassTransit/Middleware/InstrumentCompensateActivityFilter.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentCompensateActivityFilter : - IFilter> - where TActivity : class, ICompensateActivity - where TLog : class - { - public async Task Send(CompensateActivityContext context, IPipe> next) - { - using var inProgress = Instrumentation.TrackCompensateActivityInProgress(context); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/InstrumentConsumerFilter.cs b/src/MassTransit/Middleware/InstrumentConsumerFilter.cs deleted file mode 100644 index f43edbb2196..00000000000 --- a/src/MassTransit/Middleware/InstrumentConsumerFilter.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentConsumerFilter : - IFilter> - where TConsumer : class - where TMessage : class - { - public async Task Send(ConsumerConsumeContext context, IPipe> next) - { - using var inProgress = Instrumentation.TrackConsumerInProgress(); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/InstrumentExecuteActivityFilter.cs b/src/MassTransit/Middleware/InstrumentExecuteActivityFilter.cs deleted file mode 100644 index 44fa725a2dd..00000000000 --- a/src/MassTransit/Middleware/InstrumentExecuteActivityFilter.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentExecuteActivityFilter : - IFilter> - where TActivity : class, IExecuteActivity - where TArguments : class - { - public async Task Send(ExecuteActivityContext context, IPipe> next) - { - using var inProgress = Instrumentation.TrackExecuteActivityInProgress(context); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/InstrumentHandlerFilter.cs b/src/MassTransit/Middleware/InstrumentHandlerFilter.cs deleted file mode 100644 index 978a2961f7c..00000000000 --- a/src/MassTransit/Middleware/InstrumentHandlerFilter.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentHandlerFilter : - IFilter> - where TMessage : class - { - public async Task Send(ConsumeContext context, IPipe> next) - { - using var inProgress = Instrumentation.TrackHandlerInProgress(); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/InstrumentReceiveFilter.cs b/src/MassTransit/Middleware/InstrumentReceiveFilter.cs deleted file mode 100644 index c922d40496f..00000000000 --- a/src/MassTransit/Middleware/InstrumentReceiveFilter.cs +++ /dev/null @@ -1,22 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentReceiveFilter : - IFilter - { - public async Task Send(ReceiveContext context, IPipe next) - { - using var inProgress = Instrumentation.TrackReceiveInProgress(context); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/InstrumentSagaFilter.cs b/src/MassTransit/Middleware/InstrumentSagaFilter.cs deleted file mode 100644 index fc736510fa2..00000000000 --- a/src/MassTransit/Middleware/InstrumentSagaFilter.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Middleware -{ - using System.Threading.Tasks; - using Monitoring; - - - public class InstrumentSagaFilter : - IFilter> - where TSaga : class, ISaga - where TMessage : class - { - public async Task Send(SagaConsumeContext context, IPipe> next) - { - using var inProgress = Instrumentation.TrackSagaInProgress(); - - await next.Send(context).ConfigureAwait(false); - } - - public void Probe(ProbeContext context) - { - context.CreateFilterScope("instrument"); - } - } -} diff --git a/src/MassTransit/Middleware/ObservesSagaMessageFilter.cs b/src/MassTransit/Middleware/ObservesSagaMessageFilter.cs index fdc990a8977..896aeb1547b 100644 --- a/src/MassTransit/Middleware/ObservesSagaMessageFilter.cs +++ b/src/MassTransit/Middleware/ObservesSagaMessageFilter.cs @@ -24,6 +24,7 @@ void IProbeSite.Probe(ProbeContext context) public async Task Send(SagaConsumeContext context, IPipe> next) { StartedActivity? activity = LogContext.Current?.StartSagaActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartSagaInstrument(context); try { await context.Saga.Consume(context).ConfigureAwait(false); @@ -33,12 +34,14 @@ public async Task Send(SagaConsumeContext context, IPipe context, IPipe> next) { StartedActivity? activity = LogContext.Current?.StartSagaActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartSagaInstrument(context); try { await context.Saga.Consume(context).ConfigureAwait(false); @@ -33,12 +34,14 @@ public async Task Send(SagaConsumeContext context, IPipe(SendContext context) where T : class { StartedActivity? activity = LogContext.Current?.StartOutboxSendActivity(context); + StartedInstrument? instrument = LogContext.Current?.StartOutboxSendInstrument(context); try { await _context.AddSend(context).ConfigureAwait(false); @@ -185,11 +186,13 @@ async Task AddSend(SendContext context) catch (Exception ex) { activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } diff --git a/src/MassTransit/Middleware/OutboxMessagePipe.cs b/src/MassTransit/Middleware/OutboxMessagePipe.cs index ea339da0a30..24e7bb8b21c 100644 --- a/src/MassTransit/Middleware/OutboxMessagePipe.cs +++ b/src/MassTransit/Middleware/OutboxMessagePipe.cs @@ -100,6 +100,7 @@ async Task DeliverOutboxMessages(OutboxConsumeContext context) throw new ApplicationException("Simulated Delivery Failure Requested"); StartedActivity? activity = LogContext.Current?.StartOutboxDeliverActivity(message); + StartedInstrument? instrument = LogContext.Current?.StartOutboxDeliveryInstrument(message); try { await endpoint.Send(new SerializedMessageBody(), pipe, token.Token).ConfigureAwait(false); @@ -107,12 +108,14 @@ async Task DeliverOutboxMessages(OutboxConsumeContext context) catch (Exception exception) { activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw; } finally { activity?.Stop(); + instrument?.Stop(); } LogContext.Debug?.Log("Outbox Sent: {InboxMessageId} {SequenceNumber} {MessageId}", context.MessageId, message.SequenceNumber, diff --git a/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs b/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs index 4850b8f6f15..e36466a5d13 100644 --- a/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs +++ b/src/MassTransit/Middleware/StateMachineSagaMessageFilter.cs @@ -52,6 +52,8 @@ public async Task Send(SagaConsumeContext context, IPipe behaviorContext = new BehaviorContextProxy(_machine, context, context, _event); StartedActivity? activity = LogContext.Current?.StartSagaStateMachineActivity(behaviorContext); + StartedInstrument? instrument = LogContext.Current?.StartSagaStateMachineInstrument(behaviorContext); + try { if (activity is { Activity: { IsAllDataRequested: true } }) @@ -74,12 +76,14 @@ public async Task Send(SagaConsumeContext context, IPipe context, IPipe(IExecuteActivityConfigurator configurator, Uri compensateAddress) - where TActivity : class, IExecuteActivity - where TArguments : class - { - var specification = new InstrumentExecuteActivitySpecification(); - - configurator.AddPipeSpecification(specification); - - configurator.ConnectActivityObserver(_observer); - } - - public void ExecuteActivityConfigured(IExecuteActivityConfigurator configurator) - where TActivity : class, IExecuteActivity - where TArguments : class - { - var specification = new InstrumentExecuteActivitySpecification(); - - configurator.AddPipeSpecification(specification); - - configurator.ConnectActivityObserver(_observer); - } - - public void CompensateActivityConfigured(ICompensateActivityConfigurator configurator) - where TActivity : class, ICompensateActivity - where TLog : class - { - var specification = new InstrumentCompensateActivitySpecification(); - - configurator.AddPipeSpecification(specification); - - configurator.ConnectActivityObserver(_observer); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentCompensateActivitySpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentCompensateActivitySpecification.cs deleted file mode 100644 index 82d17ccefb8..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentCompensateActivitySpecification.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentCompensateActivitySpecification : - IPipeSpecification> - where TActivity : class, ICompensateActivity - where TLog : class - { - public void Apply(IPipeBuilder> builder) - { - builder.AddFilter(new InstrumentCompensateActivityFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentConsumerConfigurationObserver.cs b/src/MassTransit/Monitoring/Configuration/InstrumentConsumerConfigurationObserver.cs deleted file mode 100644 index 069f1e0808f..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentConsumerConfigurationObserver.cs +++ /dev/null @@ -1,43 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System; - using Internals; - - - public class InstrumentConsumerConfigurationObserver : - IConsumerConfigurationObserver - { - public void ConsumerConfigured(IConsumerConfigurator configurator) - where TConsumer : class - { - } - - public void ConsumerMessageConfigured(IConsumerMessageConfigurator configurator) - where TConsumer : class - where TMessage : class - { - if (typeof(TMessage).ClosesType(typeof(Batch<>), out Type[] types)) - { - typeof(InstrumentConsumerConfigurationObserver) - .GetMethod(nameof(BatchConsumerConfigured)) - .MakeGenericMethod(typeof(TConsumer), types[0]) - .Invoke(this, new object[] { configurator }); - } - else - { - var specification = new InstrumentConsumerSpecification(); - - configurator.AddPipeSpecification(specification); - } - } - - public void BatchConsumerConfigured(IConsumerMessageConfigurator> configurator) - where TConsumer : class, IConsumer> - where TMessage : class - { - var specification = new InstrumentConsumerSpecification>(); - - configurator.AddPipeSpecification(specification); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentConsumerSpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentConsumerSpecification.cs deleted file mode 100644 index be304ef6330..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentConsumerSpecification.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentConsumerSpecification : - IPipeSpecification> - where TConsumer : class - where TMessage : class - { - public void Apply(IPipeBuilder> builder) - { - builder.AddFilter(new InstrumentConsumerFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentExecuteActivitySpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentExecuteActivitySpecification.cs deleted file mode 100644 index cf64471cfa4..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentExecuteActivitySpecification.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentExecuteActivitySpecification : - IPipeSpecification> - where TActivity : class, IExecuteActivity - where TArguments : class - { - public void Apply(IPipeBuilder> builder) - { - builder.AddFilter(new InstrumentExecuteActivityFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentHandlerConfigurationObserver.cs b/src/MassTransit/Monitoring/Configuration/InstrumentHandlerConfigurationObserver.cs deleted file mode 100644 index 9cecb6b4dfa..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentHandlerConfigurationObserver.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - public class InstrumentHandlerConfigurationObserver : - IHandlerConfigurationObserver - { - public void HandlerConfigured(IHandlerConfigurator configurator) - where T : class - { - var specification = new InstrumentHandlerSpecification(); - - configurator.AddPipeSpecification(specification); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentHandlerSpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentHandlerSpecification.cs deleted file mode 100644 index b4fe3be84b6..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentHandlerSpecification.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentHandlerSpecification : - IPipeSpecification> - where TMessage : class - { - public void Apply(IPipeBuilder> builder) - { - builder.AddFilter(new InstrumentHandlerFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentReceiveEndpointConfiguratorObserver.cs b/src/MassTransit/Monitoring/Configuration/InstrumentReceiveEndpointConfiguratorObserver.cs deleted file mode 100644 index 255ef9f4e70..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentReceiveEndpointConfiguratorObserver.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - public class InstrumentReceiveEndpointConfiguratorObserver : - IEndpointConfigurationObserver - { - public void EndpointConfigured(T configurator) - where T : IReceiveEndpointConfigurator - { - var specification = new InstrumentReceiveSpecification(); - - configurator.ConfigureReceive(r => r.AddPipeSpecification(specification)); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentReceiveSpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentReceiveSpecification.cs deleted file mode 100644 index 2ba7d6271fe..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentReceiveSpecification.cs +++ /dev/null @@ -1,22 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentReceiveSpecification : - IPipeSpecification - { - public void Apply(IPipeBuilder builder) - { - builder.AddFilter(new InstrumentReceiveFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentSagaConfigurationObserver.cs b/src/MassTransit/Monitoring/Configuration/InstrumentSagaConfigurationObserver.cs deleted file mode 100644 index a051c54501a..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentSagaConfigurationObserver.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - public class InstrumentSagaConfigurationObserver : - ISagaConfigurationObserver - { - public void SagaConfigured(ISagaConfigurator configurator) - where T : class, ISaga - { - } - - public void StateMachineSagaConfigured(ISagaConfigurator configurator, SagaStateMachine stateMachine) - where TInstance : class, ISaga, SagaStateMachineInstance - { - } - - public void SagaMessageConfigured(ISagaMessageConfigurator configurator) - where T : class, ISaga - where TMessage : class - { - var specification = new InstrumentSagaSpecification(); - - configurator.AddPipeSpecification(specification); - } - } -} diff --git a/src/MassTransit/Monitoring/Configuration/InstrumentSagaSpecification.cs b/src/MassTransit/Monitoring/Configuration/InstrumentSagaSpecification.cs deleted file mode 100644 index 998d22249f2..00000000000 --- a/src/MassTransit/Monitoring/Configuration/InstrumentSagaSpecification.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace MassTransit.Monitoring.Configuration -{ - using System.Collections.Generic; - using System.Linq; - using MassTransit.Configuration; - using Middleware; - - - public class InstrumentSagaSpecification : - IPipeSpecification> - where TSaga : class, ISaga - where TMessage : class - { - public void Apply(IPipeBuilder> builder) - { - builder.AddFilter(new InstrumentSagaFilter()); - } - - public IEnumerable Validate() - { - return Enumerable.Empty(); - } - } -} diff --git a/src/MassTransit/Monitoring/ConfigureDefaultInstrumentationOptions.cs b/src/MassTransit/Monitoring/ConfigureDefaultInstrumentationOptions.cs new file mode 100644 index 00000000000..c6ee5afcea5 --- /dev/null +++ b/src/MassTransit/Monitoring/ConfigureDefaultInstrumentationOptions.cs @@ -0,0 +1,57 @@ +namespace MassTransit.Monitoring +{ + using Metadata; + using Microsoft.Extensions.Options; + + + public class ConfigureDefaultInstrumentationOptions : + IConfigureOptions + { + public void Configure(InstrumentationOptions options) + { + options.ServiceName = HostMetadataCache.Host.ProcessName; + options.EndpointLabel = "messaging.masstransit.destination"; + options.ConsumerTypeLabel = "messaging.masstransit.consumer_type"; + options.ExceptionTypeLabel = "messaging.masstransit.exception_type"; + options.MessageTypeLabel = "messaging.masstransit.message_type"; + options.ActivityNameLabel = "messaging.masstransit.activity_type"; + options.ArgumentTypeLabel = "messaging.masstransit.argument_type"; + options.LogTypeLabel = "messaging.masstransit.log_type"; + options.ServiceNameLabel = "messaging.masstransit.service"; + options.ReceiveTotal = "messaging.masstransit.receive"; + options.ReceiveFaultTotal = "messaging.masstransit.receive.errors"; + options.ReceiveDuration = "messaging.masstransit.receive.duration"; + options.ReceiveInProgress = "messaging.masstransit.receive.active"; + options.ConsumeTotal = "messaging.masstransit.consume"; + options.ConsumeFaultTotal = "messaging.masstransit.consume.errors"; + options.ConsumeRetryTotal = "messaging.masstransit.consume.retries"; + options.ConsumeDuration = "messaging.masstransit.consume.duration"; + options.ConsumerInProgress = "messaging.masstransit.consume.active"; + options.SagaTotal = "messaging.masstransit.saga"; + options.SagaFaultTotal = "messaging.masstransit.saga.errors"; + options.SagaDuration = "messaging.masstransit.saga.duration"; + options.HandlerTotal = "messaging.masstransit.handler"; + options.HandlerFaultTotal = "messaging.masstransit.handler.errors"; + options.HandlerDuration = "messaging.masstransit.handler.duration"; + options.OutboxDeliveryTotal = "messaging.masstransit.outbox.delivery"; + options.OutboxDeliveryFaultTotal = "messaging.masstransit.outbox.delivery.errors"; + options.DeliveryDuration = "messaging.masstransit.delivery.duration"; + options.SendTotal = "messaging.masstransit.send"; + options.SendFaultTotal = "messaging.masstransit.send.errors"; + options.OutboxSendTotal = "messaging.masstransit.outbox.send"; + options.OutboxSendFaultTotal = "messaging.masstransit.outbox.send.errors"; + options.ActivityExecuteTotal = "messaging.masstransit.execute"; + options.ActivityExecuteFaultTotal = "messaging.masstransit.execute.errors"; + options.ActivityExecuteDuration = "messaging.masstransit.execute.duration"; + options.ExecuteInProgress = "messaging.masstransit.execute.active"; + options.ActivityCompensateTotal = "messaging.masstransit.compensate"; + options.ActivityCompensateFailureTotal = "messaging.masstransit.compensate.errors"; + options.ActivityCompensateDuration = "messaging.masstransit.compensate.duration"; + options.CompensateInProgress = "messaging.masstransit.compensate.active"; + options.BusInstances = "messaging.masstransit.bus"; + options.EndpointInstances = "messaging.masstransit.endpoint"; + options.HandlerInProgress = "messaging.masstransit.handler.active"; + options.SagaInProgress = "messaging.masstransit.saga.active"; + } + } +} diff --git a/src/MassTransit/Monitoring/InstrumentActivityObserver.cs b/src/MassTransit/Monitoring/InstrumentActivityObserver.cs deleted file mode 100644 index 6043c9dade3..00000000000 --- a/src/MassTransit/Monitoring/InstrumentActivityObserver.cs +++ /dev/null @@ -1,60 +0,0 @@ -namespace MassTransit.Monitoring -{ - using System; - using System.Threading.Tasks; - - - public class InstrumentActivityObserver : - IActivityObserver - { - public Task PreExecute(ExecuteActivityContext context) - where TActivity : class, IExecuteActivity - where TArguments : class - { - return Task.CompletedTask; - } - - public Task PostExecute(ExecuteActivityContext context) - where TActivity : class, IExecuteActivity - where TArguments : class - { - Instrumentation.MeasureExecute(context); - - return Task.CompletedTask; - } - - public Task ExecuteFault(ExecuteActivityContext context, Exception exception) - where TActivity : class, IExecuteActivity - where TArguments : class - { - Instrumentation.MeasureExecute(context, exception); - - return Task.CompletedTask; - } - - public Task PreCompensate(CompensateActivityContext context) - where TActivity : class, ICompensateActivity - where TLog : class - { - return Task.CompletedTask; - } - - public Task PostCompensate(CompensateActivityContext context) - where TActivity : class, ICompensateActivity - where TLog : class - { - Instrumentation.MeasureCompensate(context); - - return Task.CompletedTask; - } - - public Task CompensateFail(CompensateActivityContext context, Exception exception) - where TActivity : class, ICompensateActivity - where TLog : class - { - Instrumentation.MeasureCompensate(context, exception); - - return Task.CompletedTask; - } - } -} diff --git a/src/MassTransit/Monitoring/InstrumentBusObserver.cs b/src/MassTransit/Monitoring/InstrumentBusObserver.cs deleted file mode 100644 index 5a5300c098b..00000000000 --- a/src/MassTransit/Monitoring/InstrumentBusObserver.cs +++ /dev/null @@ -1,51 +0,0 @@ -namespace MassTransit.Monitoring -{ - using System; - using System.Threading.Tasks; - - - public class InstrumentBusObserver : - IBusObserver - { - public void PostCreate(IBus bus) - { - bus.ConnectPublishObserver(new InstrumentPublishObserver()); - bus.ConnectSendObserver(new InstrumentSendObserver()); - bus.ConnectReceiveObserver(new InstrumentReceiveObserver()); - } - - public void CreateFaulted(Exception exception) - { - } - - public Task PreStart(IBus bus) - { - return Task.CompletedTask; - } - - public Task PostStart(IBus bus, Task busReady) - { - return Task.CompletedTask; - } - - public Task StartFaulted(IBus bus, Exception exception) - { - return Task.CompletedTask; - } - - public Task PreStop(IBus bus) - { - return Task.CompletedTask; - } - - public Task PostStop(IBus bus) - { - return Task.CompletedTask; - } - - public Task StopFaulted(IBus bus, Exception exception) - { - return Task.CompletedTask; - } - } -} diff --git a/src/MassTransit/Monitoring/InstrumentPublishObserver.cs b/src/MassTransit/Monitoring/InstrumentPublishObserver.cs deleted file mode 100644 index 070dc96a414..00000000000 --- a/src/MassTransit/Monitoring/InstrumentPublishObserver.cs +++ /dev/null @@ -1,32 +0,0 @@ -namespace MassTransit.Monitoring -{ - using System; - using System.Threading.Tasks; - - - public class InstrumentPublishObserver : - IPublishObserver - { - public Task PrePublish(PublishContext context) - where T : class - { - return Task.CompletedTask; - } - - public Task PostPublish(PublishContext context) - where T : class - { - Instrumentation.MeasurePublish(); - - return Task.CompletedTask; - } - - public Task PublishFault(PublishContext context, Exception exception) - where T : class - { - Instrumentation.MeasurePublish(exception); - - return Task.CompletedTask; - } - } -} diff --git a/src/MassTransit/Monitoring/InstrumentReceiveObserver.cs b/src/MassTransit/Monitoring/InstrumentReceiveObserver.cs deleted file mode 100644 index fb0c8325aa9..00000000000 --- a/src/MassTransit/Monitoring/InstrumentReceiveObserver.cs +++ /dev/null @@ -1,45 +0,0 @@ -namespace MassTransit.Monitoring -{ - using System; - using System.Threading.Tasks; - - - public class InstrumentReceiveObserver : - IReceiveObserver - { - public Task PreReceive(ReceiveContext context) - { - return Task.CompletedTask; - } - - public Task PostReceive(ReceiveContext context) - { - Instrumentation.MeasureReceived(context); - - return Task.CompletedTask; - } - - public Task PostConsume(ConsumeContext context, TimeSpan duration, string consumerType) - where T : class - { - Instrumentation.MeasureConsume(context, duration, consumerType); - - return Task.CompletedTask; - } - - public Task ConsumeFault(ConsumeContext context, TimeSpan duration, string consumerType, Exception exception) - where T : class - { - Instrumentation.MeasureConsume(context, duration, consumerType, exception); - - return Task.CompletedTask; - } - - public Task ReceiveFault(ReceiveContext context, Exception exception) - { - Instrumentation.MeasureReceived(context, exception); - - return Task.CompletedTask; - } - } -} diff --git a/src/MassTransit/Monitoring/InstrumentSendObserver.cs b/src/MassTransit/Monitoring/InstrumentSendObserver.cs deleted file mode 100644 index f46a59913b1..00000000000 --- a/src/MassTransit/Monitoring/InstrumentSendObserver.cs +++ /dev/null @@ -1,32 +0,0 @@ -namespace MassTransit.Monitoring -{ - using System; - using System.Threading.Tasks; - - - public class InstrumentSendObserver : - ISendObserver - { - public Task PreSend(SendContext context) - where T : class - { - return Task.CompletedTask; - } - - public Task PostSend(SendContext context) - where T : class - { - Instrumentation.MeasureSend(); - - return Task.CompletedTask; - } - - public Task SendFault(SendContext context, Exception exception) - where T : class - { - Instrumentation.MeasureSend(exception); - - return Task.CompletedTask; - } - } -} diff --git a/src/MassTransit/Monitoring/InstrumentationOptions.cs b/src/MassTransit/Monitoring/InstrumentationOptions.cs index af012a2bf5e..17d489b608e 100644 --- a/src/MassTransit/Monitoring/InstrumentationOptions.cs +++ b/src/MassTransit/Monitoring/InstrumentationOptions.cs @@ -1,7 +1,14 @@ namespace MassTransit.Monitoring { + using System; + + public class InstrumentationOptions { + public const string MeterName = "MassTransit"; + + public string ServiceName { get; set; } + public string EndpointLabel { get; set; } public string ConsumerTypeLabel { get; set; } public string ExceptionTypeLabel { get; set; } @@ -19,8 +26,21 @@ public class InstrumentationOptions public string ConsumeTotal { get; set; } public string ConsumeFaultTotal { get; set; } public string ConsumeRetryTotal { get; set; } + + public string SagaTotal { get; set; } + public string SagaFaultTotal { get; set; } + public string SagaDuration { get; set; } + + public string HandlerTotal { get; set; } + public string HandlerFaultTotal { get; set; } + public string HandlerDuration { get; set; } + + [Obsolete] public string PublishTotal { get; set; } + + [Obsolete] public string PublishFaultTotal { get; set; } + public string SendTotal { get; set; } public string SendFaultTotal { get; set; } public string ActivityExecuteTotal { get; set; } @@ -41,45 +61,10 @@ public class InstrumentationOptions public string ConsumeDuration { get; set; } public string DeliveryDuration { get; set; } - public static InstrumentationOptions CreateDefault() - { - return new InstrumentationOptions - { - EndpointLabel = "messaging.masstransit.destination", - ConsumerTypeLabel = "messaging.masstransit.consumer_type", - ExceptionTypeLabel = "messaging.masstransit.exception_type", - MessageTypeLabel = "messaging.masstransit.message_type", - ActivityNameLabel = "messaging.masstransit.activity_type", - ArgumentTypeLabel = "messaging.masstransit.argument_type", - LogTypeLabel = "messaging.masstransit.log_type", - ServiceNameLabel = "messaging.masstransit.service", - ReceiveTotal = "messaging.masstransit.receive", - ReceiveFaultTotal = "messaging.masstransit.receive.errors", - ReceiveDuration = "messaging.masstransit.receive.duration", - ReceiveInProgress = "messaging.masstransit.receive.active", - ConsumeTotal = "messaging.masstransit.consume", - ConsumeFaultTotal = "messaging.masstransit.consume.errors", - ConsumeRetryTotal = "messaging.masstransit.consume.retries", - ConsumeDuration = "messaging.masstransit.consume.duration", - ConsumerInProgress = "messaging.masstransit.consume.active", - DeliveryDuration = "messaging.masstransit.delivery.duration", - PublishTotal = "messaging.masstransit.publish", - PublishFaultTotal = "messaging.masstransit.publish.errors", - SendTotal = "messaging.masstransit.send", - SendFaultTotal = "messaging.masstransit.send.errors", - ActivityExecuteTotal = "messaging.masstransit.execute", - ActivityExecuteFaultTotal = "messaging.masstransit.execute.errors", - ActivityExecuteDuration = "messaging.masstransit.execute.duration", - ExecuteInProgress = "messaging.masstransit.execute.active", - ActivityCompensateTotal = "messaging.masstransit.compensate", - ActivityCompensateFailureTotal = "messaging.masstransit.compensate.errors", - ActivityCompensateDuration = "messaging.masstransit.compensate.duration", - CompensateInProgress = "messaging.masstransit.compensate.active", - BusInstances = "messaging.masstransit.bus", - EndpointInstances = "messaging.masstransit.endpoint", - HandlerInProgress = "messaging.masstransit.handler.active", - SagaInProgress = "messaging.masstransit.saga.active", - }; - } + public string OutboxSendTotal { get; set; } + public string OutboxSendFaultTotal { get; set; } + + public string OutboxDeliveryTotal { get; set; } + public string OutboxDeliveryFaultTotal { get; set; } } } diff --git a/src/MassTransit/Transports/ReceivePipeDispatcher.cs b/src/MassTransit/Transports/ReceivePipeDispatcher.cs index c7c06327189..45b73952cb5 100644 --- a/src/MassTransit/Transports/ReceivePipeDispatcher.cs +++ b/src/MassTransit/Transports/ReceivePipeDispatcher.cs @@ -51,6 +51,8 @@ public async Task Dispatch(ReceiveContext context, ReceiveLockContext receiveLoc var active = StartDispatch(); StartedActivity? activity = LogContext.Current?.StartReceiveActivity(_activityName, _inputAddress, _endpointName, context); + StartedInstrument? instrument = LogContext.Current?.StartReceiveInstrument(context); + try { if (_observers.Count > 0) @@ -85,24 +87,30 @@ public async Task Dispatch(ReceiveContext context, ReceiveLockContext receiveLoc await receiveLockFaultedTask.ConfigureAwait(false); activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); } catch (Exception releaseLockException) { var aggregateException = new AggregateException("ReceiveLock.Faulted threw an exception", releaseLockException, ex); activity?.AddExceptionEvent(aggregateException); + instrument?.AddException(aggregateException); throw aggregateException; } } else + { activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); + } throw; } finally { activity?.Stop(); + instrument?.Stop(); await active.Complete().ConfigureAwait(false); } diff --git a/src/MassTransit/Transports/SendTransport.cs b/src/MassTransit/Transports/SendTransport.cs index ba81490ae77..68c32f3dadc 100644 --- a/src/MassTransit/Transports/SendTransport.cs +++ b/src/MassTransit/Transports/SendTransport.cs @@ -84,6 +84,7 @@ public async Task Send(TContext context) SendContext sendContext = await _sendTransportContext.CreateSendContext(context, _message, _pipe, _cancellationToken).ConfigureAwait(false); StartedActivity? activity = LogContext.Current?.StartSendActivity(_sendTransportContext, sendContext); + StartedInstrument? instrument = LogContext.Current?.StartSendInstrument(_sendTransportContext, sendContext); try { if (_sendTransportContext.SendObservers.Count > 0) @@ -105,12 +106,14 @@ public async Task Send(TContext context) await _sendTransportContext.SendObservers.SendFault(sendContext, ex).ConfigureAwait(false); activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } diff --git a/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs b/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs index 96842720440..8bc82847acf 100644 --- a/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs +++ b/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/BusOutboxDeliveryService.cs @@ -249,13 +249,22 @@ async Task DeliverOutboxMessages(TDbContext dbContext, OutboxState outboxSt var endpoint = await _busControl.GetSendEndpoint(message.DestinationAddress).ConfigureAwait(false); StartedActivity? activity = LogContext.Current?.StartOutboxDeliverActivity(message); + StartedInstrument? instrument = LogContext.Current?.StartOutboxDeliveryInstrument(message); + try { await endpoint.Send(new SerializedMessageBody(), pipe, token.Token).ConfigureAwait(false); } + catch (Exception ex) + { + activity?.AddExceptionEvent(ex); + instrument?.AddException(ex); + throw; + } finally { activity?.Stop(); + instrument?.Stop(); } sentSequenceNumber = message.SequenceNumber; diff --git a/src/Persistence/MassTransit.MongoDbIntegration/MongoDbIntegration/BusOutboxDeliveryService.cs b/src/Persistence/MassTransit.MongoDbIntegration/MongoDbIntegration/BusOutboxDeliveryService.cs index 17b3ee737be..2b77b32560f 100644 --- a/src/Persistence/MassTransit.MongoDbIntegration/MongoDbIntegration/BusOutboxDeliveryService.cs +++ b/src/Persistence/MassTransit.MongoDbIntegration/MongoDbIntegration/BusOutboxDeliveryService.cs @@ -239,6 +239,7 @@ async Task DeliverOutboxMessages(MongoDbCollectionContext m var endpoint = await _busControl.GetSendEndpoint(message.DestinationAddress).ConfigureAwait(false); StartedActivity? activity = LogContext.Current?.StartOutboxDeliverActivity(message); + StartedInstrument? instrument = LogContext.Current?.StartOutboxDeliveryInstrument(message); try { await endpoint.Send(new SerializedMessageBody(), pipe, token.Token).ConfigureAwait(false); @@ -246,12 +247,14 @@ async Task DeliverOutboxMessages(MongoDbCollectionContext m catch (Exception exception) { activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw; } finally { activity?.Stop(); + instrument?.Stop(); } sentSequenceNumber = message.SequenceNumber; diff --git a/src/Transports/MassTransit.EventHubIntegration/EventHubIntegration/EventHubProducer.cs b/src/Transports/MassTransit.EventHubIntegration/EventHubIntegration/EventHubProducer.cs index fff70be3ae8..f2378805801 100644 --- a/src/Transports/MassTransit.EventHubIntegration/EventHubIntegration/EventHubProducer.cs +++ b/src/Transports/MassTransit.EventHubIntegration/EventHubIntegration/EventHubProducer.cs @@ -123,6 +123,8 @@ public async Task Send(ProducerContext context) sendContext.CancellationToken.ThrowIfCancellationRequested(); StartedActivity? activity = LogContext.Current?.StartSendActivity(_context, sendContext); + StartedInstrument? instrument = LogContext.Current?.StartSendInstrument(_context, sendContext); + try { if (_context.SendObservers.Count > 0) @@ -144,12 +146,14 @@ public async Task Send(ProducerContext context) await _context.SendObservers.SendFault(sendContext, exception).ConfigureAwait(false); activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } diff --git a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/TopicProducer.cs b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/TopicProducer.cs index 5a50b05e3d4..ce3c9dbbf57 100644 --- a/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/TopicProducer.cs +++ b/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/TopicProducer.cs @@ -105,6 +105,7 @@ public async Task Send(ProducerContext context) sendContext.CancellationToken.ThrowIfCancellationRequested(); StartedActivity? activity = LogContext.Current?.StartSendActivity(_context, sendContext); + StartedInstrument? instrument = LogContext.Current?.StartSendInstrument(_context, sendContext); try { if (_context.SendObservers.Count > 0) @@ -126,12 +127,14 @@ public async Task Send(ProducerContext context) await _context.SendObservers.SendFault(sendContext, exception).ConfigureAwait(false); activity?.AddExceptionEvent(exception); + instrument?.AddException(exception); throw; } finally { activity?.Stop(); + instrument?.Stop(); } } diff --git a/tests/MassTransit.Benchmark/MassTransit.Benchmark.csproj b/tests/MassTransit.Benchmark/MassTransit.Benchmark.csproj index 50f2d41d655..0ca1743adfc 100644 --- a/tests/MassTransit.Benchmark/MassTransit.Benchmark.csproj +++ b/tests/MassTransit.Benchmark/MassTransit.Benchmark.csproj @@ -20,5 +20,6 @@ + diff --git a/tests/MassTransit.Benchmark/Program.cs b/tests/MassTransit.Benchmark/Program.cs index e1bb05e356b..0e716c62570 100644 --- a/tests/MassTransit.Benchmark/Program.cs +++ b/tests/MassTransit.Benchmark/Program.cs @@ -8,7 +8,13 @@ using System.Threading.Tasks; using BusOutbox; using Latency; + using MassTransit.Logging; + using MassTransit.Monitoring; using NDesk.Options; + using OpenTelemetry; + using OpenTelemetry.Metrics; + using OpenTelemetry.Resources; + using OpenTelemetry.Trace; using RequestResponse; @@ -23,6 +29,7 @@ static async Task Main(string[] args) var optionSet = new ProgramOptionSet(); + var disposables = new List(); try { _remaining = optionSet.Parse(args); @@ -37,6 +44,24 @@ static async Task Main(string[] args) { } + if (optionSet.EnableMetrics) + { + disposables.Add(Sdk.CreateMeterProviderBuilder() + .AddMeter(InstrumentationOptions.MeterName) + .ConfigureResource(r => r.AddService("MassTransit.Benchmark")) + .AddOtlpExporter() + .Build()); + } + + if (optionSet.EnableTraces) + { + disposables.Add(Sdk.CreateTracerProviderBuilder() + .AddSource(DiagnosticHeaders.DefaultListenerName) + .ConfigureResource(r => r.AddService("MassTransit.Benchmark")) + .AddOtlpExporter() + .Build()); + } + optionSet.ShowOptions(); if (optionSet.Threads.HasValue) @@ -70,6 +95,10 @@ static async Task Main(string[] args) { Console.WriteLine("Crashed: {0}", ex.Message); } + finally + { + disposables.ForEach(x => x.Dispose()); + } } static async Task RunLatencyBenchmark(ProgramOptionSet optionSet) diff --git a/tests/MassTransit.Benchmark/ProgramOptionSet.cs b/tests/MassTransit.Benchmark/ProgramOptionSet.cs index 732f9144151..afdb53b0f52 100644 --- a/tests/MassTransit.Benchmark/ProgramOptionSet.cs +++ b/tests/MassTransit.Benchmark/ProgramOptionSet.cs @@ -35,6 +35,9 @@ public ProgramOptionSet() Add("v|verbose", "Verbose output", x => Verbose = x != null); Add("?|help", "Display this help and exit", x => Help = x != null); Add("threads:", "The minimum number of thread pool threads", value => Threads = value); + Add("traces", "Enable traces capturing to OTel exporter", x => EnableTraces = x != null); + Add("metrics", "Enable metrics capturing to OTel exporter", x => EnableMetrics = x != null); + Add("t|transport:", "Transport (RabbitMQ, AzureServiceBus, Mediator, AmazonSqs, InMemory, Grpc)", value => Transport = value); Add("rabbitmq", "Use RabbitMQ", x => Transport = TransportOptions.RabbitMq); @@ -59,6 +62,8 @@ public ProgramOptionSet() public int? Threads { get; set; } public bool Verbose { get; set; } public bool Help { get; set; } + public bool EnableTraces { get; set; } + public bool EnableMetrics { get; set; } public TransportOptions Transport { get; private set; } diff --git a/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config-extras.yml b/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config-extras.yml new file mode 100644 index 00000000000..9b76acda52e --- /dev/null +++ b/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config-extras.yml @@ -0,0 +1,2 @@ +# extra settings to be merged into OpenTelemetry Collector configuration +# do not delete this file diff --git a/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config.yml b/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config.yml new file mode 100644 index 00000000000..1f254c11104 --- /dev/null +++ b/tests/MassTransit.Benchmark/configs/otel-collector/otelcol-config.yml @@ -0,0 +1,26 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + logging: + loglevel: debug + +processors: + batch: + +service: + pipelines: + traces: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ logging ] + metrics: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ logging ] + logs: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ logging ] diff --git a/tests/MassTransit.Benchmark/docker-compose.yml b/tests/MassTransit.Benchmark/docker-compose.yml index d521d87b5d3..455d6b977e1 100644 --- a/tests/MassTransit.Benchmark/docker-compose.yml +++ b/tests/MassTransit.Benchmark/docker-compose.yml @@ -13,6 +13,15 @@ services: ports: - "1433:1433" + otel-collector: + image: otel/opentelemetry-collector + restart: always + command: [ "--config=/etc/otelcol-config.yml" ] + volumes: + - ./configs/otel-collector/otelcol-config.yml:/etc/otelcol-config.yml + ports: + - "4317:4317" # OTLP gRPC receiver + redpanda: image: docker.redpanda.com/vectorized/redpanda:latest command: