Skip to content

Commit

Permalink
Refactoring telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed Apr 10, 2023
1 parent f92007a commit 6f0a00a
Show file tree
Hide file tree
Showing 60 changed files with 795 additions and 1,222 deletions.
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<PackageVersion Include="NUnit.Analyzers" Version="3.3.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="4.3.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Jaeger" Version="1.3.2" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.4.0"/>
<PackageVersion Include="prometheus-net" Version="6.0.0" />
<PackageVersion Include="Quartz" Version="3.6.0" />
<PackageVersion Include="Quartz.Extensions.Hosting" Version="3.6.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static IServiceCollection AddMassTransit(this IServiceCollection collecti
}

AddHostedService(collection);
AddInstrumentation(collection);

var configurator = new ServiceCollectionBusConfigurator(collection);

Expand Down Expand Up @@ -85,6 +86,7 @@ public static IServiceCollection AddMassTransit<TBus, TBusInstance>(this IServic
}

AddHostedService(collection);
AddInstrumentation(collection);

var configurator = new ServiceCollectionBusConfigurator<TBus, TBusInstance>(collection);

Expand All @@ -109,6 +111,7 @@ public static IServiceCollection AddMassTransit<TBus>(this IServiceCollection co
throw new ArgumentNullException(nameof(configure));

AddHostedService(collection);
AddInstrumentation(collection);

var doIt = new Callback<TBus>(collection, configure);

Expand Down Expand Up @@ -155,6 +158,12 @@ public static void ReplaceScoped<TService, TImplementation>(this IServiceCollect
services.Replace(new ServiceDescriptor(typeof(TService), typeof(TImplementation), ServiceLifetime.Scoped));
}

static void AddInstrumentation(IServiceCollection collection)
{
collection.AddOptions();
collection.AddSingleton<IConfigureOptions<InstrumentationOptions>, ConfigureDefaultInstrumentationOptions>();
}

static void AddHostedService(IServiceCollection collection)
{
collection.AddOptions();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
namespace MassTransit
{
using System;
using Metadata;
using Logging;
using Monitoring;
using Monitoring.Configuration;


public static class InstrumentationConfigurationExtensions
Expand All @@ -20,25 +19,16 @@ public static class InstrumentationConfigurationExtensions
public static void UseInstrumentation(this IBusFactoryConfigurator configurator, Action<InstrumentationOptions> configureOptions = null,
string serviceName = default)
{
var options = InstrumentationOptions.CreateDefault();
var options = new InstrumentationOptions();
var configureDefault = new ConfigureDefaultInstrumentationOptions();

configureDefault.Configure(options);
configureOptions?.Invoke(options);

Instrumentation.TryConfigure(GetServiceName(serviceName), options);
if (!string.IsNullOrWhiteSpace(serviceName))
options.ServiceName = serviceName;

configurator.ConnectConsumerConfigurationObserver(new InstrumentConsumerConfigurationObserver());
configurator.ConnectHandlerConfigurationObserver(new InstrumentHandlerConfigurationObserver());
configurator.ConnectSagaConfigurationObserver(new InstrumentSagaConfigurationObserver());
configurator.ConnectActivityConfigurationObserver(new InstrumentActivityConfigurationObserver());
configurator.ConnectEndpointConfigurationObserver(new InstrumentReceiveEndpointConfiguratorObserver());
configurator.ConnectBusObserver(new InstrumentBusObserver());
}

static string GetServiceName(string serviceName)
{
return string.IsNullOrWhiteSpace(serviceName)
? HostMetadataCache.Host.ProcessName
: serviceName;
LogContextInstrumentationExtensions.TryConfigure(options);
}
}
}
9 changes: 8 additions & 1 deletion src/MassTransit/Courier/CompensateActivityHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ public CompensateActivityHost(IPipe<CompensateContext<TLog>> compensatePipe)

public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext<RoutingSlip>> next)
{
var timer = Stopwatch.StartNew();

StartedActivity? activity = LogContext.Current?.StartCompensateActivity<TActivity, TLog>(context);
StartedInstrument? instrument = LogContext.Current?.StartActivityCompensateInstrument<TActivity, TLog>(context, timer);

var timer = Stopwatch.StartNew();
try
{
CompensateContext<TLog> compensateContext = new HostCompensateContext<TLog>(context);
Expand Down Expand Up @@ -55,6 +57,8 @@ public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext

activity?.AddExceptionEvent(exception);

instrument?.AddException(exception);

throw new ConsumerCanceledException($"The operation was canceled by the activity: {TypeCache<TActivity>.ShortName}");
}
catch (Exception exception)
Expand All @@ -63,11 +67,14 @@ public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext

activity?.AddExceptionEvent(exception);

instrument?.AddException(exception);

throw;
}
finally
{
activity?.Stop();
instrument?.Stop();
}
}

Expand Down
9 changes: 8 additions & 1 deletion src/MassTransit/Courier/ExecuteActivityHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ public ExecuteActivityHost(IPipe<ExecuteContext<TArguments>> executePipe, Uri co

public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext<RoutingSlip>> next)
{
var timer = Stopwatch.StartNew();

StartedActivity? activity = LogContext.Current?.StartExecuteActivity<TActivity, TArguments>(context);
StartedInstrument? instrument = LogContext.Current?.StartActivityExecuteInstrument<TActivity, TArguments>(context, timer);

var timer = Stopwatch.StartNew();
try
{
ExecuteContext<TArguments> executeContext = new HostExecuteContext<TArguments>(_compensateAddress, context);
Expand Down Expand Up @@ -60,6 +62,8 @@ public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext

activity?.AddExceptionEvent(exception);

instrument?.AddException(exception);

throw new ConsumerCanceledException($"The operation was canceled by the activity: {TypeCache<TActivity>.ShortName}");
}
catch (Exception exception)
Expand All @@ -68,11 +72,14 @@ public async Task Send(ConsumeContext<RoutingSlip> context, IPipe<ConsumeContext

activity?.AddExceptionEvent(exception);

instrument?.AddException(exception);

throw;
}
finally
{
activity?.Stop();
instrument?.Stop();
}
}

Expand Down
18 changes: 6 additions & 12 deletions src/MassTransit/LogContext.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace MassTransit
{
using System;
using System.Diagnostics;
using System.Threading;
using Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Monitoring;


public static class LogContext
Expand Down Expand Up @@ -37,7 +37,7 @@ public static ILogContext Current

public static void ConfigureCurrentLogContext(ILoggerFactory loggerFactory = null)
{
Current = new BusLogContext(loggerFactory ?? NullLoggerFactory.Instance, Cached.Source.Value);
Current = new BusLogContext(loggerFactory ?? NullLoggerFactory.Instance);
}

/// <summary>
Expand All @@ -47,7 +47,7 @@ public static void ConfigureCurrentLogContext(ILoggerFactory loggerFactory = nul
/// <param name="logger">An existing logger</param>
public static void ConfigureCurrentLogContext(ILogger logger)
{
Current = new BusLogContext(new SingleLoggerFactory(logger), Cached.Source.Value);
Current = new BusLogContext(new SingleLoggerFactory(logger));
}

public static ILogContext CreateLogContext(string categoryName)
Expand All @@ -64,6 +64,8 @@ public static ILogContext CreateLogContext(string categoryName)
/// <param name="provider"></param>
public static void ConfigureCurrentLogContextIfNull(IServiceProvider provider)
{
LogContextInstrumentationExtensions.TryConfigure(provider);

if (Current == null || Current.Logger is NullLogger)
{
var loggerFactory = provider.GetService<ILoggerFactory>();
Expand Down Expand Up @@ -207,17 +209,9 @@ void Log(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, Exception exception)

static ILogContext CreateDefaultLogContext()
{
var source = Cached.Source.Value;

var loggerFactory = NullLoggerFactory.Instance;

return new BusLogContext(loggerFactory, source);
}


static class Cached
{
internal static readonly Lazy<ActivitySource> Source = new Lazy<ActivitySource>(() => new ActivitySource(DiagnosticHeaders.DefaultListenerName));
return new BusLogContext(loggerFactory);
}
}
}
Loading

0 comments on commit 6f0a00a

Please sign in to comment.