Skip to content

YARN-11809. Support application backoff mechanism for CapacityScheduler #7589

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// NOTE(review): presumably counts missed non-partitioned-request scheduling
// opportunities per scheduler request key; its accessors are not visible
// here - confirm before relying on this description.
private ConcurrentHashMultiset<SchedulerRequestKey>
missedNonPartitionedReqSchedulingOpportunity =
ConcurrentHashMultiset.create();


/**
* Application-level counter of missed scheduling opportunities.
*
* <p>Starts at 0 and is only modified through
* {@link #addAppMissedSchedulingOpportunities()} (adds one) and
* {@link #resetAppMissedSchedulingOpportunities()} (sets it back to 0);
* it is read through {@link #getAppMissedSchedulingOpportunities()}.
*
* <p>The scheduler is expected to increment it when it cannot allocate
* resources for the application, and to reset it when it successfully
* allocates resources for the application or transitions the application
* to the backoff state.
*/
private final AtomicLong appMissedSchedulingOpportunities = new AtomicLong();

// Time of the last container scheduled at the current allowed level
protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -1106,6 +1115,18 @@ void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) {
schedulingOpportunities.setCount(schedulerKey, count);
}

/**
 * Records one more missed scheduling opportunity for this application by
 * atomically adding 1 to the application-level counter.
 */
public void addAppMissedSchedulingOpportunities() {
  // The returned value is intentionally unused; only the increment matters.
  appMissedSchedulingOpportunities.getAndIncrement();
}

/**
 * Clears the application-level missed scheduling opportunity counter,
 * setting it back to 0.
 */
public void resetAppMissedSchedulingOpportunities() {
  appMissedSchedulingOpportunities.set(0L);
}

/**
 * Returns the current value of the application-level missed scheduling
 * opportunity counter.
 *
 * @return number of missed opportunities recorded since the last reset
 */
public long getAppMissedSchedulingOpportunities() {
  final long missedSoFar = appMissedSchedulingOpportunities.get();
  return missedSoFar;
}

private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
long currentTimeMillis = System.currentTimeMillis();
// Don't walk the whole container list if the resources were computed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class ActivityDiagnosticConstant {
// Diagnostic text for an application that does not need more resource.
public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
"Application does not need more resource";

// Diagnostic text recorded by AbstractLeafQueue#assignContainers when an
// application is skipped because it is already in the backoff state, or is
// entering it after reaching the missed scheduling threshold.
public static final String APPLICATION_IN_BACKOFF_STATE =
"Application is in backoff state due to reaching missed scheduling threshold";

/*
* Request level diagnostics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -39,6 +40,8 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -154,6 +157,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();

// Backoff related variables
// Whether application backoff is enabled for this queue. Read once from the
// queue configuration (isAppBackoffEnabled(queuePath)) in the constructor.
private final boolean appBackoffEnabled;
// Backoff duration in milliseconds. Only loaded, and validated to be > 0,
// when backoff is enabled; otherwise it keeps the initial value 0.
private long appBackoffIntervalMs = 0L;
// Number of missed scheduling opportunities at which an application is put
// into backoff. Only loaded, and validated to be > 0, when backoff is
// enabled; otherwise it keeps the initial value 0.
private long appBackoffMissedThreshold = 0L;
// Cache of applications that are in backoff state. Presence of the key is
// what matters; the only visible put stores the value true.
// Stays null when backoff is disabled, so every use must first check
// appBackoffEnabled.
// NOTE(review): the cache is built with
// expireAfterAccess(appBackoffIntervalMs), so any cache operation that
// counts as an access restarts the entry's expiry timer.
private Cache<ApplicationId, Boolean> appsInBackoffState = null;

public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(queueContext, queueName, parent, old, false);
Expand All @@ -170,6 +180,26 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,

// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();

// Initialize the backoff configurations
CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
appBackoffEnabled = conf.isAppBackoffEnabled(queuePath);
if (appBackoffEnabled) {
appBackoffIntervalMs = conf.getAppBackoffIntervalMs(queuePath);
if (appBackoffIntervalMs <= 0) {
throw new IOException(
"Backoff interval must be greater than 0 for queue: " + queuePath);
}
appBackoffMissedThreshold =
conf.getAppBackoffMissedThreshold(queuePath);
if (appBackoffMissedThreshold <= 0) {
throw new IOException(
"Backoff app missed threshold must be greater than 0 for queue: "
+ queuePath);
}
appsInBackoffState = CacheBuilder.newBuilder().expireAfterAccess(
appBackoffIntervalMs, TimeUnit.MILLISECONDS).build();
}
}

@SuppressWarnings("checkstyle:nowhitespaceafter")
Expand Down Expand Up @@ -314,7 +344,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
+ defaultAppPriorityPerQueue + "\npriority = " + priority
+ "\nmaxLifetime = " + getMaximumApplicationLifetime()
+ " seconds" + "\ndefaultLifetime = "
+ getDefaultApplicationLifetime() + " seconds");
+ getDefaultApplicationLifetime() + " seconds"
+ "\nbackoffEnabled = " + appBackoffEnabled
+ "\nbackoffIntervalMs = " + appBackoffIntervalMs
+ "\nbackoffAppMissedThreshold = " + appBackoffMissedThreshold);
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -1212,6 +1245,33 @@ public CSAssignment assignContainers(Resource clusterResource,
assignmentIterator.hasNext();) {
FiCaSchedulerApp application = assignmentIterator.next();

// Check for backoff state
if (isAppInBackoffState(application.getApplicationId())) {
// Skip if this app is still in backoff state
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
continue;
}

// Check for missed scheduling opportunities
if (isAppShouldEnterBackoffState(application)) {
// Don't assign containers to this app when the missed opportunities reached the threshold.
LOG.info("Skip scheduling for application {} as it has reached the "
+ "missed scheduling threshold {}, the backoff interval is {} ms.",
application.getApplicationId(), appBackoffMissedThreshold,
appBackoffIntervalMs);
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
// Add the app to the backoff state, to prevent further scheduling
// attempts during the backoff period.
appsInBackoffState.put(application.getApplicationId(), true);
// Reset missed scheduling opportunities
application.resetAppMissedSchedulingOpportunities();
continue;
}

ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node, SystemClock.getInstance().getTime(), application);

Expand Down Expand Up @@ -1302,13 +1362,18 @@ public CSAssignment assignContainers(Resource clusterResource,
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
parent.getQueuePath(), getQueuePath(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
// Reset missed scheduling opportunities after successfully allocating
// resources for the application.
application.resetAppMissedSchedulingOpportunities();
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
application.updateNodeInfoForAMDiagnostics(node);
// Add missed scheduling opportunities for the application
application.addAppMissedSchedulingOpportunities();
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
Expand All @@ -1335,6 +1400,15 @@ public CSAssignment assignContainers(Resource clusterResource,
return CSAssignment.NULL_ASSIGNMENT;
}

/**
 * Tells whether the given application is currently being backed off by this
 * queue, i.e. whether it still has an unexpired entry in the backoff cache.
 *
 * <p>The backoff cache is created with {@code expireAfterAccess}, and a
 * lookup through {@code Cache.getIfPresent} counts as an access that restarts
 * the expiry timer. Because this method is called for every candidate
 * application on each {@code assignContainers} pass, using
 * {@code getIfPresent} here would keep renewing the entry, and an application
 * polled more often than the backoff interval would never leave the backoff
 * state. Membership is therefore tested with {@code asMap().containsKey},
 * which Guava documents as not resetting the access time, so the backoff
 * period is measured from the moment the application was put into the cache.
 *
 * @param appId id of the application to check
 * @return {@code true} only if backoff is enabled for this queue and the
 *         application has a live entry in the backoff cache
 */
public boolean isAppInBackoffState(ApplicationId appId) {
  // appsInBackoffState is null when backoff is disabled; the short-circuit
  // on appBackoffEnabled guards the dereference.
  return appBackoffEnabled && appsInBackoffState.asMap().containsKey(appId);
}

/**
 * Decides whether the given application has missed enough scheduling
 * opportunities to be put into the backoff state.
 *
 * @param application the application being considered for allocation
 * @return {@code true} if backoff is enabled for this queue and the
 *         application's missed scheduling opportunities have reached the
 *         configured threshold; {@code false} otherwise
 */
public boolean isAppShouldEnterBackoffState(FiCaSchedulerApp application) {
  if (!appBackoffEnabled) {
    // Backoff disabled: the application's counter is not consulted.
    return false;
  }
  final long missed = application.getAppMissedSchedulingOpportunities();
  return missed >= appBackoffMissedThreshold;
}

@Override
public boolean accept(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getNodeLabelPrefix;
Expand Down Expand Up @@ -3036,4 +3037,131 @@ private String normalizePolicyName(String policyName) {
"Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
}
}

/**
* Configuration key (suffix) for enabling the application backoff mechanism.
* It is appended to the scheduler-wide {@code PREFIX} to form the global
* setting, and to a queue's prefix to form the per-queue override.
* When enabled, applications in the queue will be temporarily skipped
* if they fail to schedule tasks after a certain number of opportunities.
*/
@Private
public static final String BACKOFF_ENABLED = "app-backoff.enabled";

/**
* Default value for enabling the backoff mechanism: disabled.
*/
@Private
public static final boolean DEFAULT_BACKOFF_ENABLED = false;

/**
* Configuration key (suffix) indicating the duration for which an application
* in backoff state will be skipped during the scheduling process.
* The value is read with {@code getTimeDuration} using
* {@code TimeUnit.MILLISECONDS}; AbstractLeafQueue rejects values that are
* not greater than 0 when backoff is enabled.
*/
@Private
public static final String APP_BACKOFF_INTERVAL_MS = "app-backoff.interval-ms";

/**
* Default value for the backoff duration in milliseconds (3000 ms).
*/
@Private
public static final long DEFAULT_APP_BACKOFF_INTERVAL_MS = 3000L;

/**
* Configuration key (suffix) for the threshold of missed scheduling
* opportunities before an application is put into backoff state.
* AbstractLeafQueue rejects values that are not greater than 0 when backoff
* is enabled.
*/
@Private
public static final String APP_BACKOFF_MISSED_THRESHOLD =
"app-backoff.missed-threshold";

/**
* Default value for the missed opportunities threshold (3).
*/
@Private
public static final long DEFAULT_APP_BACKOFF_MISSED_THRESHOLD = 3L;

/**
 * Reads the scheduler-wide switch for the application backoff mechanism.
 *
 * @return the boolean configured under {@code PREFIX + BACKOFF_ENABLED},
 *         falling back to {@link #DEFAULT_BACKOFF_ENABLED}
 */
public boolean getGlobalAppBackoffEnabled() {
  final String key = PREFIX + BACKOFF_ENABLED;
  return getBoolean(key, DEFAULT_BACKOFF_ENABLED);
}

/**
 * Reads the scheduler-wide backoff interval.
 *
 * @return the duration configured under
 *         {@code PREFIX + APP_BACKOFF_INTERVAL_MS}, expressed in
 *         milliseconds, falling back to
 *         {@link #DEFAULT_APP_BACKOFF_INTERVAL_MS}
 */
public long getGlobalAppBackoffIntervalMs() {
  final String key = PREFIX + APP_BACKOFF_INTERVAL_MS;
  return getTimeDuration(key, DEFAULT_APP_BACKOFF_INTERVAL_MS,
      TimeUnit.MILLISECONDS);
}

/**
 * Reads the scheduler-wide threshold of missed scheduling opportunities.
 *
 * @return the value configured under
 *         {@code PREFIX + APP_BACKOFF_MISSED_THRESHOLD}, falling back to
 *         {@link #DEFAULT_APP_BACKOFF_MISSED_THRESHOLD}
 */
public long getGlobalAppBackoffMissedThreshold() {
  final String key = PREFIX + APP_BACKOFF_MISSED_THRESHOLD;
  return getLong(key, DEFAULT_APP_BACKOFF_MISSED_THRESHOLD);
}

/**
 * Tells whether app-backoff is enabled for a specific queue. The queue-level
 * setting takes precedence; the global setting is used as its default.
 *
 * @param queue the queue path
 * @return true if app-backoff is enabled for the queue, false otherwise
 */
public boolean isAppBackoffEnabled(QueuePath queue) {
  final String queueKey = getQueuePrefix(queue) + BACKOFF_ENABLED;
  final boolean globalDefault = getGlobalAppBackoffEnabled();
  return getBoolean(queueKey, globalDefault);
}

/**
 * Returns the app-backoff interval for a specific queue. The queue-level
 * setting takes precedence; the global interval is used as its default.
 *
 * @param queue the queue path
 * @return the app-backoff interval in milliseconds
 */
public long getAppBackoffIntervalMs(QueuePath queue) {
  final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS;
  final long globalDefault = getGlobalAppBackoffIntervalMs();
  return getTimeDuration(queueKey, globalDefault, TimeUnit.MILLISECONDS);
}

/**
 * Returns the missed opportunities threshold for a specific queue. The
 * queue-level setting takes precedence; the global threshold is used as its
 * default.
 *
 * @param queue the queue path
 * @return the missed opportunities threshold
 */
public long getAppBackoffMissedThreshold(QueuePath queue) {
  final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD;
  final long globalDefault = getGlobalAppBackoffMissedThreshold();
  return getLong(queueKey, globalDefault);
}

/**
 * Stores the app-backoff enabled flag under the given queue's prefix
 * (for testing).
 *
 * @param queue the queue path
 * @param enabled the backoff enabled flag
 */
@VisibleForTesting
public void setAppBackoffEnabled(QueuePath queue, boolean enabled) {
  final String queueKey = getQueuePrefix(queue) + BACKOFF_ENABLED;
  setBoolean(queueKey, enabled);
}

/**
 * Stores the app-backoff interval under the given queue's prefix
 * (for testing). The value is written as a plain number of milliseconds.
 *
 * @param queue the queue path
 * @param intervalMs the backoff interval in milliseconds
 */
@VisibleForTesting
public void setAppBackoffIntervalMs(QueuePath queue, long intervalMs) {
  final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS;
  setLong(queueKey, intervalMs);
}

/**
 * Stores the app-backoff missed opportunities threshold under the given
 * queue's prefix (for testing).
 *
 * @param queue the queue path
 * @param threshold the missed opportunities threshold
 */
@VisibleForTesting
public void setAppBackoffMissedThreshold(QueuePath queue, long threshold) {
  final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD;
  setLong(queueKey, threshold);
}
}
Loading