Skip to content

Commit

Permalink
Added logging to diagnose hanging ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonsmits committed Feb 3, 2025
1 parent da4c35b commit c8c16dd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
30 changes: 25 additions & 5 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public AuditIngestion(

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
ingestionWorker = Task.Run(() => LoopWithTryCatch(), CancellationToken.None);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());
Expand Down Expand Up @@ -197,6 +197,21 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await taskCompletionSource.Task;
}

async Task LoopWithTryCatch()
{
// TODO: Done to prevent conflicts with Otel branch, needs to becombine with Loop when merging to master
try
{
await Loop();
}
catch (Exception e)
{
logger.Fatal("Loop interrupted", e);
applicationLifetime.StopApplication();
throw;
}
}

async Task Loop()
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
Expand All @@ -219,10 +234,12 @@ async Task Loop()
await auditIngestor.Ingest(contexts);
auditBatchDuration.Record(sw.ElapsedMilliseconds);
}
catch (OperationCanceledException)
catch (OperationCanceledException e)
{
//Do nothing as we are shutting down
continue;
logger.Info("Ingesting messages failed", e);
// continue loop, do nothing as we are shutting down
// TODO: Assumption here is that OCE equals a shutdown which is definitely not the case
// We likely need to invoke `TrySetException`
}
catch (Exception e) // show must go on
{
Expand All @@ -234,7 +251,10 @@ async Task Loop()
// signal all message handling tasks to terminate
foreach (var context in contexts)
{
context.GetTaskCompletionSource().TrySetException(e);
if (!context.GetTaskCompletionSource().TrySetException(e))
{
logger.Error("Loop TrySetException failed");
}
}
}
finally
Expand Down
5 changes: 4 additions & 1 deletion src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ public async Task Ingest(List<MessageContext> contexts)

foreach (var context in contexts)
{
context.GetTaskCompletionSource().TrySetResult(true);
if (!context.GetTaskCompletionSource().TrySetResult(true))
{
Log.Warn("TrySetResult failed");
}
}
}
catch (Exception e)
Expand Down
10 changes: 8 additions & 2 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ void ProcessSagaAuditMessage(MessageContext context)
}

// releasing the failed message context early so that they can be retried outside the current batch
context.GetTaskCompletionSource().TrySetException(e);
if (!context.GetTaskCompletionSource().TrySetException(e))
{
Logger.Warn("ProcessSagaAuditMessage TrySetException failed");
}
}
}

Expand Down Expand Up @@ -279,7 +282,10 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
}

// releasing the failed message context early so that they can be retried outside the current batch
context.GetTaskCompletionSource().TrySetException(e);
if (!context.GetTaskCompletionSource().TrySetException(e))
{
Logger.Warn("ProcessAuditMessage TrySetException failed");
}
}
}

Expand Down

0 comments on commit c8c16dd

Please sign in to comment.