
Commit a551080

siying and HeartSaVioR committed
[SPARK-51768][SS][TESTS] Create Failure Injection Test for Streaming offset and commit log write failures
### What changes were proposed in this pull request?

Add a unit test to verify that a streaming query works as expected when writing to the commit or offset log fails. Also make minor improvements to the existing test code.

### Why are the changes needed?

Improve test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran the tests in this suite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50559 from siying/commit_offset_failure.

Lead-authored-by: Siying Dong <dong.sy@gmail.com>
Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 6e8b4b5 commit a551080

2 files changed: +96 −12 lines

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala (+7 −1)
@@ -58,6 +58,8 @@ class DelayCloseFSDataOutputStreamWrapper(
  * Used in unit tests to simulate failure scenarios.
  * This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
  * injection behavior.
+ * Requirement: when this file manager is created, `path` should already be registered using
+ * FailureInjectionFileSystem.registerTempPath(path)
  *
  * @param path The path to the checkpoint directory, passing to the parent class
  * @param hadoopConf hadoop conf that will be passed to the parent class
@@ -153,7 +155,7 @@ object FailureInjectionFileSystem {
    * @param path the temp path
    * @return the newly created failure injection state
    */
-  def addPathToTempToInjectionState(path: String): FailureInjectionState = synchronized {
+  def registerTempPath(path: String): FailureInjectionState = synchronized {
     // Throw exception if the path already exists in the map
     assert(!tempPathToInjectionState.contains(path), s"Path $path already exists in the map")
     tempPathToInjectionState = tempPathToInjectionState + (path -> new FailureInjectionState)
@@ -165,6 +167,10 @@ object FailureInjectionFileSystem {
    * @param path the temp path to be cle
    */
   def removePathFromTempToInjectionState(path: String): Unit = synchronized {
+    // if we can find the injection state of the path, cancel all the delayed streams
+    tempPathToInjectionState.get(path).foreach { state =>
+      state.delayedStreams.foreach(_.cancel())
+    }
     tempPathToInjectionState = tempPathToInjectionState - path
   }
 
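The new scaladoc line documents a calling contract for the failure-injection file manager: the checkpoint path must be registered through FailureInjectionFileSystem.registerTempPath before the manager is created for it, and unregistered when the test is done. A minimal sketch of that contract follows; it is not Spark source, runQueryWithCheckpoint is a hypothetical placeholder, and the cleanup call in the finally block is an assumption based on the APIs shown in this diff (the suite's actual helper appears only partially in the next file).

// Sketch of the register/unregister contract (assumptions noted above).
val injectionState = FailureInjectionFileSystem.registerTempPath(dir.getPath)
try {
  // Drive failures through the returned state, for example:
  // injectionState.createAtomicDelayCloseRegex = Seq(".*/commits/1")
  runQueryWithCheckpoint(dir, injectionState)  // hypothetical placeholder
} finally {
  // Assumed cleanup: also cancels any delayed streams registered for the path,
  // per the change to removePathFromTempToInjectionState above.
  FailureInjectionFileSystem.removePathFromTempToInjectionState(dir.getPath)
}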
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala (+89 −11)
@@ -59,7 +59,7 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
    */
   def withTempDirAllowFailureInjection(f: (File, FailureInjectionState) => Unit): Unit = {
     withTempDir { dir =>
-      val injectionState = FailureInjectionFileSystem.addPathToTempToInjectionState(dir.getPath)
+      val injectionState = FailureInjectionFileSystem.registerTempPath(dir.getPath)
       try {
         f(dir, injectionState)
       } finally {
@@ -377,12 +377,14 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
 
       injectionState.createAtomicDelayCloseRegex = Seq(".*/2_.*changelog")
 
+      val additionalConfs = Map(
+        rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+        STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)
       testStream(aggregated, Update)(
-        StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
-          additionalConfs = Map(
-            rocksdbChangelogCheckpointingConfKey -> "true",
-            SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
-            STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+        StartStream(
+          checkpointLocation = checkpointDir.getAbsolutePath,
+          additionalConfs = additionalConfs),
         AddData(inputData, 3),
         CheckLastBatch((3, 1)),
         AddData(inputData, 3, 2),
@@ -400,11 +402,9 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
 
       // The query will restart successfully and start at the checkpoint after Batch 1
       testStream(aggregated, Update)(
-        StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
-          additionalConfs = Map(
-            rocksdbChangelogCheckpointingConfKey -> "true",
-            SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
-            STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+        StartStream(
+          checkpointLocation = checkpointDir.getAbsolutePath,
+          additionalConfs = additionalConfs),
         AddData(inputData, 4),
         CheckLastBatch((3, 3), (1, 1), (4, 1)),
         StopStream
@@ -414,6 +414,84 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
     }
   }
 
+  case class FailureConf2(logType: String, checkpointFormatVersion: String) {
+    override def toString: String = {
+      s"logType = $logType, checkpointFormatVersion = $checkpointFormatVersion"
+    }
+  }
+
+  // tests to validate the behavior after failures when writing to the commit and offset logs
+  Seq(
+    FailureConf2("commits", checkpointFormatVersion = "1"),
+    FailureConf2("commits", checkpointFormatVersion = "2"),
+    FailureConf2("offsets", checkpointFormatVersion = "1"),
+    FailureConf2("offsets", checkpointFormatVersion = "2")).foreach { failureConf =>
+    test(s"Progress log fails to write $failureConf") {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+      val rocksdbChangelogCheckpointingConfKey =
+        RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
+
+      withTempDirAllowFailureInjection { (checkpointDir, injectionState) =>
+        withSQLConf(
+          rocksdbChangelogCheckpointingConfKey -> "true",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") {
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData.toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+              .as[(Int, Long)]
+
+          // This should cause the second batch to fail
+          injectionState.createAtomicDelayCloseRegex = Seq(s".*/${failureConf.logType}/1")
+
+          val additionalConfs = Map(
+            rocksdbChangelogCheckpointingConfKey -> "true",
+            SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key ->
+              failureConf.checkpointFormatVersion,
+            STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)
+
+          testStream(aggregated, Update)(
+            StartStream(
+              checkpointLocation = checkpointDir.getAbsolutePath,
+              additionalConfs = additionalConfs),
+            AddData(inputData, 3),
+            CheckNewAnswer((3, 1)),
+            AddData(inputData, 3, 2),
+            // We should categorize this error.
+            // TODO after the error is categorized, we should check error class
+            ExpectFailure[IOException] { _ => () }
+          )
+
+          injectionState.createAtomicDelayCloseRegex = Seq.empty
+
+          inputData.addData(3, 1)
+
+          // The query will restart successfully and start at the checkpoint after Batch 1
+          testStream(aggregated, Update)(
+            StartStream(
+              checkpointLocation = checkpointDir.getAbsolutePath,
+              additionalConfs = additionalConfs),
+            AddData(inputData, 4),
+            if (failureConf.logType == "commits") {
+              // If the failure is in the commit log, data is already committed.
+              // MemorySink isn't an ExactlyOnce sink, so we will see the data from the previous
+              // batch. We should see the data from the previous batch and the new data.
+              CheckNewAnswer((3, 2), (2, 1), (3, 3), (1, 1), (4, 1))
+            } else {
+              // If the failure is in the offset log, previous batch didn't run. when the query
+              // restarts, it will include all data since the last finished batch.
+              CheckNewAnswer((3, 3), (1, 1), (4, 1), (2, 1))
+            },
+            StopStream
+          )
+        }
+      }
+    }
+  }
   /**
    * An integrated test to cover this scenario:
    * 1. A batch is running and a snapshot checkpoint is scheduled
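
The two expected-answer branches in the new test come down to how a restarted query decides where to resume: a batch is treated as finished only when both its offsets and commits entries were written. If the commits/1 write failed, batch 1 had already run, so re-running it delivers its output to the MemorySink a second time (it is not an exactly-once sink); if the offsets/1 write failed, batch 1 never ran and its input is simply picked up after restart. The helper below is a hypothetical illustration of that decision over a checkpoint directory, not Spark code; the real logic lives in the engine's offset/commit log handling.

import java.io.File
import scala.util.Try

// Hypothetical illustration (not Spark source): a batch with an offsets entry
// but no matching commits entry is the one a restarted query re-runs.
def batchesToRerun(checkpointDir: File): Seq[Long] = {
  def batchIds(subDir: String): Set[Long] = {
    val names = Option(new File(checkpointDir, subDir).list()).getOrElse(Array.empty[String])
    names.flatMap(name => Try(name.toLong).toOption).toSet
  }
  (batchIds("offsets") -- batchIds("commits")).toSeq.sorted
}

This is also why the test only needs to clear createAtomicDelayCloseRegex and restart: no checkpoint repair is required, because the engine re-runs or skips batch 1 depending on which log entries survived.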
