Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigation hanging audit instance #4758

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
eb6e9dc
Emit metrics via otel instead of custom format
andreasohlund Jan 31, 2025
2d0880b
Minimize diff by reverting whitespace changes
ramonsmits Jan 31, 2025
9652aa0
Approvals
andreasohlund Jan 31, 2025
77b2c2c
Fix formatting
andreasohlund Jan 31, 2025
03690a0
Apply suggestions from code review
andreasohlund Jan 31, 2025
14c3474
fix formatting
ramonsmits Jan 31, 2025
7f37e53
Only use standard otel
andreasohlund Feb 2, 2025
7616151
Set instance id
andreasohlund Feb 2, 2025
e6d1f40
Set unit
andreasohlund Feb 3, 2025
88d2e71
Better metrics names
andreasohlund Feb 3, 2025
da4c35b
Emit body size
andreasohlund Feb 3, 2025
786559b
Fix meter name
ramonsmits Feb 3, 2025
a2011fc
Log that OpenTelemetry metrics exporter is enabled
ramonsmits Feb 3, 2025
3835af4
Added logging to diagnose hanging ingestion
ramonsmits Feb 3, 2025
6a26fdc
NLog layout: Added processtime and padding to threadid, level, and l…
ramonsmits Feb 3, 2025
24c4ef3
Incorrectly handled OCE can result in hanging ingestion
ramonsmits Feb 4, 2025
d8d5730
New cancellation token allows further improvement to reduce shutdown …
ramonsmits Feb 4, 2025
251f167
Moved TrySetResult to loop so that both TrySetResult and TrySetExcept…
ramonsmits Feb 5, 2025
4f78bdf
Improve message
ramonsmits Feb 5, 2025
4c00e6e
Log as INF/WRN/ERR when processing duration exceeds their correspondi…
ramonsmits Feb 5, 2025
76f26ec
Report sequential failure count and timestamp of last success
ramonsmits Feb 5, 2025
4c1b13c
fixup! Moved TrySetResult to loop so that both TrySetResult and TrySe…
ramonsmits Feb 6, 2025
a2f471e
fixup! Report sequential failure count and timestamp of last success
ramonsmits Feb 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
<PackageVersion Include="NUnit" Version="4.3.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.6.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageVersion Include="Particular.Approvals" Version="2.0.1" />
<PackageVersion Include="Particular.Licensing.Sources" Version="6.0.1" />
<PackageVersion Include="Particular.LicensingComponent.Report" Version="1.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ async Task InitializeServiceControl(ScenarioContext context)
{
var id = messageContext.NativeMessageId;
var headers = messageContext.Headers;

var log = NServiceBus.Logging.LogManager.GetLogger<ServiceControlComponentRunner>();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
Expand All @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}

public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory(
MinimumRequiredStorageState customCheckState)
: IAuditIngestionUnitOfWorkFactory
{
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have a comment here indicating the disposal happens as part of the unit of work

timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
using System.Threading;
using System.Threading.Tasks;

public interface IAuditIngestionUnitOfWorkFactory
{
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data
bool CanIngestMore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"ApiUrl": "http://localhost:8888/api",
"Port": 8888,
"PrintMetrics": false,
"OtelMetricsUrl": null,
"Hostname": "localhost",
"VirtualDirectory": "",
"TransportType": "LearningTransport",
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceControl.Audit/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin
<add key="ServiceControl.Audit/ServiceControlQueueAddress" value="Particular.ServiceControl" />
<add key="ServiceControl.Audit/HostName" value="localhost" />
<add key="ServiceControl.Audit/DatabaseMaintenancePort" value="44445" />

<add key="ServiceControl.Audit/OtelMetricsUrl" value="http://localhost:4317" />
<!-- DEVS - Pick a transport to run Auditing instance on -->
<add key="ServiceControl.Audit/TransportType" value="LearningTransport" />
<!--<add key="ServiceControl.Audit/TransportType" value="AmazonSQS" />-->
Expand Down
130 changes: 79 additions & 51 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -14,18 +15,14 @@
using Persistence;
using Persistence.UnitOfWork;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

public AuditIngestion(
Settings settings,
ITransportCustomization transportCustomization,
TransportSettings transportSettings,
Metrics metrics,
IFailedAuditStorage failedImportsStorage,
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
Expand All @@ -40,10 +37,6 @@ public AuditIngestion(
this.settings = settings;
this.applicationLifetime = applicationLifetime;

batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size");
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds);
receivedMeter = metrics.GetCounter("Audit ingestion - received");

if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
Expand All @@ -59,15 +52,27 @@ public AuditIngestion(

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
watchdog = new Watchdog(
"audit message ingestion",
EnsureStarted,
EnsureStopped,
ingestionState.ReportError,
ingestionState.Clear,
settings.TimeToRestartAuditIngestionAfterFailure,
logger
);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
public async Task StartAsync(CancellationToken cancellationToken)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreasohlund @danielmarbach An alternative is to use BackgroundService.ExecuteAsync, assuming we can safely terminate ASAP:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await watchdog.Start(() => applicationLifetime.StopApplication());
    await Loop(stoppingToken);
    // Intentionally not invoking the following to shut down ASAP
    // watchdog.Stop();
    // channel.Writer.Complete();
    // if (transportInfrastructure != null)
    // {
    //     await transportInfrastructure.Shutdown(stoppingToken);
    // }
}

As everything is at-least-once processing and idempotent, this would be the fastest way to shut down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to switch to a BackgroundService here and inline the loop into the execute method

Be aware though of dotnet/runtime#36063 and https://blog.stephencleary.com/2020/05/backgroundservice-gotcha-startup.html for more context

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I already tested that and it works. As the loop pretty much immediately does IO, it's not affected, which is why I removed the Task.Run. I can restore it for safety or add a code comment. Any preference, @danielmarbach?

{
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
ingestionWorker = Loop(stopSource.Token);
await watchdog.Start(() => applicationLifetime.StopApplication());
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await stopSource.CancelAsync();
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;
Expand Down Expand Up @@ -102,6 +107,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}

return;
}

Expand All @@ -123,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)

queueIngestor = transportInfrastructure.Receivers[inputEndpoint];

await auditIngestor.VerifyCanReachForwardingAddress();
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);

await queueIngestor.StartReceive(cancellationToken);

Expand Down Expand Up @@ -168,6 +174,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}

var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
Expand Down Expand Up @@ -196,63 +203,82 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);

receivedMeter.Mark();
receivedAudits.Add(1);

await channel.Writer.WriteAsync(messageContext, cancellationToken);
await taskCompletionSource.Task;
}

async Task Loop()
async Task Loop(CancellationToken cancellationToken)
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync())
try
{
// will only enter here if there is something to read.
try
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
// will only enter here if there is something to read.
try
{
contexts.Add(context);
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
auditMessageSize.Record(context.Body.Length / 1024D);
}

auditBatchSize.Record(contexts.Count);
var sw = Stopwatch.StartNew();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant for this PR but we should be using a ValueStopWatch here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


await auditIngestor.Ingest(contexts, cancellationToken);
auditBatchDuration.Record(sw.ElapsedMilliseconds);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreasohlund please review today's commits separately and then combined.

{
await auditIngestor.Ingest(contexts);
logger.Debug("Cancelled by host");
return; // No point in continueing as WaitToReadAsync will throw OCE
}
}
catch (OperationCanceledException)
{
//Do nothing as we are shutting down
continue;
}
catch (Exception e) // show must go on
{
if (logger.IsInfoEnabled)
catch (Exception e) // show must go on
{
logger.Info("Ingesting messages failed", e);
if (logger.IsInfoEnabled)
{
logger.Info("Ingesting messages failed", e);
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
{
if (!context.GetTaskCompletionSource().TrySetException(e))
{
logger.Error("Loop TrySetException failed");
}
}
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
finally
{
context.GetTaskCompletionSource().TrySetException(e);
contexts.Clear();
}
}
finally
{
contexts.Clear();
}
// will fall out here when writer is completed
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
logger.Debug("Cancelled by host");
}
catch (Exception e)
{
// Might the next exception scope throw an exception, consider this fatal as that cannot be an OCE
logger.Fatal("Loop interrupted", e);
applicationLifetime.StopApplication();
throw;
}
// will fall out here when writer is completed
}

TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
Task ingestionWorker;

readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1);
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
readonly TransportSettings transportSettings;
Expand All @@ -261,13 +287,15 @@ async Task Loop()
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
readonly Meter batchSizeMeter;
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Histogram<long> auditBatchSize = AuditMetrics.Meter.CreateHistogram<long>($"{AuditMetrics.Prefix}.batch_size_audits");
readonly Histogram<double> auditBatchDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms");
readonly Histogram<double> auditMessageSize = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes");
readonly Counter<long> receivedAudits = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.received_audits");
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;

CancellationTokenSource stopSource;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
}
}
Loading
Loading