[SPARK-51768][SS][TESTS] Create Failure Injection Test for Streaming offset and commit log write failures #50559

Closed · wants to merge 3 commits
@@ -58,6 +58,8 @@ class DelayCloseFSDataOutputStreamWrapper(
* Used in unit tests to simulate failure scenarios.
* This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
* injection behavior.
* Requirement: when this file manager is created, `path` should already be registered using
* FailureInjectionFileSystem.registerTempPath(path)
*
* @param path The path to the checkpoint directory, passed to the parent class
* @param hadoopConf hadoop conf that will be passed to the parent class
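For orientation, the requirement above implies the following lifecycle, which the suite's withTempDirAllowFailureInjection helper (changed below) wraps up. This is a minimal sketch only; `checkpointDir` and `fileManagerClassName` are stand-ins borrowed from the test suite in this diff, and the query setup is elided:

  // Register the checkpoint path BEFORE any failure-injection file manager is created for it.
  val injectionState = FailureInjectionFileSystem.registerTempPath(checkpointDir.getPath)
  try {
    // Route checkpoint file I/O through the failure-injection manager,
    // as the suite does via hadoopConf / additionalConfs.
    val hadoopConf = new Configuration()
    hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
    // ... run the streaming query against checkpointDir, driving failures via injectionState ...
  } finally {
    // Unregister the path; with this change, unregistering also cancels any delayed streams.
    FailureInjectionFileSystem.removePathFromTempToInjectionState(checkpointDir.getPath)
  }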
@@ -153,7 +155,7 @@ object FailureInjectionFileSystem {
* @param path the temp path
* @return the newly created failure injection state
*/
def addPathToTempToInjectionState(path: String): FailureInjectionState = synchronized {
def registerTempPath(path: String): FailureInjectionState = synchronized {
// Throw exception if the path already exists in the map
assert(!tempPathToInjectionState.contains(path), s"Path $path already exists in the map")
tempPathToInjectionState = tempPathToInjectionState + (path -> new FailureInjectionState)
@@ -165,6 +167,10 @@ object FailureInjectionFileSystem {
* @param path the temp path to be cleaned up
*/
def removePathFromTempToInjectionState(path: String): Unit = synchronized {
// if we can find the injection state of the path, cancel all the delayed streams
tempPathToInjectionState.get(path).foreach { state =>
state.delayedStreams.foreach(_.cancel())
}
tempPathToInjectionState = tempPathToInjectionState - path
}

@@ -59,7 +59,7 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
*/
def withTempDirAllowFailureInjection(f: (File, FailureInjectionState) => Unit): Unit = {
withTempDir { dir =>
val injectionState = FailureInjectionFileSystem.addPathToTempToInjectionState(dir.getPath)
val injectionState = FailureInjectionFileSystem.registerTempPath(dir.getPath)
try {
f(dir, injectionState)
} finally {
@@ -377,12 +377,14 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest

injectionState.createAtomicDelayCloseRegex = Seq(".*/2_.*changelog")

val additionalConfs = Map(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)
testStream(aggregated, Update)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = Map(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = additionalConfs),
AddData(inputData, 3),
CheckLastBatch((3, 1)),
AddData(inputData, 3, 2),
@@ -400,11 +402,9 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest

// The query will restart successfully and start at the checkpoint after Batch 1
testStream(aggregated, Update)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = Map(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = additionalConfs),
AddData(inputData, 4),
CheckLastBatch((3, 3), (1, 1), (4, 1)),
StopStream
@@ -414,6 +414,84 @@ class RocksDBCheckpointFailureInjectionSuite extends StreamTest
}
}

case class FailureConf2(logType: String, checkpointFormatVersion: String) {
override def toString: String = {
s"logType = $logType, checkpointFormatVersion = $checkpointFormatVersion"
}
}

// Tests that validate query behavior after a failure while writing to the commit or offset log
Seq(
FailureConf2("commits", checkpointFormatVersion = "1"),
FailureConf2("commits", checkpointFormatVersion = "2"),
FailureConf2("offsets", checkpointFormatVersion = "1"),
FailureConf2("offsets", checkpointFormatVersion = "2")).foreach { failureConf =>
test(s"Progress log fails to write $failureConf") {
val hadoopConf = new Configuration()
hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
val rocksdbChangelogCheckpointingConfKey =
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"

withTempDirAllowFailureInjection { (checkpointDir, injectionState) =>
withSQLConf(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") {
val inputData = MemoryStream[Int]
val aggregated =
inputData.toDF()
.groupBy($"value")
.agg(count("*"))
.as[(Int, Long)]

// This should cause the second batch to fail
injectionState.createAtomicDelayCloseRegex = Seq(s".*/${failureConf.logType}/1")

val additionalConfs = Map(
rocksdbChangelogCheckpointingConfKey -> "true",
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key ->
failureConf.checkpointFormatVersion,
STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)

testStream(aggregated, Update)(
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = additionalConfs),
AddData(inputData, 3),
CheckNewAnswer((3, 1)),
AddData(inputData, 3, 2),
// We should categorize this error.
// TODO: after the error is categorized, check the error class here
ExpectFailure[IOException] { _ => () }
)

injectionState.createAtomicDelayCloseRegex = Seq.empty

inputData.addData(3, 1)

// The query will restart successfully and start at the checkpoint after Batch 1
testStream(aggregated, Update)(
StartStream(
checkpointLocation = checkpointDir.getAbsolutePath,
additionalConfs = additionalConfs),
AddData(inputData, 4),
if (failureConf.logType == "commits") {
// If the failure is in the commit log, the previous batch's data is already committed.
// MemorySink isn't an exactly-once sink, so we will see the previous batch's output
// again, along with the new batch's output.
CheckNewAnswer((3, 2), (2, 1), (3, 3), (1, 1), (4, 1))
} else {
// If the failure is in the offset log, the previous batch didn't run. When the query
// restarts, it will include all data since the last finished batch.
CheckNewAnswer((3, 3), (1, 1), (4, 1), (2, 1))

},
StopStream
)
}
}
}
}
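To make the two CheckNewAnswer expectations above easier to follow: the injected regex `.*/commits/1` (or `.*/offsets/1`) targets batch 1's entry in the commit or offset log, and the cumulative counts over all inputs can be traced with a plain sketch, independent of the test harness (names here are illustrative only):

  // Inputs fed to the query, in order:
  //   batch 0: 3            batch 1 (its log write fails): 3, 2
  //   added while stopped: 3, 1            after restart: 4
  val allInput = Seq(3, 3, 2, 3, 1, 4)
  val finalCounts = allInput.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
  // finalCounts == Map(3 -> 3, 2 -> 1, 1 -> 1, 4 -> 1)

With a commits failure, batch 1's offset log entry was written before the failure, so on restart batch 1 (3, 2) is re-executed and its Update-mode output (3, 2), (2, 1) reaches the non-exactly-once MemorySink again, followed by the restarted batch's output (3, 3), (1, 1), (4, 1). With an offsets failure, batch 1 never started, so the restarted batch processes 3, 2, 3, 1, 4 at once and emits only the final counts (3, 3), (2, 1), (1, 1), (4, 1).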

/**
* An integrated test to cover this scenario:
* 1. A batch is running and a snapshot checkpoint is scheduled