Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cancellation of ingestion operations #4782

Draft
wants to merge 7 commits into
base: hanging-ingestion
Choose a base branch
from

Conversation

ramonsmits
Copy link
Member

@ramonsmits ramonsmits commented Feb 6, 2025

When ingestion hangs and the service looks unresponse a user might want to stop the instance. Especially for the batch ingestion is makes sense to support cancellation as this is a long running operations.

This is a port and continuation of my investion PR:

@ramonsmits ramonsmits self-assigned this Feb 6, 2025
@ramonsmits
Copy link
Member Author

ramonsmits commented Feb 6, 2025

@danielmarbach @andreasohlund There are multiple commits, one for audit, one for error, and one for domain events. Issue it that seems like a very deep rabbit hole as many handlers invoke storage and all of these do not support cancellation.

As cancellation itself isn't really a bug but an improvement.

As audit is the most affected I think it makes sense to support cancellation there and for the remaning parts to create cruft issues as it is a lot of effort to add cancellation to all those operations.

What is your opinion?

@ramonsmits ramonsmits changed the base branch from master to hanging-ingestion February 6, 2025 23:46
@@ -92,7 +92,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
batchSizeMeter.Mark(contexts.Count);
using (batchDurationMeter.Measure())
{
await ingestor.Ingest(contexts);
await ingestor.Ingest(contexts, stoppingToken);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing here will clear the messsage contexts without setting them to completed or failed and leave the OnMessage calls stuck. Is that a problem?

@@ -53,7 +54,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
{

// deliberately not using the using statement because we dispose async explicitly
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass the token to Task.When all as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also pass the token to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint?

@danielmarbach
Copy link
Contributor

for the remaning parts to create cruft issues as it is a lot of effort to add cancellation to all those operations.

FYI During the modernization efforts we deliberately had to stop at some point. It was just too much effort and also not in all cases glaringly obvious what the effects of cancellation might have on the stability of the code paths calling it so we did cut off somewhere.

@ramonsmits ramonsmits force-pushed the hanging-ingestion branch 2 times, most recently from b779f00 to b8d5744 Compare February 10, 2025 13:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants