Fix cases which pending items flowing will not be accounted after an error #46
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Bug
The system had a bug which caused inconsistencies in the back-pressure system that eventually lead to stuck process.
This happened when there were many executions (
.run()
) for the same reused Caminho instance, so when some of the operators threw an error in the execution, the back-pressure system would not decrement the pending items for the items that has generated but haven't finished all the operators, therefore, when back-pressure system compared themaxItemsFlowing
with the current number of items flowing, it was still accounting for items that were not in memory anymore.Over time, the Caminho instance was allowing less and less items to be in memory, according to the number of items that were flowing during a failed execution, so at some point, the number of "ghost" items would reach
maxItemsFlowing
and stop processing completely.Fix
The back-pressure system currently accounts all runs for a given Caminho instance, this gives a very good control of how many items can be in memory at the same time, making your process memory allocation very predictable, and even executions more efficient, the goal of this fix was to NOT change this behaviour, therefore, fixing this bug took more effort than simply changing the back-pressure to be per execution.
The fix consisted of generating a
runId
for each execution, so this ID allows to control the back-pressure isolated, and reset all the "pending items" for a given run whenever the flow for the given execution is ended, no matter if it had an error or not.In the fix, the
runId
was intentionally not part of theValueBag
, this was done to avoid internal values of Caminho being available to the step functions, therefore, theOperatorApplierWithRunId
was created.Benchmark
Before
Subflow:
Parent Items: 50000
Child Items: 50000000
initialize steps: 1.437ms
initialize caminho: 0.372ms
run caminho: 1:26.565 (m:ss.mmm)
Parallel:
Parent Items: 5000
Child Items: 5000000
initialize steps: 0.304ms
initialize caminho: 0.038ms
run caminho: 27.630s
After
Subflow:
Parent Items: 50000
Child Items: 50000000
initialize steps: 1.445ms
initialize caminho: 0.441ms
run caminho: 1:30.135 (m:ss.mmm)
Parallel:
Parent Items: 5000
Child Items: 5000000
initialize steps: 0.262ms
initialize caminho: 0.036ms
run caminho: 28.243s