[SPARK-51768][SS][TESTS] Create Failure Injection Test for Streaming offset and commit log write failures #50559
Conversation
@HeartSaVioR this PR is very simple. Can you take a look?
additionalConfs = additionalConfs),
AddData(inputData, 4),
if (failureConf.logType == "commits") {
  // If the failure is in the commit log, data is already committed. The batch will
This is very hard to follow, because the behavior of MemoryStream seems to affect the test a lot. It's not easy to reason about when the commit on the source happens and how MemoryStream will behave. It'd be harder to follow than the original test logic.
I'd test with a file stream where we only append files, and Spark is expected to process all files "regardless of these failures". (This should be a contract.) Once we change the output mode to Complete, we should see the same result in the latest batch, which has no further files to process.
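A hedged sketch of that file-stream shape (the directory names, query name, and trigger are my assumptions, and the failure-injection wiring from this suite is assumed to already be in place):

```scala
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Only ever append files to `inputDir` (hypothetical); never mutate or delete them.
val counts = spark.readStream
  .text(inputDir.getCanonicalPath)
  .groupBy("value")
  .count()

val query = counts.writeStream
  .outputMode(OutputMode.Complete())  // latest batch reflects all input so far
  .format("memory")
  .queryName("failure_injection_files")
  .option("checkpointLocation", checkpointDir.getCanonicalPath)
  .trigger(Trigger.AvailableNow())
  .start()

query.processAllAvailable()
// Contract under test: even with injected offset/commit log failures and
// restarts, every appended file is reflected in the latest batch.
query.stop()
```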
Also, I'm not comfortable with the behavior "If the failure is in the commit log, data is already committed." Shouldn't we only commit the offset for batch N to the source once batch N IS committed? I suspect this is an indication of a data loss/correctness issue - I hope I'm misunderstanding something.
I'll try to clean up the test code to be clearer, but committing the source shouldn't be relevant here. We only clean up the offset for the previous batch before ending the current one, so it should have no effect.
Wait, is it due to the fact that we used Check"Last"Batch? Sigh, I missed this.
I'm OK with not using a file stream, but let's figure out how we can verify that (2, 1) was provided before this batch.
If that's hard to achieve, I'm OK with a more direct code comment about the "sink" status - e.g. (3, 2) and (2, 1) were already emitted to the sink as batch 1 even with the failure on the commit log. The write against the sink for the retried batch 1 should have been ignored (memory sink), but we restart the query and the state in the sink has been reset, hence the write against the sink for batch 1 in the new query takes effect.
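For context on the "ignored" write: a memory-style sink skips a batch id it has already seen, so the retry dedup only lasts as long as the sink instance. A hedged paraphrase of that behavior (a toy class, not Spark's actual MemorySink):

```scala
// Toy illustration: dedup of retried batches by batch id.
class ToyMemorySink {
  private var latestBatchId: Option[Long] = None
  private val rows = scala.collection.mutable.ArrayBuffer.empty[Any]

  def addBatch(batchId: Long, data: Seq[Any]): Unit = synchronized {
    if (latestBatchId.exists(_ >= batchId)) {
      // Retried batch on the same sink instance: already written, so skip.
    } else {
      rows ++= data
      latestBatchId = Some(batchId)
    }
  }
}
// A query restart creates a fresh sink instance, so `latestBatchId` resets
// and the rewrite of batch 1 in the new query takes effect.
```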
It'd be a lot easier if we could describe the data included in batches 0 to N. This is very complicated because I expect (3, 2) to be "reprocessed" in the "next query run", and we don't see it anywhere, despite the fact that the sink has been reset and hence there shouldn't be dedup for batch 1.
It seems to be due to CheckLastBatch - if we use CheckAnswer then it should contain both. If there is no easy way to verify the two batches separately, let's just use CheckAnswer to confirm both batches altogether, with a code comment describing that we reprocessed (3, 2) in batch 1 and processed further input in batch 2.
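For reference, a hedged sketch of how the check actions discussed here differ; `agg`, `inputData`, and the tuples are illustrative, assuming a suite extending StreamTest with a count-by-value aggregation in Update mode:

```scala
testStream(agg, OutputMode.Update)(
  AddData(inputData, 3, 2),
  // CheckLastBatch: compares only the rows emitted by the most recent batch.
  CheckLastBatch((3, 1), (2, 1)),
  AddData(inputData, 4),
  // CheckNewAnswer: compares the rows emitted since the previous check.
  CheckNewAnswer((4, 1)),
  // CheckAnswer: compares everything in the sink since the start, so a
  // reprocessed batch after a restart would be visible here.
  CheckAnswer((3, 1), (2, 1), (4, 1))
)
```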
Sorry, I think I used CheckLastBatch by mistake. Using CheckNewAnswer will be much less confusing, and I'm changing to use that.
.../org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
+1 pending CI
https://github.com/siying/spark/actions/runs/14479972406/job/40614700665 - only pyspark-connect is failing (with an OOME), which is not relevant to this change.
Thanks! Merging to master.
What changes were proposed in this pull request?
Add a unit test to verify that a streaming query works as expected when writes to the commit or offset log fail, plus minor improvements to existing test code.
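A hedged sketch of the failure-injection idea (not this PR's actual implementation; the class and flag names are hypothetical): a custom CheckpointFileManager that throws on atomic writes under the offsets/ or commits/ directories, installed via the spark.sql.streaming.checkpointFileManagerClass conf.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

// Hypothetical injector: fails offset/commit log writes while the flag is on.
class FailingLogCheckpointFileManager(path: Path, hadoopConf: Configuration)
    extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {

  override def createAtomic(path: Path, overwriteIfPossible: Boolean) = {
    val p = path.toString
    if (FailingLogCheckpointFileManager.failureEnabled &&
        (p.contains("/offsets/") || p.contains("/commits/"))) {
      throw new IOException(s"injected failure writing $p")
    }
    super.createAtomic(path, overwriteIfPossible)
  }
}

object FailingLogCheckpointFileManager {
  @volatile var failureEnabled = false // toggled by the test around a batch
}
```

A test would set spark.sql.streaming.checkpointFileManagerClass to this class, flip failureEnabled around a microbatch, then assert the query fails, restarts, and produces the expected answer with no data loss.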
Why are the changes needed?
Improve testing.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Ran this existing test.
Was this patch authored or co-authored using generative AI tooling?
No.