diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 37eaf6fc3b..044ee33716 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,6 +50,8 @@ + + diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 35990ee062..0866b504db 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context) { var id = messageContext.NativeMessageId; var headers = messageContext.Headers; - var log = NServiceBus.Logging.LogManager.GetLogger(); headers.TryGetValue(Headers.MessageId, out var originalMessageId); log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty})."); diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index c1dbbb4ec1..9862349774 100644 --- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -12,6 +12,7 @@ "ApiUrl": "http://localhost:8888/api", "Port": 8888, "PrintMetrics": false, + "OtelMetricsUrl": null, "Hostname": "localhost", "VirtualDirectory": "", "TransportType": "LearningTransport", diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config index 83610fa6ee..7cf230c9f0 100644 --- a/src/ServiceControl.Audit/App.config +++ b/src/ServiceControl.Audit/App.config @@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin - + diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 56d4244e84..dbbe5a2ad4 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -14,18 +15,14 @@ using Persistence; using Persistence.UnitOfWork; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using Transports; class AuditIngestion : IHostedService { - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; - public AuditIngestion( Settings settings, ITransportCustomization transportCustomization, TransportSettings transportSettings, - Metrics metrics, IFailedAuditStorage failedImportsStorage, AuditIngestionCustomCheck.State ingestionState, AuditIngestor auditIngestor, @@ -40,10 +37,6 @@ public AuditIngestion( this.settings = settings; this.applicationLifetime = applicationLifetime; - batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size"); - batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds); - receivedMeter = metrics.GetCounter("Audit ingestion - received"); - if (!transportSettings.MaxConcurrency.HasValue) { throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); @@ -102,6 +95,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await stoppable.StopReceive(cancellationToken); logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); } + return; } @@ -168,6 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } + var stoppable = queueIngestor; queueIngestor = null; logger.Info("Shutting down. Infrastructure shut down commencing"); @@ -196,7 +191,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - receivedMeter.Mark(); + receivedAudits.Add(1); await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; @@ -215,13 +210,14 @@ async Task Loop() while (channel.Reader.TryRead(out var context)) { contexts.Add(context); + auditMessageSize.Record(context.Body.Length / 1024.0); } - batchSizeMeter.Mark(contexts.Count); - using (batchDurationMeter.Measure()) - { - await auditIngestor.Ingest(contexts); - } + auditBatchSize.Record(contexts.Count); + var sw = Stopwatch.StartNew(); + + await auditIngestor.Ingest(contexts); + auditBatchDuration.Record(sw.ElapsedMilliseconds); } catch (OperationCanceledException) { @@ -252,7 +248,7 @@ async Task Loop() TransportInfrastructure transportInfrastructure; IMessageReceiver queueIngestor; - readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); + readonly SemaphoreSlim startStopSemaphore = new(1); readonly string inputEndpoint; readonly ITransportCustomization transportCustomization; readonly TransportSettings transportSettings; @@ -261,9 +257,10 @@ async Task Loop() readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Meter batchSizeMeter; - readonly Meter batchDurationMeter; - readonly Counter receivedMeter; + readonly Histogram auditBatchSize = Telemetry.Meter.CreateHistogram("messages_batch_size"); + readonly Histogram auditBatchDuration = Telemetry.Meter.CreateHistogram("messages_batch_duration", unit: "ms"); + readonly Histogram auditMessageSize = Telemetry.Meter.CreateHistogram("messages_size", unit: "kilobytes"); + readonly Counter receivedAudits = Telemetry.Meter.CreateCounter("messages_received"); readonly Watchdog watchdog; readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 84d110a1f2..fb6f2a7240 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; @@ -14,13 +13,11 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.Transports; public class AuditIngestor { public AuditIngestor( - Metrics metrics, Settings settings, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, EndpointInstanceMonitoring endpointInstanceMonitoring, @@ -32,26 +29,16 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; - - var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit"); - var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit"); - var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds); - var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds); - var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds); - - var enrichers = new IEnrichImportedAuditMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher(), - new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), - new DetectSuccessfulRetriesEnricher(), - new SagaRelationshipsEnricher() - }.Concat(auditEnrichers).ToArray(); + var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); - auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher); + auditPersister = new AuditPersister( + unitOfWorkFactory, + enrichers, + messageSession, + messageDispatcher + ); } public async Task Ingest(List contexts) @@ -71,6 +58,7 @@ public async Task Ingest(List contexts) { Log.Debug($"Forwarding {stored.Count} messages"); } + await Forward(stored, logQueueAddress); if (Log.IsDebugEnabled) { @@ -159,7 +147,6 @@ public async Task VerifyCanReachForwardingAddress() readonly Lazy messageDispatcher; readonly string logQueueAddress; - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; static readonly ILog Log = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 0a5d0d9938..4548a1cd3b 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Text.Json; using System.Threading.Tasks; using Infrastructure; @@ -15,29 +16,13 @@ using ServiceControl.Audit.Persistence.Monitoring; using ServiceControl.EndpointPlugin.Messages.SagaState; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.SagaAudit; - class AuditPersister + class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, + IEnrichImportedAuditMessages[] enrichers, + IMessageSession messageSession, + Lazy messageDispatcher) { - public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, - IEnrichImportedAuditMessages[] enrichers, - Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter, - Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession, - Lazy messageDispatcher) - { - this.unitOfWorkFactory = unitOfWorkFactory; - this.enrichers = enrichers; - - this.ingestedAuditMeter = ingestedAuditMeter; - this.ingestedSagaAuditMeter = ingestedSagaAuditMeter; - this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter; - this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter; - this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter; - this.messageSession = messageSession; - this.messageDispatcher = messageDispatcher; - } - public async Task> Persist(IReadOnlyList contexts) { var stopwatch = Stopwatch.StartNew(); @@ -51,7 +36,6 @@ public async Task> Persist(IReadOnlyList(contexts.Count); @@ -89,12 +73,11 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList messageDispatcher; + readonly Counter storedAudits = Telemetry.Meter.CreateCounter("messages_stored"); + readonly Counter storedSagas = Telemetry.Meter.CreateCounter("sagas_stored"); + readonly Histogram auditBulkInsertDuration = Telemetry.Meter.CreateHistogram("messages_bulk_insert_duration", unit: "ms"); + readonly Histogram sagaAuditBulkInsertDuration = Telemetry.Meter.CreateHistogram("sagas_bulk_insert_duration", unit: "ms"); + readonly Histogram auditCommitDuration = Telemetry.Meter.CreateHistogram("messages_commit_duration", unit: "ms"); - readonly IEnrichImportedAuditMessages[] enrichers; - readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 8968565a50..a9835669ba 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -6,7 +6,6 @@ namespace ServiceControl.Audit; using System.Threading.Tasks; using Auditing; using Infrastructure; -using Infrastructure.Metrics; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; using Microsoft.Extensions.DependencyInjection; @@ -20,6 +19,8 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; using Transports; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; static class HostApplicationBuilderExtensions { @@ -28,10 +29,11 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Settings settings, EndpointConfiguration configuration) { + var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - RecordStartup(settings, configuration, persistenceConfiguration); + RecordStartup(version, settings, configuration, persistenceConfiguration); builder.Logging.ClearProviders(); builder.Logging.AddNLog(); @@ -61,13 +63,36 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, // directly and to make things more complex of course the order of registration still matters ;) services.AddSingleton(provider => new Lazy(provider.GetRequiredService)); - services.AddMetrics(settings.PrintMetrics); - services.AddPersistence(persistenceSettings, persistenceConfiguration); NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); + if (!string.IsNullOrEmpty(settings.OtelMetricsUrl)) + { + if (!Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var otelMetricsUri)) + { + throw new UriFormatException($"Invalid OtelMetricsUrl: {settings.OtelMetricsUrl}"); + } + + builder.Services.AddOpenTelemetry() + .ConfigureResource(b => b.AddService( + serviceName: Telemetry.ServiceName, + serviceVersion: version, + serviceInstanceId: settings.InstanceName)) + .WithMetrics(b => + { + b.AddMeter(Telemetry.MeterName); + b.AddOtlpExporter(e => + { + e.Endpoint = otelMetricsUri; + }); + }); + + var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); + logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtelMetricsUrl); + } + // Configure after the NServiceBus hosted service to ensure NServiceBus is already started if (settings.IngestAuditMessages) { @@ -84,10 +109,8 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); } - static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) + static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) { - var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; - var startupMessage = $@" ------------------------------------------------------------- ServiceControl Audit Version: {version} @@ -101,9 +124,6 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.Info(startupMessage); - endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new - { - Settings = settings - }); + endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings }); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs deleted file mode 100644 index c378a8f4d6..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Extensions.Hosting; - using NServiceBus.Logging; - using ServiceControl.Infrastructure.Metrics; - - class MetricsReporterHostedService : IHostedService - { - readonly Metrics metrics; - MetricsReporter reporter; - - public MetricsReporterHostedService(Metrics metrics) => this.metrics = metrics; - - public Task StartAsync(CancellationToken cancellationToken) - { - var metricsLog = LogManager.GetLogger("Metrics"); - - reporter = new MetricsReporter(metrics, x => metricsLog.Info(x), TimeSpan.FromSeconds(5)); - - reporter.Start(); - - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) => reporter.Stop(); - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs deleted file mode 100644 index a7bc6b1dd5..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using Microsoft.Extensions.DependencyInjection; - using ServiceControl.Infrastructure.Metrics; - - static class MetricsServiceCollectionExtensions - { - public static void AddMetrics(this IServiceCollection services, bool printMetrics) - { - services.AddSingleton(new Metrics { Enabled = printMetrics }); - services.AddHostedService(); - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index 183b695555..d6a84f0662 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -109,6 +109,7 @@ public string RootUrl public int Port { get; set; } public bool PrintMetrics => SettingsReader.Read(SettingsRootNamespace, "PrintMetrics"); + public string OtelMetricsUrl { get; set; } = SettingsReader.Read(SettingsRootNamespace, nameof(OtelMetricsUrl)); public string Hostname { get; private set; } public string VirtualDirectory => SettingsReader.Read(SettingsRootNamespace, "VirtualDirectory", string.Empty); diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs new file mode 100644 index 0000000000..b1b85a0b06 --- /dev/null +++ b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs @@ -0,0 +1,10 @@ +namespace ServiceControl.Audit; + +using System.Diagnostics.Metrics; + +static class Telemetry +{ + public const string MeterName = "Particular.ServiceControl.Audit"; + public static string ServiceName = MeterName; + public static readonly Meter Meter = new(MeterName, "0.1.0"); +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 3303782e3f..8f41ba97b1 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -18,7 +18,6 @@ - @@ -29,6 +28,8 @@ + +