-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support cancellation of ingestion operations #4782
base: hanging-ingestion
Are you sure you want to change the base?
Conversation
…ich can cause the ingestion to never used incoming context tasks and hang.
… - only ignore cancellations set by caller
…Source` with `RavenAuditIngestionUnitOfWork`
…but pretty much all storage API's aren't supporting cancellation....
@danielmarbach @andreasohlund There are multiple commits, one for audit, one for error, and one for domain events. Issue it that seems like a very deep rabbit hole as many handlers invoke storage and all of these do not support cancellation. As cancellation itself isn't really a bug but an improvement. As audit is the most affected I think it makes sense to support cancellation there and for the remaning parts to create cruft issues as it is a lot of effort to add cancellation to all those operations. What is your opinion? |
@@ -92,7 +92,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |||
batchSizeMeter.Mark(contexts.Count); | |||
using (batchDurationMeter.Measure()) | |||
{ | |||
await ingestor.Ingest(contexts); | |||
await ingestor.Ingest(contexts, stoppingToken); | |||
} | |||
} | |||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing here will clear the messsage contexts without setting them to completed or failed and leave the OnMessage calls stuck. Is that a problem?
@@ -53,7 +54,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo | |||
{ | |||
|
|||
// deliberately not using the using statement because we dispose async explicitly | |||
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count); | |||
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we pass the token to Task.When all as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also pass the token to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint?
FYI During the modernization efforts we deliberately had to stop at some point. It was just too much effort and also not in all cases glaringly obvious what the effects of cancellation might have on the stability of the code paths calling it so we did cut off somewhere. |
b779f00
to
b8d5744
Compare
When ingestion hangs and the service looks unresponse a user might want to stop the instance. Especially for the batch ingestion is makes sense to support cancellation as this is a long running operations.
This is a port and continuation of my investion PR: