-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Investigation hanging audit instance #4758
base: master
Are you sure you want to change the base?
Changes from 17 commits
eb6e9dc
2d0880b
9652aa0
77b2c2c
03690a0
14c3474
7f37e53
7616151
e6d1f40
88d2e71
da4c35b
786559b
a2011fc
3835af4
6a26fdc
24c4ef3
d8d5730
251f167
4f78bdf
4c00e6e
76f26ec
4c1b13c
a2f471e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
namespace ServiceControl.Audit.Persistence.UnitOfWork | ||
{ | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
public interface IAuditIngestionUnitOfWorkFactory | ||
{ | ||
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data | ||
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data | ||
bool CanIngestMore(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
using System; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using System.Diagnostics.Metrics; | ||
using System.Threading; | ||
using System.Threading.Channels; | ||
using System.Threading.Tasks; | ||
|
@@ -14,18 +15,14 @@ | |
using Persistence; | ||
using Persistence.UnitOfWork; | ||
using ServiceControl.Infrastructure; | ||
using ServiceControl.Infrastructure.Metrics; | ||
using Transports; | ||
|
||
class AuditIngestion : IHostedService | ||
{ | ||
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; | ||
|
||
public AuditIngestion( | ||
Settings settings, | ||
ITransportCustomization transportCustomization, | ||
TransportSettings transportSettings, | ||
Metrics metrics, | ||
IFailedAuditStorage failedImportsStorage, | ||
AuditIngestionCustomCheck.State ingestionState, | ||
AuditIngestor auditIngestor, | ||
|
@@ -40,10 +37,6 @@ public AuditIngestion( | |
this.settings = settings; | ||
this.applicationLifetime = applicationLifetime; | ||
|
||
batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size"); | ||
batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds); | ||
receivedMeter = metrics.GetCounter("Audit ingestion - received"); | ||
|
||
if (!transportSettings.MaxConcurrency.HasValue) | ||
{ | ||
throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); | ||
|
@@ -59,15 +52,27 @@ public AuditIngestion( | |
|
||
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError); | ||
|
||
watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger); | ||
|
||
ingestionWorker = Task.Run(() => Loop(), CancellationToken.None); | ||
watchdog = new Watchdog( | ||
"audit message ingestion", | ||
EnsureStarted, | ||
EnsureStopped, | ||
ingestionState.ReportError, | ||
ingestionState.Clear, | ||
settings.TimeToRestartAuditIngestionAfterFailure, | ||
logger | ||
); | ||
} | ||
|
||
public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); | ||
public async Task StartAsync(CancellationToken cancellationToken) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreasohlund @danielmarbach Alternative is to use BackgroundService.ExecuteAsync, assuming we can safely terminate ASAP: protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await watchdog.Start(() => applicationLifetime.StopApplication());
await Loop(stoppingToken);
// Intentionally not invoking the following to shut down ASAP
// watchdog.Stop();
// channel.Writer.Complete();
// if (transportInfrastructure != null)
// {
// await transportInfrastructure.Shutdown(stoppingToken);
// }
} As everything is at-least-once processing and idempotent this would be the fastest method to shutdow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it makes sense to switch to a BackgroundService here and inline the loop into the execute method Be aware though of dotnet/runtime#36063 and https://blog.stephencleary.com/2020/05/backgroundservice-gotcha-startup.html for more context There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I already tested that and it works. As the loop pretty much immediately does IO its not affected which is why I removed the |
||
{ | ||
stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); | ||
ingestionWorker = Loop(stopSource.Token); | ||
await watchdog.Start(() => applicationLifetime.StopApplication()); | ||
} | ||
|
||
public async Task StopAsync(CancellationToken cancellationToken) | ||
{ | ||
await stopSource.CancelAsync(); | ||
await watchdog.Stop(); | ||
channel.Writer.Complete(); | ||
await ingestionWorker; | ||
|
@@ -102,6 +107,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) | |
await stoppable.StopReceive(cancellationToken); | ||
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); | ||
} | ||
|
||
return; | ||
} | ||
|
||
|
@@ -123,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) | |
|
||
queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; | ||
|
||
await auditIngestor.VerifyCanReachForwardingAddress(); | ||
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); | ||
|
||
await queueIngestor.StartReceive(cancellationToken); | ||
|
||
|
@@ -168,6 +174,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) | |
logger.Info("Shutting down. Already stopped, skipping shut down"); | ||
return; //Already stopped | ||
} | ||
|
||
var stoppable = queueIngestor; | ||
queueIngestor = null; | ||
logger.Info("Shutting down. Infrastructure shut down commencing"); | ||
|
@@ -196,63 +203,82 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati | |
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
messageContext.SetTaskCompletionSource(taskCompletionSource); | ||
|
||
receivedMeter.Mark(); | ||
receivedAudits.Add(1); | ||
|
||
await channel.Writer.WriteAsync(messageContext, cancellationToken); | ||
await taskCompletionSource.Task; | ||
} | ||
|
||
async Task Loop() | ||
async Task Loop(CancellationToken cancellationToken) | ||
{ | ||
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value); | ||
|
||
while (await channel.Reader.WaitToReadAsync()) | ||
try | ||
{ | ||
// will only enter here if there is something to read. | ||
try | ||
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value); | ||
|
||
while (await channel.Reader.WaitToReadAsync(cancellationToken)) | ||
{ | ||
// as long as there is something to read this will fetch up to MaximumConcurrency items | ||
while (channel.Reader.TryRead(out var context)) | ||
// will only enter here if there is something to read. | ||
try | ||
{ | ||
contexts.Add(context); | ||
// as long as there is something to read this will fetch up to MaximumConcurrency items | ||
while (channel.Reader.TryRead(out var context)) | ||
{ | ||
contexts.Add(context); | ||
auditMessageSize.Record(context.Body.Length / 1024D); | ||
} | ||
|
||
auditBatchSize.Record(contexts.Count); | ||
var sw = Stopwatch.StartNew(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not relevant for this PR but we should be using a ValueStopWatch here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that included somewhere or do we copy https://github.com/dotnet/aspnetcore/blob/main/src/Shared/ValueStopwatch/ValueStopwatch.cs ? |
||
|
||
await auditIngestor.Ingest(contexts, cancellationToken); | ||
auditBatchDuration.Record(sw.ElapsedMilliseconds); | ||
} | ||
|
||
batchSizeMeter.Mark(contexts.Count); | ||
using (batchDurationMeter.Measure()) | ||
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreasohlund please review todays commits seperately and then combined. |
||
{ | ||
await auditIngestor.Ingest(contexts); | ||
logger.Debug("Cancelled by host"); | ||
return; // No point in continueing as WaitToReadAsync will throw OCE | ||
} | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
//Do nothing as we are shutting down | ||
continue; | ||
} | ||
catch (Exception e) // show must go on | ||
{ | ||
if (logger.IsInfoEnabled) | ||
catch (Exception e) // show must go on | ||
{ | ||
logger.Info("Ingesting messages failed", e); | ||
if (logger.IsInfoEnabled) | ||
{ | ||
logger.Info("Ingesting messages failed", e); | ||
} | ||
|
||
// signal all message handling tasks to terminate | ||
foreach (var context in contexts) | ||
{ | ||
if (!context.GetTaskCompletionSource().TrySetException(e)) | ||
{ | ||
logger.Error("Loop TrySetException failed"); | ||
} | ||
} | ||
} | ||
|
||
// signal all message handling tasks to terminate | ||
foreach (var context in contexts) | ||
finally | ||
{ | ||
context.GetTaskCompletionSource().TrySetException(e); | ||
contexts.Clear(); | ||
} | ||
} | ||
finally | ||
{ | ||
contexts.Clear(); | ||
} | ||
// will fall out here when writer is completed | ||
} | ||
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) | ||
{ | ||
logger.Debug("Cancelled by host"); | ||
} | ||
catch (Exception e) | ||
{ | ||
// Might the next exception scope throw an exception, consider this fatal as that cannot be an OCE | ||
logger.Fatal("Loop interrupted", e); | ||
applicationLifetime.StopApplication(); | ||
throw; | ||
} | ||
// will fall out here when writer is completed | ||
} | ||
|
||
TransportInfrastructure transportInfrastructure; | ||
IMessageReceiver queueIngestor; | ||
Task ingestionWorker; | ||
|
||
readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); | ||
readonly SemaphoreSlim startStopSemaphore = new(1); | ||
readonly string inputEndpoint; | ||
readonly ITransportCustomization transportCustomization; | ||
readonly TransportSettings transportSettings; | ||
|
@@ -261,13 +287,15 @@ async Task Loop() | |
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; | ||
readonly Settings settings; | ||
readonly Channel<MessageContext> channel; | ||
readonly Meter batchSizeMeter; | ||
readonly Meter batchDurationMeter; | ||
readonly Counter receivedMeter; | ||
readonly Histogram<long> auditBatchSize = AuditMetrics.Meter.CreateHistogram<long>($"{AuditMetrics.Prefix}.batch_size_audits"); | ||
readonly Histogram<double> auditBatchDuration = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms"); | ||
readonly Histogram<double> auditMessageSize = AuditMetrics.Meter.CreateHistogram<double>($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes"); | ||
readonly Counter<long> receivedAudits = AuditMetrics.Meter.CreateCounter<long>($"{AuditMetrics.Prefix}.received_audits"); | ||
readonly Watchdog watchdog; | ||
readonly Task ingestionWorker; | ||
readonly IHostApplicationLifetime applicationLifetime; | ||
|
||
CancellationTokenSource stopSource; | ||
|
||
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to have a comment here indicating the disposal happens as part of the unit of work