Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit metrics via otel instead of custom format #4762

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
<PackageVersion Include="NUnit" Version="4.3.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
<PackageVersion Include="Particular.Licensing.Sources" Version="6.0.1" />
<PackageVersion Include="Particular.LicensingComponent.Report" Version="1.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context)
{
var id = messageContext.NativeMessageId;
var headers = messageContext.Headers;

var log = NServiceBus.Logging.LogManager.GetLogger<ServiceControlComponentRunner>();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"ApiUrl": "http://localhost:8888/api",
"Port": 8888,
"PrintMetrics": false,
"OtelMetricsUrl": null,
"Hostname": "localhost",
"VirtualDirectory": "",
"TransportType": "LearningTransport",
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceControl.Audit/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
<add key="ServiceControl.Audit/HostName" value="localhost" />
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />

<add key="ServiceControl.Audit/OtelMetricsUrl" value="http://localhost:4317" />
<!-- DEVS - Pick a transport to run Auditing instance on -->
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->
Expand Down
33 changes: 15 additions & 18 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -14,18 +15,14 @@
using Persistence;
using Persistence.UnitOfWork;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

public AuditIngestion(
Settings settings,
ITransportCustomization transportCustomization,
TransportSettings transportSettings,
Metrics metrics,
IFailedAuditStorage failedImportsStorage,
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
Expand All @@ -40,10 +37,6 @@ public AuditIngestion(
this.settings = settings;
this.applicationLifetime = applicationLifetime;

batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
receivedMeter = metrics.GetCounter("Audit ingestion - received");

if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
Expand Down Expand Up @@ -102,6 +95,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}

return;
}

Expand Down Expand Up @@ -168,6 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}

var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
Expand Down Expand Up @@ -196,7 +191,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

receivedMeter.Mark();
receivedAudits.Add(1);

await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
Expand All @@ -215,13 +210,14 @@ async Task Loop()
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
auditMessageSize.Record(context.Body.Length / 1024.0);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
{
await auditIngestor.Ingest(contexts);
}
auditBatchSize.Record(contexts.Count);
var sw = Stopwatch.StartNew();

await auditIngestor.Ingest(contexts);
auditBatchDuration.Record(sw.ElapsedMilliseconds);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -252,7 +248,7 @@ async Task Loop()
TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;

readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
readonly TransportSettings transportSettings;
Expand All @@ -261,9 +257,10 @@ async Task Loop()
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Meter batchSizeMeter;
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Histogram<long> auditBatchSize = AuditMetrics.Meter.CreateHistogram<long>($"{AuditMetrics.Prefix}.batch_size_audits");
andreasohlund marked this conversation as resolved.
Show resolved Hide resolved
readonly Histogram<double> auditBatchDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms");
readonly Histogram<double> auditMessageSize = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes");
readonly Counter<long> receivedAudits = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.received_audits");
andreasohlund marked this conversation as resolved.
Show resolved Hide resolved
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;
Expand Down
29 changes: 8 additions & 21 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Infrastructure.Settings;
Expand All @@ -14,13 +13,11 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure.Metrics;
using ServiceControl.Transports;

public class AuditIngestor
{
public AuditIngestor(
Metrics metrics,
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
Expand All @@ -32,26 +29,16 @@ ITransportCustomization transportCustomization
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;

var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit");
var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit");
var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds);
var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds);
var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds);

var enrichers = new IEnrichImportedAuditMessages[]
{
new MessageTypeEnricher(),
new EnrichWithTrackingIds(),
new ProcessingStatisticsEnricher(),
new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring),
new DetectSuccessfulRetriesEnricher(),
new SagaRelationshipsEnricher()
}.Concat(auditEnrichers).ToArray();
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreasohlund update your Rider formatting so that this will not be put on a single line


logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);

auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
auditPersister = new AuditPersister(
unitOfWorkFactory,
enrichers,
messageSession,
messageDispatcher
);
}

public async Task Ingest(List<MessageContext> contexts)
Expand All @@ -71,6 +58,7 @@ public async Task Ingest(List<MessageContext> contexts)
{
Log.Debug($"Forwarding {stored.Count} messages");
}

await Forward(stored, logQueueAddress);
if (Log.IsDebugEnabled)
{
Expand Down Expand Up @@ -159,7 +147,6 @@ public async Task VerifyCanReachForwardingAddress()
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;

static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}
66 changes: 22 additions & 44 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Text.Json;
using System.Threading.Tasks;
using Infrastructure;
Expand All @@ -15,29 +16,13 @@
using ServiceControl.Audit.Persistence.Monitoring;
using ServiceControl.EndpointPlugin.Messages.SagaState;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Metrics;
using ServiceControl.SagaAudit;

class AuditPersister
class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
IEnrichImportedAuditMessages[] enrichers,
IMessageSession messageSession,
Lazy<IMessageDispatcher> messageDispatcher)
{
public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
IEnrichImportedAuditMessages[] enrichers,
Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter,
Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession,
Lazy<IMessageDispatcher> messageDispatcher)
{
this.unitOfWorkFactory = unitOfWorkFactory;
this.enrichers = enrichers;

this.ingestedAuditMeter = ingestedAuditMeter;
this.ingestedSagaAuditMeter = ingestedSagaAuditMeter;
this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter;
this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter;
this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter;
this.messageSession = messageSession;
this.messageDispatcher = messageDispatcher;
}

public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
{
var stopwatch = Stopwatch.StartNew();
Expand All @@ -51,7 +36,6 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
IAuditIngestionUnitOfWork unitOfWork = null;
try
{

// deliberately not using the using statement because we dispose async explicitly
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
var inserts = new List<Task>(contexts.Count);
Expand Down Expand Up @@ -89,12 +73,11 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
Logger.Debug("Adding audit message for bulk storage");
}

using (auditBulkInsertDurationMeter.Measure())
{
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
}
var auditSw = Stopwatch.StartNew();
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
auditBulkInsertDuration.Record(auditSw.ElapsedMilliseconds);
Comment on lines +76 to +78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a lot of things done in this method. Even multiple measurements.
I think we should consider doing the measures in the unitOfWork methods itself. The measurement is then simply that whole method and isn't making this method even worse then it already is :-)


ingestedAuditMeter.Mark();
storedAudits.Add(1);
}
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
{
Expand All @@ -103,12 +86,11 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
Logger.Debug("Adding SagaSnapshot message for bulk storage");
}

using (sagaAuditBulkInsertDurationMeter.Measure())
{
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
}
var sagaSw = Stopwatch.StartNew();
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
sagaAuditBulkInsertDuration.Record(sagaSw.ElapsedMilliseconds);

ingestedSagaAuditMeter.Mark();
storedSagas.Add(1);
}

storedContexts.Add(context);
Expand Down Expand Up @@ -146,10 +128,9 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
try
{
// this can throw even though dispose is never supposed to throw
using (bulkInsertCommitDurationMeter.Measure())
{
await unitOfWork.DisposeAsync();
}
var commitSw = Stopwatch.StartNew();
await unitOfWork.DisposeAsync();
auditCommitDuration.Record(commitSw.ElapsedMilliseconds);
}
catch (Exception e)
{
Expand Down Expand Up @@ -263,6 +244,7 @@ async Task ProcessAuditMessage(MessageContext context)
{
Logger.Debug($"Emitting {commandsToEmit.Count} commands and {messagesToEmit.Count} control messages.");
}

foreach (var commandToEmit in commandsToEmit)
{
await messageSession.Send(commandToEmit);
Expand Down Expand Up @@ -301,16 +283,12 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
}
}

readonly Counter ingestedAuditMeter;
readonly Counter ingestedSagaAuditMeter;
readonly Meter auditBulkInsertDurationMeter;
readonly Meter sagaAuditBulkInsertDurationMeter;
readonly Meter bulkInsertCommitDurationMeter;
readonly IMessageSession messageSession;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly Counter<long> storedAudits = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.stored_audit_messages");
andreasohlund marked this conversation as resolved.
Show resolved Hide resolved
readonly Counter<long> storedSagas = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.stored_saga_audits");
readonly Histogram<double> auditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.bulk_insert_duration_audits", unit: "ms");
readonly Histogram<double> sagaAuditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms");
readonly Histogram<double> auditCommitDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.commit_duration_audits", unit: "ms");
andreasohlund marked this conversation as resolved.
Show resolved Hide resolved

readonly IEnrichImportedAuditMessages[] enrichers;
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
static readonly ILog Logger = LogManager.GetLogger<AuditPersister>();
}
}
Loading