diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 37eaf6fc3b..044ee33716 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -50,6 +50,8 @@
+
+
diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
index 35990ee062..0866b504db 100644
--- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
+++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs
@@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context)
{
var id = messageContext.NativeMessageId;
var headers = messageContext.Headers;
-
var log = NServiceBus.Logging.LogManager.GetLogger();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs
index ee654999a1..f07e7036de 100644
--- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs
+++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
+ using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
@@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}
- public ValueTask StartNew(int batchSize)
+ public ValueTask StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs
index 3f9c56a6c8..8a9747fbc0 100644
--- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs
+++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs
@@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory(
MinimumRequiredStorageState customCheckState)
: IAuditIngestionUnitOfWorkFactory
{
- public async ValueTask StartNew(int batchSize)
+ public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken)
{
- var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
+ var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);
diff --git a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs
index 25a5859e50..1d3470bdea 100644
--- a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs
+++ b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs
@@ -1,10 +1,11 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
+ using System.Threading;
using System.Threading.Tasks;
public interface IAuditIngestionUnitOfWorkFactory
{
- ValueTask StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
+ ValueTask StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data
bool CanIngestMore();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt
index c1dbbb4ec1..9862349774 100644
--- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt
+++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt
@@ -12,6 +12,7 @@
"ApiUrl": "http://localhost:8888/api",
"Port": 8888,
"PrintMetrics": false,
+ "OtelMetricsUrl": null,
"Hostname": "localhost",
"VirtualDirectory": "",
"TransportType": "LearningTransport",
diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config
index 83610fa6ee..7cf230c9f0 100644
--- a/src/ServiceControl.Audit/App.config
+++ b/src/ServiceControl.Audit/App.config
@@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin
-
+
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
index 56d4244e84..602c2ff34e 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+ using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -14,18 +15,14 @@
using Persistence;
using Persistence.UnitOfWork;
using ServiceControl.Infrastructure;
- using ServiceControl.Infrastructure.Metrics;
using Transports;
class AuditIngestion : IHostedService
{
- static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
-
public AuditIngestion(
Settings settings,
ITransportCustomization transportCustomization,
TransportSettings transportSettings,
- Metrics metrics,
IFailedAuditStorage failedImportsStorage,
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
@@ -40,10 +37,6 @@ public AuditIngestion(
this.settings = settings;
this.applicationLifetime = applicationLifetime;
- batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
- batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
- receivedMeter = metrics.GetCounter("Audit ingestion - received");
-
if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
@@ -59,15 +52,27 @@ public AuditIngestion(
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
- watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);
-
- ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
+ watchdog = new Watchdog(
+ "audit message ingestion",
+ EnsureStarted,
+ EnsureStopped,
+ ingestionState.ReportError,
+ ingestionState.Clear,
+ settings.TimeToRestartAuditIngestionAfterFailure,
+ logger
+ );
}
- public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
+ public async Task StartAsync(CancellationToken cancellationToken)
+ {
+ stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ ingestionWorker = Loop(stopSource.Token);
+ await watchdog.Start(() => applicationLifetime.StopApplication());
+ }
public async Task StopAsync(CancellationToken cancellationToken)
{
+ await stopSource.CancelAsync();
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;
@@ -102,6 +107,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}
+
return;
}
@@ -123,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
- await auditIngestor.VerifyCanReachForwardingAddress();
+ await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
await queueIngestor.StartReceive(cancellationToken);
@@ -168,6 +174,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}
+
var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
@@ -196,63 +203,108 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
- receivedMeter.Mark();
+ receivedAudits.Add(1);
await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
}
- async Task Loop()
+ async Task Loop(CancellationToken cancellationToken)
{
- var contexts = new List(transportSettings.MaxConcurrency.Value);
-
- while (await channel.Reader.WaitToReadAsync())
+ try
{
- // will only enter here if there is something to read.
- try
+ var contexts = new List(transportSettings.MaxConcurrency.Value);
+
+ long sequentialFailureCount = 0;
+ DateTime lastSuccess = DateTime.MinValue;
+
+ while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
- // as long as there is something to read this will fetch up to MaximumConcurrency items
- while (channel.Reader.TryRead(out var context))
+ var sw = Stopwatch.StartNew();
+ // TODO: Add timeout handling, if processing takes over for example 1 minute
+ // will only enter here if there is something to read.
+ try
{
- contexts.Add(context);
- }
+ // as long as there is something to read this will fetch up to MaximumConcurrency items
+ while (channel.Reader.TryRead(out var context))
+ {
+ contexts.Add(context);
+ auditMessageSize.Record(context.Body.Length / 1024D);
+ }
+
+ auditBatchSize.Record(contexts.Count);
- batchSizeMeter.Mark(contexts.Count);
- using (batchDurationMeter.Measure())
+ await auditIngestor.Ingest(contexts, cancellationToken);
+ auditBatchDuration.Record(sw.ElapsedMilliseconds);
+
+ // No locking for consistency needed, just write, don't care about multi-threading
+ sequentialFailureCount = 0;
+ lastSuccess = DateTime.UtcNow;
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
- await auditIngestor.Ingest(contexts);
+ logger.Debug("Cancelled by host");
+ return; // No point in continueing as WaitToReadAsync will throw OCE
}
- }
- catch (OperationCanceledException)
- {
- //Do nothing as we are shutting down
- continue;
- }
- catch (Exception e) // show must go on
- {
- if (logger.IsInfoEnabled)
+ catch (Exception e) // show must go on
{
- logger.Info("Ingesting messages failed", e);
+ Interlocked.Increment(ref sequentialFailureCount);
+ logger.Warn($"Batch processing failed [#{sequentialFailureCount} @{lastSuccess:O}] ", e);
+
+ // Signal circuitbreaker, throttle whatever
+
+ // signal all message handling tasks to terminate
+ foreach (var context in contexts)
+ {
+ if (!context.GetTaskCompletionSource().TrySetException(e))
+ {
+ logger.Error("Loop TrySetException failed");
+ }
+ }
}
-
- // signal all message handling tasks to terminate
- foreach (var context in contexts)
+ finally
{
- context.GetTaskCompletionSource().TrySetException(e);
+ const int infoThreshold = 5000;
+ const int warnThreshold = 15000;
+ const int errorThreshold = 60000;
+ var elapsed = sw.ElapsedMilliseconds;
+
+ if (elapsed > errorThreshold)
+ {
+ logger.ErrorFormat("Processing duration {0} exceeded {1}", elapsed, errorThreshold);
+ }
+ else if (elapsed > warnThreshold)
+ {
+ logger.WarnFormat("Processing duration {0} exceeded {1}", elapsed, warnThreshold);
+ }
+ else if (elapsed > infoThreshold)
+ {
+ logger.InfoFormat("Processing duration {0} exceeded {1}", elapsed, infoThreshold);
+ }
+
+ contexts.Clear();
}
}
- finally
- {
- contexts.Clear();
- }
+ // will fall out here when writer is completed
+ }
+ catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
+ {
+ logger.Debug("Cancelled by host");
+ }
+ catch (Exception e)
+ {
+ // Might the next exception scope throw an exception, consider this fatal as that cannot be an OCE
+ logger.Fatal("Loop interrupted", e);
+ applicationLifetime.StopApplication();
+ throw;
}
- // will fall out here when writer is completed
}
TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
+ Task ingestionWorker;
- readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
+ readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
readonly TransportSettings transportSettings;
@@ -261,13 +313,15 @@ async Task Loop()
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel channel;
- readonly Meter batchSizeMeter;
- readonly Meter batchDurationMeter;
- readonly Counter receivedMeter;
+ readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits");
+ readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms");
+ readonly Histogram auditMessageSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes");
+ readonly Counter receivedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received_audits");
readonly Watchdog watchdog;
- readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;
+ CancellationTokenSource stopSource;
+
static readonly ILog logger = LogManager.GetLogger();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
index 84d110a1f2..62ee58360f 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
@@ -2,8 +2,8 @@
{
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Linq;
+ using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Monitoring;
@@ -14,13 +14,11 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
- using ServiceControl.Infrastructure.Metrics;
using ServiceControl.Transports;
public class AuditIngestor
{
public AuditIngestor(
- Metrics metrics,
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
@@ -32,36 +30,26 @@ ITransportCustomization transportCustomization
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;
-
- var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit");
- var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit");
- var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds);
- var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds);
- var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds);
-
- var enrichers = new IEnrichImportedAuditMessages[]
- {
- new MessageTypeEnricher(),
- new EnrichWithTrackingIds(),
- new ProcessingStatisticsEnricher(),
- new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
- new DetectSuccessfulRetriesEnricher(),
- new SagaRelationshipsEnricher()
- }.Concat(auditEnrichers).ToArray();
+ var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();
logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);
- auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
+ auditPersister = new AuditPersister(
+ unitOfWorkFactory,
+ enrichers,
+ messageSession,
+ messageDispatcher
+ );
}
- public async Task Ingest(List contexts)
+ public async Task Ingest(List contexts, CancellationToken cancellationToken)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Ingesting {contexts.Count} message contexts");
}
- var stored = await auditPersister.Persist(contexts);
+ var stored = await auditPersister.Persist(contexts, cancellationToken);
try
{
@@ -71,7 +59,8 @@ public async Task Ingest(List contexts)
{
Log.Debug($"Forwarding {stored.Count} messages");
}
- await Forward(stored, logQueueAddress);
+
+ await Forward(stored, logQueueAddress, cancellationToken);
if (Log.IsDebugEnabled)
{
Log.Debug("Forwarded messages");
@@ -80,7 +69,10 @@ public async Task Ingest(List contexts)
foreach (var context in contexts)
{
- context.GetTaskCompletionSource().TrySetResult(true);
+ if (!context.GetTaskCompletionSource().TrySetResult(true))
+ {
+ Log.Warn("TrySetResult failed");
+ }
}
}
catch (Exception e)
@@ -95,7 +87,7 @@ public async Task Ingest(List contexts)
}
}
- Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress)
+ Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress, CancellationToken cancellationToken)
{
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
var index = 0;
@@ -112,7 +104,8 @@ Task Forward(IReadOnlyCollection messageContexts, string forward
var outgoingMessage = new OutgoingMessage(
messageContext.NativeMessageId,
messageContext.Headers,
- messageContext.Body);
+ messageContext.Body
+ );
// Forwarded messages should last as long as possible
outgoingMessage.Headers.Remove(Headers.TimeToBeReceived);
@@ -124,12 +117,13 @@ Task Forward(IReadOnlyCollection messageContexts, string forward
return anyContext != null
? messageDispatcher.Value.Dispatch(
new TransportOperations(transportOperations),
- anyContext.TransportTransaction
+ anyContext.TransportTransaction,
+ cancellationToken
)
: Task.CompletedTask;
}
- public async Task VerifyCanReachForwardingAddress()
+ public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
{
if (!settings.ForwardAuditMessages)
{
@@ -146,7 +140,7 @@ public async Task VerifyCanReachForwardingAddress()
)
);
- await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
+ await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
}
catch (Exception e)
{
@@ -159,7 +153,6 @@ public async Task VerifyCanReachForwardingAddress()
readonly Lazy messageDispatcher;
readonly string logQueueAddress;
- static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
static readonly ILog Log = LogManager.GetLogger();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs
index 0a5d0d9938..520910eae0 100644
--- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs
@@ -3,7 +3,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+ using System.Diagnostics.Metrics;
using System.Text.Json;
+ using System.Threading;
using System.Threading.Tasks;
using Infrastructure;
using Monitoring;
@@ -15,30 +17,14 @@
using ServiceControl.Audit.Persistence.Monitoring;
using ServiceControl.EndpointPlugin.Messages.SagaState;
using ServiceControl.Infrastructure;
- using ServiceControl.Infrastructure.Metrics;
using ServiceControl.SagaAudit;
- class AuditPersister
+ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
+ IEnrichImportedAuditMessages[] enrichers,
+ IMessageSession messageSession,
+ Lazy messageDispatcher)
{
- public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
- IEnrichImportedAuditMessages[] enrichers,
- Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter,
- Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession,
- Lazy messageDispatcher)
- {
- this.unitOfWorkFactory = unitOfWorkFactory;
- this.enrichers = enrichers;
-
- this.ingestedAuditMeter = ingestedAuditMeter;
- this.ingestedSagaAuditMeter = ingestedSagaAuditMeter;
- this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter;
- this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter;
- this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter;
- this.messageSession = messageSession;
- this.messageDispatcher = messageDispatcher;
- }
-
- public async Task> Persist(IReadOnlyList contexts)
+ public async Task> Persist(IReadOnlyList contexts, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
@@ -51,9 +37,8 @@ public async Task> Persist(IReadOnlyList(contexts.Count);
foreach (var context in contexts)
{
@@ -89,12 +74,11 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList messageDispatcher;
+ readonly Counter storedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_audit_messages");
+ readonly Counter storedSagas = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_saga_audits");
+ readonly Histogram auditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_audits", unit: "ms");
+ readonly Histogram sagaAuditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms");
+ readonly Histogram auditCommitDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.commit_duration_audits", unit: "ms");
- readonly IEnrichImportedAuditMessages[] enrichers;
- readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
static readonly ILog Logger = LogManager.GetLogger();
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
index 7da46df703..5c3b67f17e 100644
--- a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
+++ b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
@@ -23,7 +23,7 @@ public ImportFailedAudits(
public async Task Run(CancellationToken cancellationToken = default)
{
- await auditIngestor.VerifyCanReachForwardingAddress();
+ await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
var succeeded = 0;
var failed = 0;
@@ -37,7 +37,7 @@ await failedAuditStore.ProcessFailedMessages(
var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
- await auditIngestor.Ingest([messageContext]);
+ await auditIngestor.Ingest([messageContext], cancellationToken);
await taskCompletionSource.Task;
diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
index 8968565a50..47ac4e2976 100644
--- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
+++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
@@ -6,7 +6,6 @@ namespace ServiceControl.Audit;
using System.Threading.Tasks;
using Auditing;
using Infrastructure;
-using Infrastructure.Metrics;
using Infrastructure.Settings;
using Microsoft.AspNetCore.HttpLogging;
using Microsoft.Extensions.DependencyInjection;
@@ -20,6 +19,8 @@ namespace ServiceControl.Audit;
using NServiceBus.Transport;
using Persistence;
using Transports;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
static class HostApplicationBuilderExtensions
{
@@ -28,10 +29,11 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
Settings settings,
EndpointConfiguration configuration)
{
+ var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion;
var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings);
var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings);
- RecordStartup(settings, configuration, persistenceConfiguration);
+ RecordStartup(version, settings, configuration, persistenceConfiguration);
builder.Logging.ClearProviders();
builder.Logging.AddNLog();
@@ -61,13 +63,35 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
// directly and to make things more complex of course the order of registration still matters ;)
services.AddSingleton(provider => new Lazy(provider.GetRequiredService));
- services.AddMetrics(settings.PrintMetrics);
-
services.AddPersistence(persistenceSettings, persistenceConfiguration);
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration);
builder.UseNServiceBus(configuration);
+ if (!string.IsNullOrEmpty(settings.OtelMetricsUrl))
+ {
+ if (!Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var otelMetricsUri))
+ {
+ throw new UriFormatException($"Invalid OtelMetricsUrl: {settings.OtelMetricsUrl}");
+ }
+ builder.Services.AddOpenTelemetry()
+ .ConfigureResource(b => b.AddService(
+ serviceName: "Particular.ServiceControl.Audit",
+ serviceVersion: version,
+ serviceInstanceId: settings.InstanceName))
+ .WithMetrics(b =>
+ {
+ b.AddMeter(AuditMetrics.MeterName);
+ b.AddOtlpExporter(e =>
+ {
+ e.Endpoint = otelMetricsUri;
+ });
+ });
+ var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions));
+ logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtelMetricsUrl);
+
+ }
+
// Configure after the NServiceBus hosted service to ensure NServiceBus is already started
if (settings.IngestAuditMessages)
{
@@ -84,10 +108,8 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder
builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration);
}
- static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration)
+ static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration)
{
- var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion;
-
var startupMessage = $@"
-------------------------------------------------------------
ServiceControl Audit Version: {version}
@@ -101,9 +123,6 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi
var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions));
logger.Info(startupMessage);
- endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new
- {
- Settings = settings
- });
+ endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings });
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs
new file mode 100644
index 0000000000..4e07ae3453
--- /dev/null
+++ b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Audit;
+
+using System.Diagnostics.Metrics;
+
+static class AuditMetrics
+{
+ public const string MeterName = "Particular.ServiceControl";
+ public static readonly Meter Meter = new(MeterName, "0.1.0");
+ public static readonly string Prefix = "particular.servicecontrol.audit";
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs
deleted file mode 100644
index c378a8f4d6..0000000000
--- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-namespace ServiceControl.Audit.Infrastructure.Metrics
-{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using Microsoft.Extensions.Hosting;
- using NServiceBus.Logging;
- using ServiceControl.Infrastructure.Metrics;
-
- class MetricsReporterHostedService : IHostedService
- {
- readonly Metrics metrics;
- MetricsReporter reporter;
-
- public MetricsReporterHostedService(Metrics metrics) => this.metrics = metrics;
-
- public Task StartAsync(CancellationToken cancellationToken)
- {
- var metricsLog = LogManager.GetLogger("Metrics");
-
- reporter = new MetricsReporter(metrics, x => metricsLog.Info(x), TimeSpan.FromSeconds(5));
-
- reporter.Start();
-
- return Task.CompletedTask;
- }
-
- public Task StopAsync(CancellationToken cancellationToken) => reporter.Stop();
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs
deleted file mode 100644
index a7bc6b1dd5..0000000000
--- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace ServiceControl.Audit.Infrastructure.Metrics
-{
- using Microsoft.Extensions.DependencyInjection;
- using ServiceControl.Infrastructure.Metrics;
-
- static class MetricsServiceCollectionExtensions
- {
- public static void AddMetrics(this IServiceCollection services, bool printMetrics)
- {
- services.AddSingleton(new Metrics { Enabled = printMetrics });
- services.AddHostedService();
- }
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
index 183b695555..d6a84f0662 100644
--- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
+++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
@@ -109,6 +109,7 @@ public string RootUrl
public int Port { get; set; }
public bool PrintMetrics => SettingsReader.Read(SettingsRootNamespace, "PrintMetrics");
+ public string OtelMetricsUrl { get; set; } = SettingsReader.Read(SettingsRootNamespace, nameof(OtelMetricsUrl));
public string Hostname { get; private set; }
public string VirtualDirectory => SettingsReader.Read(SettingsRootNamespace, "VirtualDirectory", string.Empty);
diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj
index 3303782e3f..8f41ba97b1 100644
--- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj
+++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj
@@ -18,7 +18,6 @@
-
@@ -29,6 +28,8 @@
+
+
diff --git a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs
index 6cfbf98e54..7a041434fc 100644
--- a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs
+++ b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs
@@ -22,7 +22,7 @@ public static void ConfigureLogging(LoggingSettings loggingSettings)
}
var nlogConfig = new LoggingConfiguration();
- var simpleLayout = new SimpleLayout("${longdate}|${threadid}|${level}|${logger}|${message}${onexception:|${exception:format=tostring}}");
+ var simpleLayout = new SimpleLayout("${longdate}|${processtime}|${threadid:padding=2}|${level:padding=5}|${logger:padding=70}|${message}${onexception:|${exception:format=tostring}}");
var fileTarget = new FileTarget
{