diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 37eaf6fc3b..044ee33716 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,6 +50,8 @@ + + diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 35990ee062..0866b504db 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context) { var id = messageContext.NativeMessageId; var headers = messageContext.Headers; - var log = NServiceBus.Logging.LogManager.GetLogger(); headers.TryGetValue(Headers.MessageId, out var originalMessageId); log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty})."); diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs index ee654999a1..f07e7036de 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Audit.Persistence.InMemory { + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing.BodyStorage; using ServiceControl.Audit.Persistence.UnitOfWork; @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore, bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings); } - public ValueTask StartNew(int batchSize) + public ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { //The batchSize argument is ignored: the in-memory storage implementation doesn't support batching. return new ValueTask(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher)); diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs index 3f9c56a6c8..8a9747fbc0 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs @@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory( MinimumRequiredStorageState customCheckState) : IAuditIngestionUnitOfWorkFactory { - public async ValueTask StartNew(int batchSize) + public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { - var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout); + var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout); var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token)) .BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token); diff --git a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs index 25a5859e50..1d3470bdea 100644 --- a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs @@ -1,10 +1,11 @@ namespace ServiceControl.Audit.Persistence.UnitOfWork { + using System.Threading; using System.Threading.Tasks; public interface IAuditIngestionUnitOfWorkFactory { - ValueTask StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data + ValueTask StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data bool CanIngestMore(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index c1dbbb4ec1..9862349774 100644 --- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -12,6 +12,7 @@ "ApiUrl": "http://localhost:8888/api", "Port": 8888, "PrintMetrics": false, + "OtelMetricsUrl": null, "Hostname": "localhost", "VirtualDirectory": "", "TransportType": "LearningTransport", diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config index 83610fa6ee..7cf230c9f0 100644 --- a/src/ServiceControl.Audit/App.config +++ b/src/ServiceControl.Audit/App.config @@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin - + diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 56d4244e84..602c2ff34e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -14,18 +15,14 @@ using Persistence; using Persistence.UnitOfWork; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using Transports; class AuditIngestion : IHostedService { - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; - public AuditIngestion( Settings settings, ITransportCustomization transportCustomization, TransportSettings transportSettings, - Metrics metrics, IFailedAuditStorage failedImportsStorage, AuditIngestionCustomCheck.State ingestionState, AuditIngestor auditIngestor, @@ -40,10 +37,6 @@ public AuditIngestion( this.settings = settings; this.applicationLifetime = applicationLifetime; - batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size"); - batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds); - receivedMeter = metrics.GetCounter("Audit ingestion - received"); - if (!transportSettings.MaxConcurrency.HasValue) { throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); @@ -59,15 +52,27 @@ public AuditIngestion( errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError); - watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger); - - ingestionWorker = Task.Run(() => Loop(), CancellationToken.None); + watchdog = new Watchdog( + "audit message ingestion", + EnsureStarted, + EnsureStopped, + ingestionState.ReportError, + ingestionState.Clear, + settings.TimeToRestartAuditIngestionAfterFailure, + logger + ); } - public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); + public async Task StartAsync(CancellationToken cancellationToken) + { + stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + ingestionWorker = Loop(stopSource.Token); + await watchdog.Start(() => applicationLifetime.StopApplication()); + } public async Task StopAsync(CancellationToken cancellationToken) { + await stopSource.CancelAsync(); await watchdog.Stop(); channel.Writer.Complete(); await ingestionWorker; @@ -102,6 +107,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await stoppable.StopReceive(cancellationToken); logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); } + return; } @@ -123,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); await queueIngestor.StartReceive(cancellationToken); @@ -168,6 +174,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } + var stoppable = queueIngestor; queueIngestor = null; logger.Info("Shutting down. Infrastructure shut down commencing"); @@ -196,63 +203,108 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - receivedMeter.Mark(); + receivedAudits.Add(1); await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; } - async Task Loop() + async Task Loop(CancellationToken cancellationToken) { - var contexts = new List(transportSettings.MaxConcurrency.Value); - - while (await channel.Reader.WaitToReadAsync()) + try { - // will only enter here if there is something to read. - try + var contexts = new List(transportSettings.MaxConcurrency.Value); + + long sequentialFailureCount = 0; + DateTime lastSuccess = DateTime.MinValue; + + while (await channel.Reader.WaitToReadAsync(cancellationToken)) { - // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) + var sw = Stopwatch.StartNew(); + // TODO: Add timeout handling, if processing takes over for example 1 minute + // will only enter here if there is something to read. + try { - contexts.Add(context); - } + // as long as there is something to read this will fetch up to MaximumConcurrency items + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + auditMessageSize.Record(context.Body.Length / 1024D); + } + + auditBatchSize.Record(contexts.Count); - batchSizeMeter.Mark(contexts.Count); - using (batchDurationMeter.Measure()) + await auditIngestor.Ingest(contexts, cancellationToken); + auditBatchDuration.Record(sw.ElapsedMilliseconds); + + // No locking for consistency needed, just write, don't care about multi-threading + sequentialFailureCount = 0; + lastSuccess = DateTime.UtcNow; + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { - await auditIngestor.Ingest(contexts); + logger.Debug("Cancelled by host"); + return; // No point in continueing as WaitToReadAsync will throw OCE } - } - catch (OperationCanceledException) - { - //Do nothing as we are shutting down - continue; - } - catch (Exception e) // show must go on - { - if (logger.IsInfoEnabled) + catch (Exception e) // show must go on { - logger.Info("Ingesting messages failed", e); + Interlocked.Increment(ref sequentialFailureCount); + logger.Warn($"Batch processing failed [#{sequentialFailureCount} @{lastSuccess:O}] ", e); + + // Signal circuitbreaker, throttle whatever + + // signal all message handling tasks to terminate + foreach (var context in contexts) + { + if (!context.GetTaskCompletionSource().TrySetException(e)) + { + logger.Error("Loop TrySetException failed"); + } + } } - - // signal all message handling tasks to terminate - foreach (var context in contexts) + finally { - context.GetTaskCompletionSource().TrySetException(e); + const int infoThreshold = 5000; + const int warnThreshold = 15000; + const int errorThreshold = 60000; + var elapsed = sw.ElapsedMilliseconds; + + if (elapsed > errorThreshold) + { + logger.ErrorFormat("Processing duration {0} exceeded {1}", elapsed, errorThreshold); + } + else if (elapsed > warnThreshold) + { + logger.WarnFormat("Processing duration {0} exceeded {1}", elapsed, warnThreshold); + } + else if (elapsed > infoThreshold) + { + logger.InfoFormat("Processing duration {0} exceeded {1}", elapsed, infoThreshold); + } + + contexts.Clear(); } } - finally - { - contexts.Clear(); - } + // will fall out here when writer is completed + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) + { + logger.Debug("Cancelled by host"); + } + catch (Exception e) + { + // Might the next exception scope throw an exception, consider this fatal as that cannot be an OCE + logger.Fatal("Loop interrupted", e); + applicationLifetime.StopApplication(); + throw; } - // will fall out here when writer is completed } TransportInfrastructure transportInfrastructure; IMessageReceiver queueIngestor; + Task ingestionWorker; - readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); + readonly SemaphoreSlim startStopSemaphore = new(1); readonly string inputEndpoint; readonly ITransportCustomization transportCustomization; readonly TransportSettings transportSettings; @@ -261,13 +313,15 @@ async Task Loop() readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Meter batchSizeMeter; - readonly Meter batchDurationMeter; - readonly Counter receivedMeter; + readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits"); + readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms"); + readonly Histogram auditMessageSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes"); + readonly Counter receivedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received_audits"); readonly Watchdog watchdog; - readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; + CancellationTokenSource stopSource; + static readonly ILog logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 84d110a1f2..62ee58360f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -2,8 +2,8 @@ { using System; using System.Collections.Generic; - using System.Diagnostics; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Infrastructure.Settings; using Monitoring; @@ -14,13 +14,11 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.Transports; public class AuditIngestor { public AuditIngestor( - Metrics metrics, Settings settings, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, EndpointInstanceMonitoring endpointInstanceMonitoring, @@ -32,36 +30,26 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; - - var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit"); - var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit"); - var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds); - var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds); - var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds); - - var enrichers = new IEnrichImportedAuditMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher(), - new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), - new DetectSuccessfulRetriesEnricher(), - new SagaRelationshipsEnricher() - }.Concat(auditEnrichers).ToArray(); + var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); - auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher); + auditPersister = new AuditPersister( + unitOfWorkFactory, + enrichers, + messageSession, + messageDispatcher + ); } - public async Task Ingest(List contexts) + public async Task Ingest(List contexts, CancellationToken cancellationToken) { if (Log.IsDebugEnabled) { Log.Debug($"Ingesting {contexts.Count} message contexts"); } - var stored = await auditPersister.Persist(contexts); + var stored = await auditPersister.Persist(contexts, cancellationToken); try { @@ -71,7 +59,8 @@ public async Task Ingest(List contexts) { Log.Debug($"Forwarding {stored.Count} messages"); } - await Forward(stored, logQueueAddress); + + await Forward(stored, logQueueAddress, cancellationToken); if (Log.IsDebugEnabled) { Log.Debug("Forwarded messages"); @@ -80,7 +69,10 @@ public async Task Ingest(List contexts) foreach (var context in contexts) { - context.GetTaskCompletionSource().TrySetResult(true); + if (!context.GetTaskCompletionSource().TrySetResult(true)) + { + Log.Warn("TrySetResult failed"); + } } } catch (Exception e) @@ -95,7 +87,7 @@ public async Task Ingest(List contexts) } } - Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress) + Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress, CancellationToken cancellationToken) { var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK var index = 0; @@ -112,7 +104,8 @@ Task Forward(IReadOnlyCollection messageContexts, string forward var outgoingMessage = new OutgoingMessage( messageContext.NativeMessageId, messageContext.Headers, - messageContext.Body); + messageContext.Body + ); // Forwarded messages should last as long as possible outgoingMessage.Headers.Remove(Headers.TimeToBeReceived); @@ -124,12 +117,13 @@ Task Forward(IReadOnlyCollection messageContexts, string forward return anyContext != null ? messageDispatcher.Value.Dispatch( new TransportOperations(transportOperations), - anyContext.TransportTransaction + anyContext.TransportTransaction, + cancellationToken ) : Task.CompletedTask; } - public async Task VerifyCanReachForwardingAddress() + public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken) { if (!settings.ForwardAuditMessages) { @@ -146,7 +140,7 @@ public async Task VerifyCanReachForwardingAddress() ) ); - await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction()); + await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken); } catch (Exception e) { @@ -159,7 +153,6 @@ public async Task VerifyCanReachForwardingAddress() readonly Lazy messageDispatcher; readonly string logQueueAddress; - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; static readonly ILog Log = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 0a5d0d9938..520910eae0 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -3,7 +3,9 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Text.Json; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Monitoring; @@ -15,30 +17,14 @@ using ServiceControl.Audit.Persistence.Monitoring; using ServiceControl.EndpointPlugin.Messages.SagaState; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.SagaAudit; - class AuditPersister + class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, + IEnrichImportedAuditMessages[] enrichers, + IMessageSession messageSession, + Lazy messageDispatcher) { - public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, - IEnrichImportedAuditMessages[] enrichers, - Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter, - Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession, - Lazy messageDispatcher) - { - this.unitOfWorkFactory = unitOfWorkFactory; - this.enrichers = enrichers; - - this.ingestedAuditMeter = ingestedAuditMeter; - this.ingestedSagaAuditMeter = ingestedSagaAuditMeter; - this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter; - this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter; - this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter; - this.messageSession = messageSession; - this.messageDispatcher = messageDispatcher; - } - - public async Task> Persist(IReadOnlyList contexts) + public async Task> Persist(IReadOnlyList contexts, CancellationToken cancellationToken) { var stopwatch = Stopwatch.StartNew(); @@ -51,9 +37,8 @@ public async Task> Persist(IReadOnlyList(contexts.Count); foreach (var context in contexts) { @@ -89,12 +74,11 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList messageDispatcher; + readonly Counter storedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_audit_messages"); + readonly Counter storedSagas = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_saga_audits"); + readonly Histogram auditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_audits", unit: "ms"); + readonly Histogram sagaAuditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms"); + readonly Histogram auditCommitDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.commit_duration_audits", unit: "ms"); - readonly IEnrichImportedAuditMessages[] enrichers; - readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs index 7da46df703..5c3b67f17e 100644 --- a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs +++ b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs @@ -23,7 +23,7 @@ public ImportFailedAudits( public async Task Run(CancellationToken cancellationToken = default) { - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); var succeeded = 0; var failed = 0; @@ -37,7 +37,7 @@ await failedAuditStore.ProcessFailedMessages( var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - await auditIngestor.Ingest([messageContext]); + await auditIngestor.Ingest([messageContext], cancellationToken); await taskCompletionSource.Task; diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 8968565a50..47ac4e2976 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -6,7 +6,6 @@ namespace ServiceControl.Audit; using System.Threading.Tasks; using Auditing; using Infrastructure; -using Infrastructure.Metrics; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; using Microsoft.Extensions.DependencyInjection; @@ -20,6 +19,8 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; using Transports; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; static class HostApplicationBuilderExtensions { @@ -28,10 +29,11 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Settings settings, EndpointConfiguration configuration) { + var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - RecordStartup(settings, configuration, persistenceConfiguration); + RecordStartup(version, settings, configuration, persistenceConfiguration); builder.Logging.ClearProviders(); builder.Logging.AddNLog(); @@ -61,13 +63,35 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, // directly and to make things more complex of course the order of registration still matters ;) services.AddSingleton(provider => new Lazy(provider.GetRequiredService)); - services.AddMetrics(settings.PrintMetrics); - services.AddPersistence(persistenceSettings, persistenceConfiguration); NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); + if (!string.IsNullOrEmpty(settings.OtelMetricsUrl)) + { + if (!Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var otelMetricsUri)) + { + throw new UriFormatException($"Invalid OtelMetricsUrl: {settings.OtelMetricsUrl}"); + } + builder.Services.AddOpenTelemetry() + .ConfigureResource(b => b.AddService( + serviceName: "Particular.ServiceControl.Audit", + serviceVersion: version, + serviceInstanceId: settings.InstanceName)) + .WithMetrics(b => + { + b.AddMeter(AuditMetrics.MeterName); + b.AddOtlpExporter(e => + { + e.Endpoint = otelMetricsUri; + }); + }); + var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); + logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtelMetricsUrl); + + } + // Configure after the NServiceBus hosted service to ensure NServiceBus is already started if (settings.IngestAuditMessages) { @@ -84,10 +108,8 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); } - static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) + static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) { - var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; - var startupMessage = $@" ------------------------------------------------------------- ServiceControl Audit Version: {version} @@ -101,9 +123,6 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.Info(startupMessage); - endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new - { - Settings = settings - }); + endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings }); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs new file mode 100644 index 0000000000..4e07ae3453 --- /dev/null +++ b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs @@ -0,0 +1,10 @@ +namespace ServiceControl.Audit; + +using System.Diagnostics.Metrics; + +static class AuditMetrics +{ + public const string MeterName = "Particular.ServiceControl"; + public static readonly Meter Meter = new(MeterName, "0.1.0"); + public static readonly string Prefix = "particular.servicecontrol.audit"; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs deleted file mode 100644 index c378a8f4d6..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Extensions.Hosting; - using NServiceBus.Logging; - using ServiceControl.Infrastructure.Metrics; - - class MetricsReporterHostedService : IHostedService - { - readonly Metrics metrics; - MetricsReporter reporter; - - public MetricsReporterHostedService(Metrics metrics) => this.metrics = metrics; - - public Task StartAsync(CancellationToken cancellationToken) - { - var metricsLog = LogManager.GetLogger("Metrics"); - - reporter = new MetricsReporter(metrics, x => metricsLog.Info(x), TimeSpan.FromSeconds(5)); - - reporter.Start(); - - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) => reporter.Stop(); - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs deleted file mode 100644 index a7bc6b1dd5..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using Microsoft.Extensions.DependencyInjection; - using ServiceControl.Infrastructure.Metrics; - - static class MetricsServiceCollectionExtensions - { - public static void AddMetrics(this IServiceCollection services, bool printMetrics) - { - services.AddSingleton(new Metrics { Enabled = printMetrics }); - services.AddHostedService(); - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index 183b695555..d6a84f0662 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -109,6 +109,7 @@ public string RootUrl public int Port { get; set; } public bool PrintMetrics => SettingsReader.Read(SettingsRootNamespace, "PrintMetrics"); + public string OtelMetricsUrl { get; set; } = SettingsReader.Read(SettingsRootNamespace, nameof(OtelMetricsUrl)); public string Hostname { get; private set; } public string VirtualDirectory => SettingsReader.Read(SettingsRootNamespace, "VirtualDirectory", string.Empty); diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 3303782e3f..8f41ba97b1 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -18,7 +18,6 @@ - @@ -29,6 +28,8 @@ + + diff --git a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs index 6cfbf98e54..7a041434fc 100644 --- a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs +++ b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs @@ -22,7 +22,7 @@ public static void ConfigureLogging(LoggingSettings loggingSettings) } var nlogConfig = new LoggingConfiguration(); - var simpleLayout = new SimpleLayout("${longdate}|${threadid}|${level}|${logger}|${message}${onexception:|${exception:format=tostring}}"); + var simpleLayout = new SimpleLayout("${longdate}|${processtime}|${threadid:padding=2}|${level:padding=5}|${logger:padding=70}|${message}${onexception:|${exception:format=tostring}}"); var fileTarget = new FileTarget {