Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation of ingestion operations #4782

Draft
wants to merge 7 commits into
base: hanging-ingestion
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<Ke
}
} while (await timer.WaitForNextTickAsync(stoppingToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public override async Task Stop(CancellationToken cancellationToken = default)
{
await checkTask;
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
//Swallow
// Even though we are stopping, ONLY swallow when OCE from callee to not hide any ungraceful stop errors
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
Expand Down Expand Up @@ -75,7 +76,7 @@ public async Task It_can_be_reimported()

class MessageFailedHandler(MyContext scenarioContext) : IDomainHandler<MessageFailed>
{
public Task Handle(MessageFailed domainEvent)
public Task Handle(MessageFailed domainEvent, CancellationToken cancellationToken)
{
scenarioContext.MessageFailedEventPublished = true;
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
Expand All @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}

public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ class RavenAuditIngestionUnitOfWorkFactory(
MinimumRequiredStorageState customCheckState)
: IAuditIngestionUnitOfWorkFactory
{
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);
// DO NOT USE using var, will be disposed by RavenAuditIngestionUnitOfWork
var lifetimeForwardedTimedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
lifetimeForwardedTimedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = (await documentStoreProvider.GetDocumentStore(lifetimeForwardedTimedCancellationSource.Token))
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, lifetimeForwardedTimedCancellationSource.Token);

return new RavenAuditIngestionUnitOfWork(
bulkInsert, timedCancellationSource, databaseConfiguration.AuditRetentionPeriod, new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore)
bulkInsert,
lifetimeForwardedTimedCancellationSource, // Transfer ownership for disposal
databaseConfiguration.AuditRetentionPeriod,
new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore)
);
// Intentionally not disposing CTS!
}

public bool CanIngestMore() => customCheckState.CanIngestMore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected string GetManifestPath()
configuration.AuditIngestionUnitOfWorkFactory;

protected ValueTask<IAuditIngestionUnitOfWork> StartAuditUnitOfWork(int batchSize) =>
AuditIngestionUnitOfWorkFactory.StartNew(batchSize);
AuditIngestionUnitOfWorkFactory.StartNew(batchSize, TestContext.CurrentContext.CancellationToken);

protected IServiceProvider ServiceProvider => configuration.ServiceProvider;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace ServiceControl.Audit.Persistence.UnitOfWork
{
using System.Threading;
using System.Threading.Tasks;

public interface IAuditIngestionUnitOfWorkFactory
{
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken); //Throws if not enough space or some other problem preventing from writing data
bool CanIngestMore();
}
}
105 changes: 57 additions & 48 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
using ServiceControl.Infrastructure.Metrics;
using Transports;

class AuditIngestion : IHostedService
class AuditIngestion : BackgroundService
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

Expand Down Expand Up @@ -59,23 +59,15 @@ public AuditIngestion(

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());

public async Task StopAsync(CancellationToken cancellationToken)
{
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;

if (transportInfrastructure != null)
{
await transportInfrastructure.Shutdown(cancellationToken);
}
watchdog = new Watchdog(
"audit message ingestion",
EnsureStarted,
EnsureStopped,
ingestionState.ReportError,
ingestionState.Clear,
settings.TimeToRestartAuditIngestionAfterFailure,
logger
);
}

Task OnCriticalError(string failure, Exception exception)
Expand Down Expand Up @@ -123,7 +115,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)

queueIngestor = transportInfrastructure.Receivers[inputEndpoint];

await auditIngestor.VerifyCanReachForwardingAddress();
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);

await queueIngestor.StartReceive(cancellationToken);

Expand Down Expand Up @@ -202,51 +194,69 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
await taskCompletionSource.Task;
}

async Task Loop()

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
await watchdog.Start(() => applicationLifetime.StopApplication());

while (await channel.Reader.WaitToReadAsync())
try
{
// will only enter here if there is something to read.
try
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
// will only enter here if there is something to read.
try
{
contexts.Add(context);
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
{
await auditIngestor.Ingest(contexts, stoppingToken);
}
}

batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
await auditIngestor.Ingest(contexts);
throw; // Catch again in outer catch
}
}
catch (OperationCanceledException)
{
//Do nothing as we are shutting down
continue;
}
catch (Exception e) // show must go on
{
if (logger.IsInfoEnabled)
catch (Exception e) // show must go on
{
logger.Info("Ingesting messages failed", e);
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
// signal all message handling tasks to terminate
foreach (var context in contexts)
{
context.GetTaskCompletionSource().TrySetException(e);
}
}
finally
{
context.GetTaskCompletionSource().TrySetException(e);
contexts.Clear();
}
}
finally
// will fall out here when writer is completed
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Cancellation
}
finally
{
await watchdog.Stop();
channel.Writer.Complete();

if (transportInfrastructure != null)
{
contexts.Clear();
// stoppingToken is cancelled, invoke so transport infrastructure will run teardown
// No need to await, as this will throw OperationCancelledException
_ = transportInfrastructure.Shutdown(stoppingToken);
}
}
// will fall out here when writer is completed
}

TransportInfrastructure transportInfrastructure;
Expand All @@ -265,7 +275,6 @@ async Task Loop()
readonly Meter batchDurationMeter;
readonly Counter receivedMeter;
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
Expand Down
16 changes: 8 additions & 8 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Settings;
using Monitoring;
Expand Down Expand Up @@ -54,14 +55,14 @@ ITransportCustomization transportCustomization
auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
}

public async Task Ingest(List<MessageContext> contexts)
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
{
if (Log.IsDebugEnabled)
{
Log.Debug($"Ingesting {contexts.Count} message contexts");
}

var stored = await auditPersister.Persist(contexts);
var stored = await auditPersister.Persist(contexts, cancellationToken);

try
{
Expand All @@ -71,7 +72,7 @@ public async Task Ingest(List<MessageContext> contexts)
{
Log.Debug($"Forwarding {stored.Count} messages");
}
await Forward(stored, logQueueAddress);
await Forward(stored, logQueueAddress, cancellationToken);
if (Log.IsDebugEnabled)
{
Log.Debug("Forwarded messages");
Expand All @@ -95,7 +96,7 @@ public async Task Ingest(List<MessageContext> contexts)
}
}

Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
{
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
var index = 0;
Expand Down Expand Up @@ -124,12 +125,11 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
return anyContext != null
? messageDispatcher.Value.Dispatch(
new TransportOperations(transportOperations),
anyContext.TransportTransaction
)
anyContext.TransportTransaction, cancellationToken)
: Task.CompletedTask;
}

public async Task VerifyCanReachForwardingAddress()
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
{
if (!settings.ForwardAuditMessages)
{
Expand All @@ -146,7 +146,7 @@ public async Task VerifyCanReachForwardingAddress()
)
);

await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
}
catch (Exception e)
{
Expand Down
5 changes: 3 additions & 2 deletions src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure;
using Monitoring;
Expand Down Expand Up @@ -38,7 +39,7 @@ public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
this.messageDispatcher = messageDispatcher;
}

public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();

Expand All @@ -53,7 +54,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
{

// deliberately not using the using statement because we dispose async explicitly
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass the token to Task.When all as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also pass the token to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint?

var inserts = new List<Task>(contexts.Count);
foreach (var context in contexts)
{
Expand Down
Loading
Loading