
[SPARK-51768][SS][TESTS] Create Failure Injection Test for Streaming offset and commit log write failures #50559


Closed. siying wants to merge 3 commits.

Conversation

@siying (Contributor) commented Apr 10, 2025

What changes were proposed in this pull request?

Add a unit test to verify that a streaming query works as expected when writing to the commit or offset log fails, plus minor improvements to existing test code.

Why are the changes needed?

To improve test coverage.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Ran this existing test.

Was this patch authored or co-authored using generative AI tooling?

No.
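
For orientation, here is a minimal sketch (not the PR's actual code) of how such a failure-injection test can be shaped with Spark's StreamTest harness. The config key below is hypothetical, standing in for whatever failure-injection hook the suite installs:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class CommitLogFailureSketchSuite extends StreamTest {
  import testImplicits._

  test("query recovers after a commit log write failure") {
    val inputData = MemoryStream[Int]

    testStream(inputData.toDF())(
      // Hypothetical config key, standing in for the suite's real
      // failure-injection hook for checkpoint file writes.
      StartStream(additionalConfs =
        Map("spark.sql.streaming.test.failCommitLogWrite" -> "true")),
      AddData(inputData, 1, 2, 3),
      // The microbatch writes to the sink, then fails while writing
      // the commit log entry.
      ExpectFailure[java.io.IOException](),
      // Restart without injection: the batch is replayed and the query
      // must end up with exactly the data that was added.
      StartStream(),
      CheckAnswer(1, 2, 3)
    )
  }
}
```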

@siying (Contributor, Author) commented Apr 10, 2025

@HeartSaVioR this PR is very simple. Can you take a look?

@HeartSaVioR HeartSaVioR changed the title [SPARK-51768][SS] Create Failure Injection Test for Streaming offset and commit log write failures [SPARK-51768][SS][TESTS] Create Failure Injection Test for Streaming offset and commit log write failures Apr 14, 2025
Review thread on the following excerpt from the new test:

```scala
additionalConfs = additionalConfs),
AddData(inputData, 4),
if (failureConf.logType == "commits") {
  // If the failure is in the commit log, data is already committed. The batch will
```
@HeartSaVioR (Contributor) commented Apr 14, 2025

This is very hard to follow, because the behavior of MemoryStream heavily impacts the test. It's not easy to reason about when the commit on the source happens and how MemoryStream will behave. It'd be harder to follow than the original test logic.

I'd test with a file stream, where we only append files and Spark is expected to process all files "regardless of these failures". (This should be a contract.) Once we change the output mode to complete, we should see the same result in the latest batch, which has no further files to process.

Also, I'm not comfortable with the behavior "If the failure is in the commit log, data is already committed." Shouldn't we only commit the offset for batch N to the source when batch N IS committed? I suspect this is an indication of a data loss/correctness issue - I hope I'm misunderstanding something.
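
A rough sketch of that file-source shape (the paths, schema, and aggregation are assumptions for illustration; only the pattern comes from the comment above: append files, run with complete output mode, and expect the latest batch to reflect every file regardless of injected failures):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

// Hypothetical input directory; files are appended here between
// (injected) failures, and the contract is that every file is
// eventually processed.
val input = spark.readStream
  .schema("value INT")
  .json("/tmp/failure-injection-input")

// With "complete" output mode the latest batch always carries the full
// aggregated result, so after recovery it must reflect all appended files.
val query = input.groupBy($"value").count()
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("file_failure_sketch")
  .option("checkpointLocation", "/tmp/failure-injection-ckpt") // hypothetical
  .start()
```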

@siying (Contributor, Author) commented

I'll try to clean up the test code to be clearer, but committing the source shouldn't be relevant here. We only clean up the offset for the previous batch before ending the current one, so it should have no effect.

@HeartSaVioR (Contributor) commented

Wait, is it due to the fact that we used Check"Last"Batch? Sigh, I missed this.

I'm OK with not using a file stream, but let's figure out how we can verify that (2, 1) was produced before this batch.

@HeartSaVioR (Contributor) commented Apr 14, 2025

If that's hard to achieve, I'm OK with a more direct code comment about the "sink" status - e.g., (3, 2) and (2, 1) were already emitted to the sink as batch 1 even with the failure on the commit log. The write against the sink for the retried batch 1 should have been ignored (memory sink), but we restart the query and the state in the sink has been reset, hence the write against the sink for batch 1 in the new query takes effect.
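
To illustrate the sink behavior being described, here is a conceptual sketch (not Spark's actual MemorySink code): a sink instance drops a replayed batchId it has already seen, but a brand-new sink instance has no such history, so the replayed batch lands again.

```scala
import scala.collection.mutable.ArrayBuffer

// Conceptual sketch of the dedup-by-batchId behavior described above;
// not Spark's actual MemorySink implementation.
class SketchMemorySink {
  private var latestBatchId: Option[Long] = None
  private val rows = ArrayBuffer.empty[Int]

  def addBatch(batchId: Long, data: Seq[Int]): Unit = {
    if (latestBatchId.exists(_ >= batchId)) {
      // A retried batch within the same sink instance is dropped as a
      // duplicate, which is why a plain retry is invisible.
    } else {
      rows ++= data
      latestBatchId = Some(batchId)
    }
  }
}

// Same sink instance: the retried batch 1 is ignored.
// New sink instance (fresh query run): batch 1 is written again,
// which is why the new run's output shows the replayed rows.
```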

@HeartSaVioR (Contributor) commented

It'd be a lot easier if we could describe the data included in batches 0 to N. This is very complicated, because I expect (3, 2) to be "reprocessed" in the "next query run", yet we don't see it anywhere, despite the fact that the sink has been reset and hence there shouldn't be dedup for batch 1.

It seems to be due to CheckLastBatch - if we use CheckAnswer, then it should contain both. If there is no easy way to verify the two batches separately, let's just use CheckAnswer to confirm both batches altogether, with a code comment describing that we reprocessed (3, 2) in batch 1 and processed the further input in batch 2.
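
To make the difference concrete, a hedged sketch in the StreamTest DSL (the values are invented to mirror the thread; the semantics are the harness's: CheckLastBatch asserts only the most recent batch's rows, while CheckAnswer asserts everything the sink has received):

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}

class CheckActionsSketchSuite extends StreamTest {
  import testImplicits._

  test("CheckLastBatch sees one batch, CheckAnswer sees them all") {
    val inputData = MemoryStream[Int]
    val counts = inputData.toDF().groupBy("value").count()

    testStream(counts, OutputMode.Update)(
      AddData(inputData, 3, 3, 2),
      CheckLastBatch((3, 2L), (2, 1L)),       // batch 0's output only
      AddData(inputData, 4),
      CheckLastBatch((4, 1L)),                // batch 1's output only
      CheckAnswer((3, 2L), (2, 1L), (4, 1L))  // both batches combined
    )
  }
}
```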

@siying (Contributor, Author) commented

Sorry, I think I used CheckLastBatch by mistake. Using CheckNewAnswer will be much less confusing, and I'm changing to use that.
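
For reference, CheckNewAnswer asserts only the rows added to the sink since the previous check, which matches "verify the two batches separately" without the ambiguity of CheckLastBatch across restarts. A hedged sketch, continuing the inputData and counts from the previous sketch:

```scala
// CheckNewAnswer remembers what has already been checked, so each call
// asserts exactly the rows the sink received since the last check,
// even across a stop/restart.
testStream(counts, OutputMode.Update)(
  AddData(inputData, 3, 3, 2),
  CheckNewAnswer((3, 2L), (2, 1L)),   // batch 0's new rows
  StopStream,
  StartStream(),
  AddData(inputData, 4),
  CheckNewAnswer((4, 1L))             // only batch 1's new rows
)
```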

…ng/state/RocksDBCheckpointFailureInjectionSuite.scala
@HeartSaVioR (Contributor) left a comment

+1 pending CI

@HeartSaVioR (Contributor) commented

https://github.com/siying/spark/actions/runs/14479972406/job/40614700665

Only pyspark-connect is failing (with an OOME), which is not relevant to this change.

@HeartSaVioR (Contributor) commented

Thanks! Merging to master.
