Fix cases which pending items flowing will not be accounted after an error #46

Merged
pedrokehl merged 2 commits into master from fix-pending-items-on-error
Aug 19, 2024

@pedrokehl pedrokehl commented Aug 8, 2024

Bug

The system had a bug that caused inconsistencies in the back-pressure system and eventually led to a stuck process.
It occurred when the same Caminho instance was reused for many executions (.run()). When an operator threw an error during an execution, the back-pressure system did not decrement the pending count for items that had been generated but had not yet finished all the operators. As a result, when the back-pressure system compared maxItemsFlowing with the current number of items flowing, it was still counting items that were no longer in memory.
Over time, the Caminho instance allowed fewer and fewer items in memory, losing as much capacity as there were items flowing during each failed execution, until the number of "ghost" items reached maxItemsFlowing and processing stopped completely.
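
The leak described above can be reproduced with a small standalone model. This is only a sketch: `SharedPendingCounter` and `failedRun` are hypothetical names invented for illustration, and the code does not use or mirror Caminho's internals. It assumes a limit of 10 and three items in flight whenever a run fails.

```typescript
// Hypothetical model of the pre-fix accounting; not Caminho's actual code.
class SharedPendingCounter {
  private pending = 0
  private readonly maxItemsFlowing: number

  constructor(maxItemsFlowing: number) {
    this.maxItemsFlowing = maxItemsFlowing
  }

  canGenerate(): boolean {
    return this.pending < this.maxItemsFlowing
  }

  // Called when the generator emits an item.
  increment(): void {
    this.pending += 1
  }

  // Called when an item has finished all the operators.
  decrement(): void {
    this.pending -= 1
  }

  get available(): number {
    return this.maxItemsFlowing - this.pending
  }
}

// One failed run: up to `inFlight` items are generated, then an operator
// throws before any of them finishes, so decrement() is never called.
function failedRun(counter: SharedPendingCounter, inFlight: number): void {
  for (let i = 0; i < inFlight && counter.canGenerate(); i++) {
    counter.increment()
  }
  // The run aborts here: its items are discarded, but the count is not reset.
}

const counter = new SharedPendingCounter(10)
const capacityAfterEachRun: number[] = []
for (let run = 0; run < 4; run++) {
  failedRun(counter, 3)
  capacityAfterEachRun.push(counter.available)
}

console.log(capacityAfterEachRun) // [ 7, 4, 1, 0 ]
console.log(counter.canGenerate()) // false: every later run would stall
```

Each failed run permanently consumes part of the limit, so the fourth failure leaves no capacity for any later run on the same instance.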

Fix

The back-pressure system currently accounts for all runs of a given Caminho instance. This gives very good control over how many items can be in memory at the same time, which makes the process's memory allocation very predictable and can even make executions more efficient. The goal of this fix was NOT to change this behaviour, so fixing the bug took more effort than simply making the back-pressure per execution.
The fix generates a runId for each execution. This ID allows the back-pressure to be tracked per run, so all the "pending items" for a given run are reset whenever the flow for that execution ends, whether or not it had an error.
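
A minimal sketch of that idea, assuming a tracker that keeps one shared total plus a per-run count keyed by runId. `PendingTracker`, `execute`, and their methods are hypothetical names chosen for this example; the actual implementation in this PR may be structured differently.

```typescript
// Hypothetical sketch of per-run accounting under a shared limit;
// names and structure are assumptions, not Caminho's internals.
class PendingTracker {
  private total = 0
  private nextRunId = 0
  private readonly pendingByRun = new Map<number, number>()
  private readonly maxItemsFlowing: number

  constructor(maxItemsFlowing: number) {
    this.maxItemsFlowing = maxItemsFlowing
  }

  startRun(): number {
    const runId = this.nextRunId++
    this.pendingByRun.set(runId, 0)
    return runId
  }

  // The limit is still checked against the total across all runs.
  canGenerate(): boolean {
    return this.total < this.maxItemsFlowing
  }

  increment(runId: number): void {
    this.pendingByRun.set(runId, (this.pendingByRun.get(runId) ?? 0) + 1)
    this.total += 1
  }

  decrement(runId: number): void {
    this.pendingByRun.set(runId, (this.pendingByRun.get(runId) ?? 0) - 1)
    this.total -= 1
  }

  // Called when a run ends, with or without an error: whatever that run
  // still has pending is released from the shared total.
  endRun(runId: number): void {
    this.total -= this.pendingByRun.get(runId) ?? 0
    this.pendingByRun.delete(runId)
  }

  get pendingTotal(): number {
    return this.total
  }
}

function execute(tracker: PendingTracker, items: number, fail: boolean): void {
  const runId = tracker.startRun()
  let generated = 0
  try {
    for (; generated < items && tracker.canGenerate(); generated++) {
      tracker.increment(runId)
    }
    if (fail) throw new Error('operator failed')
    for (let i = 0; i < generated; i++) tracker.decrement(runId)
  } finally {
    tracker.endRun(runId)
  }
}

const tracker = new PendingTracker(10)
for (let run = 0; run < 4; run++) {
  try {
    execute(tracker, 3, true)
  } catch {
    // Expected: the simulated operator error.
  }
}
console.log(tracker.pendingTotal) // 0: failed runs leave no "ghost" items

// Overlapping runs still share one limit; ending run A keeps run B's items.
const runA = tracker.startRun()
const runB = tracker.startRun()
for (let i = 0; i < 3; i++) tracker.increment(runA)
for (let i = 0; i < 2; i++) tracker.increment(runB)
tracker.endRun(runA)
console.log(tracker.pendingTotal) // 2
```

Keeping a single shared total preserves the instance-wide limit, while the per-run map makes it possible to release exactly what an aborted run left behind.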
The runId was intentionally kept out of the ValueBag to avoid exposing Caminho's internal values to the step functions; the OperatorApplierWithRunId was created for this purpose.
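
One way to picture this design choice is a closure that carries the runId alongside the ValueBag instead of inside it. The type shapes below, including the signature given to `OperatorApplierWithRunId`, and the `buildApplier` helper are assumptions made for illustration; they are not taken from the PR's diff.

```typescript
// The shapes below are assumptions made for illustration only.
type ValueBag = Record<string, unknown>
type StepFn = (valueBag: ValueBag) => unknown
type OperatorApplier = (valueBag: ValueBag) => ValueBag
type OperatorApplierWithRunId = (runId: number) => OperatorApplier

// Hypothetical helper: binds the runId in a closure so that internal
// bookkeeping can use it while the step only ever receives user values.
function buildApplier(
  provides: string,
  step: StepFn,
  onItemDone: (runId: number) => void,
): OperatorApplierWithRunId {
  return (runId) => (valueBag) => {
    const result = step(valueBag) // the step sees the ValueBag only
    onItemDone(runId) // bookkeeping uses the closed-over runId
    return { ...valueBag, [provides]: result }
  }
}

const finishedRunIds: number[] = []
const keysSeenByStep: string[] = []

const applier = buildApplier(
  'doubled',
  (valueBag) => {
    keysSeenByStep.push(...Object.keys(valueBag))
    return (valueBag.price as number) * 2
  },
  (runId) => {
    finishedRunIds.push(runId)
  },
)

const output = applier(7)({ price: 21 })
console.log(output) // { price: 21, doubled: 42 }
console.log(keysSeenByStep) // [ 'price' ]
console.log(finishedRunIds) // [ 7 ]
```

The step function never sees a runId key, so user code cannot come to depend on a value that exists only for internal accounting.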

Benchmark

Before

Subflow:
Parent Items: 50000
Child Items: 50000000
initialize steps: 1.437ms
initialize caminho: 0.372ms
run caminho: 1:26.565 (m:ss.mmm)

Parallel:
Parent Items: 5000
Child Items: 5000000
initialize steps: 0.304ms
initialize caminho: 0.038ms
run caminho: 27.630s

After

Subflow:
Parent Items: 50000
Child Items: 50000000
initialize steps: 1.445ms
initialize caminho: 0.441ms
run caminho: 1:30.135 (m:ss.mmm)

Parallel:
Parent Items: 5000
Child Items: 5000000
initialize steps: 0.262ms
initialize caminho: 0.036ms
run caminho: 28.243s
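
To make the before/after comparison easier to read, the snippet below computes the relative change in the "run caminho" timings listed above. The helper name `overheadPercent` is just for this example, and each figure comes from a single run, so small differences may be within normal run-to-run noise.

```typescript
// Wall-clock "run caminho" timings copied from the output above, in seconds.
const timings = {
  subflow: { before: 86.565, after: 90.135 }, // 1:26.565 and 1:30.135
  parallel: { before: 27.63, after: 28.243 },
}

// Relative change of `after` with respect to `before`, as a percentage.
function overheadPercent(before: number, after: number): number {
  return ((after - before) / before) * 100
}

for (const [name, { before, after }] of Object.entries(timings)) {
  console.log(`${name}: +${overheadPercent(before, after).toFixed(1)}%`)
}
// subflow: +4.1%
// parallel: +2.2%
```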

@pedrokehl pedrokehl force-pushed the fix-pending-items-on-error branch from 0ddd2e6 to a969329 Compare August 8, 2024 19:05
@pedrokehl pedrokehl force-pushed the fix-pending-items-on-error branch from a969329 to 04f66f2 Compare August 18, 2024 20:39
@pedrokehl pedrokehl changed the title Draft: Fix case where pending items flowing will not be accounted after an error Fix case where pending items flowing will not be accounted after an error Aug 18, 2024
@pedrokehl pedrokehl changed the title Fix case where pending items flowing will not be accounted after an error Fix cases which pending items flowing will not be accounted after an error Aug 18, 2024
@pedrokehl pedrokehl merged commit 66b8121 into master Aug 19, 2024
4 checks passed
@pedrokehl pedrokehl deleted the fix-pending-items-on-error branch August 19, 2024 08:08