Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4110: Make Tez fail fast when DFS quota is exceeded. #313

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.exception.ExceptionUtils;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class TezTaskRunner2 {
// TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context.
private volatile Throwable firstException;
private volatile EventMetaData exceptionSourceInfo;
private volatile TaskFailureType firstTaskFailureType;
volatile TaskFailureType firstTaskFailureType;
private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false);

private volatile boolean oobSignalErrorInProgress = false;
Expand Down Expand Up @@ -204,7 +205,7 @@ public TaskRunner2Result run() {
synchronized (this) {
if (isRunningState()) {
trySettingEndReason(EndReason.TASK_ERROR);
registerFirstException(TaskFailureType.NON_FATAL, e, null);
registerFirstException(getTaskFailureType(e), e, null);
LOG.warn("Exception from RunnerCallable", e);
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ void processCallableResult(TaskRunner2CallableResult executionResult) {
if (isRunningState()) {
if (executionResult.error != null) {
trySettingEndReason(EndReason.TASK_ERROR);
registerFirstException(TaskFailureType.NON_FATAL, executionResult.error, null);
registerFirstException(getTaskFailureType(executionResult.error), executionResult.error, null);
} else {
trySettingEndReason(EndReason.SUCCESS);
taskComplete.set(true);
Expand Down Expand Up @@ -584,4 +585,13 @@ private void logAborting(String abortReason) {
LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(),
abortReason);
}

/**
 * Classifies a task failure by searching the exception chain of {@code e}.
 *
 * @param e the throwable reported for the task
 * @return {@link TaskFailureType#FATAL} when a {@link ClusterStorageCapacityExceededException}
 *         is found anywhere in the chain, otherwise {@link TaskFailureType#NON_FATAL}
 */
private TaskFailureType getTaskFailureType(Throwable e) {
  // indexOfType reports -1 when no throwable in the chain matches the requested type.
  int capacityErrorIndex =
      ExceptionUtils.indexOfType(e, ClusterStorageCapacityExceededException.class);
  return capacityErrorIndex == -1 ? TaskFailureType.NON_FATAL : TaskFailureType.FATAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -77,6 +80,7 @@
import org.apache.tez.runtime.common.resources.ScalingAllocator;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -653,6 +657,40 @@ public void testKilledAfterComplete() throws IOException, InterruptedException,
}
}

/**
 * Checks the failure type recorded by {@code processCallableResult} for four error shapes:
 * a nested ClusterStorageCapacityExceededException, a nested subclass of it, a top-level
 * ClusterStorageCapacityExceededException, and an unrelated exception.
 */
@Test
public void testClusterStorageCapacityFatalError() throws IOException {
  // Scenario 1: the capacity exception is wrapped by two unrelated exceptions.
  TezTaskRunner2ForTest nestedCapacityRunner = createTaskRunnerForTest();
  Exception nestedCapacityError = new Exception(
      new IllegalArgumentException(new ClusterStorageCapacityExceededException("cluster capacity blown")));
  TaskRunner2CallableResult nestedCapacityResult = new TaskRunner2CallableResult(nestedCapacityError);
  nestedCapacityRunner.processCallableResult(nestedCapacityResult);
  assertEquals(TaskFailureType.FATAL, nestedCapacityRunner.getFirstTaskFailureType());

  // Scenario 2: a subclass of the capacity exception is wrapped by two unrelated exceptions.
  TezTaskRunner2ForTest nestedQuotaRunner = createTaskRunnerForTest();
  Exception nestedQuotaError =
      new Exception(new IllegalArgumentException(new NSQuotaExceededException("Namespace quota blown")));
  TaskRunner2CallableResult nestedQuotaResult = new TaskRunner2CallableResult(nestedQuotaError);
  nestedQuotaRunner.processCallableResult(nestedQuotaResult);
  assertEquals(TaskFailureType.FATAL, nestedQuotaRunner.getFirstTaskFailureType());

  // Scenario 3: the capacity exception is the reported error itself, with no wrapping.
  TezTaskRunner2ForTest directCapacityRunner = createTaskRunnerForTest();
  ClusterStorageCapacityExceededException directCapacityError =
      new ClusterStorageCapacityExceededException("cluster capacity blown");
  TaskRunner2CallableResult directCapacityResult = new TaskRunner2CallableResult(directCapacityError);
  directCapacityRunner.processCallableResult(directCapacityResult);
  assertEquals(TaskFailureType.FATAL, directCapacityRunner.getFirstTaskFailureType());

  // Scenario 4: no capacity exception anywhere in the chain, so the failure stays NON_FATAL.
  TezTaskRunner2ForTest genericRunner = createTaskRunnerForTest();
  Exception genericError = new Exception(new IllegalArgumentException("Generic Exception"));
  TaskRunner2CallableResult genericResult = new TaskRunner2CallableResult(genericError);
  genericRunner.processCallableResult(genericResult);
  assertEquals(TaskFailureType.NON_FATAL, genericRunner.getFirstTaskFailureType());
}

private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {

Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
Expand Down Expand Up @@ -747,6 +785,11 @@ private TezTaskRunner2 createTaskRunner(ApplicationId appId,
processorConf, false, updateSysCounters);
}

/**
 * Convenience factory for tests that only need a runner instance: delegates to
 * {@code createTaskRunner} with a fixed application id, {@code null} for the three
 * arguments that follow it, and {@link TestProcessor} as the processor class.
 *
 * @return the created runner, cast to {@code TezTaskRunner2ForTest}
 * @throws IOException if {@code createTaskRunner} throws it
 */
private TezTaskRunner2ForTest createTaskRunnerForTest() throws IOException {
  ApplicationId fixedAppId = ApplicationId.newInstance(10000, 1);
  String processorClassName = TestProcessor.class.getName();
  return (TezTaskRunner2ForTest) createTaskRunner(
      fixedAppId, null, null, null, processorClassName, TestProcessor.CONF_EMPTY, true, false);
}

private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
Expand Down Expand Up @@ -827,6 +870,9 @@ executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(),
sharedExecutor);
}

/**
 * Test accessor for the {@code firstTaskFailureType} field.
 *
 * @return the value of {@code firstTaskFailureType} at the time of the call
 */
public TaskFailureType getFirstTaskFailureType() {
  // Read the field once into a local and hand that value back.
  TaskFailureType recordedFailureType = firstTaskFailureType;
  return recordedFailureType;
}

@Override
@VisibleForTesting
Expand Down
Loading