diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 7f36b5c7e06b5..b080b994cb67b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -184,7 +184,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private ConcurrentHashMultiset<SchedulerRequestKey>
       missedNonPartitionedReqSchedulingOpportunity =
       ConcurrentHashMultiset.create();
-
+
+  /**
+   * Tracks the total number of times the application has missed scheduling
+   * opportunities, which will be incremented when the scheduler cannot allocate
+   * resources for the application, and reset to 0 when the scheduler
+   * successfully allocates resources for the application or transitions
+   * the application to the backoff state.
+   */
+  private final AtomicLong appMissedSchedulingOpportunities = new AtomicLong();
+
   // Time of the last container scheduled at the current allowed level
   protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
       new ConcurrentHashMap<>();
 
@@ -1106,6 +1115,18 @@ void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) {
     schedulingOpportunities.setCount(schedulerKey, count);
   }
 
+  public void addAppMissedSchedulingOpportunities() {
+    appMissedSchedulingOpportunities.incrementAndGet();
+  }
+
+  public void resetAppMissedSchedulingOpportunities() {
+    appMissedSchedulingOpportunities.set(0);
+  }
+
+  public long getAppMissedSchedulingOpportunities() {
+    return appMissedSchedulingOpportunities.get();
+  }
+
   private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
     long currentTimeMillis = System.currentTimeMillis();
     // Don't walk the whole container list if the resources were computed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
index 3eb6dc24e0901..6d98ff750dddf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -70,6 +70,9 @@ public class ActivityDiagnosticConstant {
   public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
       "Application does not need more resource";
 
+  public static final String APPLICATION_IN_BACKOFF_STATE =
+      "Application is in backoff state due to reaching missed scheduling threshold";
+
   /*
    * Request level diagnostics
    */
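The javadoc above defines the counter contract: increment on a missed scheduling opportunity, reset to zero on a successful allocation or when the application is moved into the backoff state. A standalone sketch of that contract follows; the MissedOpportunityCounter class is illustrative only and not part of the patch.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for the counter kept on SchedulerApplicationAttempt.
class MissedOpportunityCounter {
  private final AtomicLong missed = new AtomicLong();

  // Called when the scheduler skips the application without an allocation.
  void onMissedSchedulingOpportunity() {
    missed.incrementAndGet();
  }

  // Called when a container is allocated, or when the application is moved
  // into the backoff state.
  void onAllocatedOrBackedOff() {
    missed.set(0);
  }

  long current() {
    return missed.get();
  }
}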
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index 6f176a69691ae..c6ea4b0fa118d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -154,6 +157,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
   private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
   private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
 
+  // Backoff related variables
+  private final boolean appBackoffEnabled;
+  private long appBackoffIntervalMs = 0L;
+  private long appBackoffMissedThreshold = 0L;
+  // Cache of applications that are in backoff state
+  private Cache<ApplicationId, Boolean> appsInBackoffState = null;
+
   public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     this(queueContext, queueName, parent, old, false);
@@ -170,6 +180,26 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
+
+    // Initialize the backoff configurations
+    CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
+    appBackoffEnabled = conf.isAppBackoffEnabled(queuePath);
+    if (appBackoffEnabled) {
+      appBackoffIntervalMs = conf.getAppBackoffIntervalMs(queuePath);
+      if (appBackoffIntervalMs <= 0) {
+        throw new IOException(
+            "Backoff interval must be greater than 0 for queue: " + queuePath);
+      }
+      appBackoffMissedThreshold =
+          conf.getAppBackoffMissedThreshold(queuePath);
+      if (appBackoffMissedThreshold <= 0) {
+        throw new IOException(
+            "Backoff app missed threshold must be greater than 0 for queue: "
+                + queuePath);
+      }
+      appsInBackoffState = CacheBuilder.newBuilder().expireAfterWrite(
+          appBackoffIntervalMs, TimeUnit.MILLISECONDS).build();
+    }
   }
 
   @SuppressWarnings("checkstyle:nowhitespaceafter")
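The constructor wires the per-queue settings and keeps the backoff bookkeeping in a Guava cache whose entries expire appBackoffIntervalMs after they are written, so an application leaves the backoff state simply by its entry expiring. A minimal sketch of that idea, assuming the plain Guava artifact rather than the relocated org.apache.hadoop.thirdparty package used by the patch:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Minimal sketch of the time-bounded backoff set; not part of the patch.
class BackoffTracker<K> {
  private final Cache<K, Boolean> inBackoff;

  BackoffTracker(long backoffIntervalMs) {
    // Entries silently expire once the backoff interval has elapsed, so no
    // explicit "leave backoff" event is needed.
    inBackoff = CacheBuilder.newBuilder()
        .expireAfterWrite(backoffIntervalMs, TimeUnit.MILLISECONDS)
        .build();
  }

  void enterBackoff(K key) {
    inBackoff.put(key, Boolean.TRUE);
  }

  boolean isInBackoff(K key) {
    return inBackoff.getIfPresent(key) != null;
  }
}

Because expiry is purely time-based, the test at the end of this patch can verify the same behaviour just by waiting for the interval to elapse.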
@@ -314,7 +344,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
             + defaultAppPriorityPerQueue + "\npriority = " + priority
             + "\nmaxLifetime = " + getMaximumApplicationLifetime() + " seconds"
             + "\ndefaultLifetime = "
-            + getDefaultApplicationLifetime() + " seconds");
+            + getDefaultApplicationLifetime() + " seconds"
+            + "\nbackoffEnabled = " + appBackoffEnabled
+            + "\nbackoffIntervalMs = " + appBackoffIntervalMs
+            + "\nbackoffAppMissedThreshold = " + appBackoffMissedThreshold);
     } finally {
       writeLock.unlock();
     }
@@ -1212,6 +1245,33 @@ public CSAssignment assignContainers(Resource clusterResource,
         assignmentIterator.hasNext();) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
+      // Check for backoff state
+      if (isAppInBackoffState(application.getApplicationId())) {
+        // Skip if this app is still in backoff state
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
+        continue;
+      }
+
+      // Check for missed scheduling opportunities
+      // Don't assign containers to this app once its missed opportunities have reached the threshold.
+      if (isAppShouldEnterBackoffState(application)) {
+        LOG.info("Skip scheduling for application {} as it has reached the "
+            + "missed scheduling threshold {}, the backoff interval is {} ms.",
+            application.getApplicationId(), appBackoffMissedThreshold,
+            appBackoffIntervalMs);
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
+        // Add the app to the backoff state, to prevent further scheduling
+        // attempts during the backoff period.
+        appsInBackoffState.put(application.getApplicationId(), true);
+        // Reset missed scheduling opportunities
+        application.resetAppMissedSchedulingOpportunities();
+        continue;
+      }
+
       ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
           node, SystemClock.getInstance().getTime(), application);
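Taken together, the two checks above implement a small per-application state machine: an app already present in the backoff cache is skipped outright, an app whose missed-opportunity count has reached the threshold is moved into the cache (with its counter reset) and then skipped, and every other app proceeds to normal allocation. A condensed, self-contained sketch of that decision order; the BackoffGate and Decision names are illustrative and not part of the patch.

final class BackoffGate {
  enum Decision { SKIP_IN_BACKOFF, ENTER_BACKOFF_AND_SKIP, TRY_TO_SCHEDULE }

  static Decision evaluate(boolean backoffEnabled, boolean alreadyInBackoff,
      long missedOpportunities, long missedThreshold) {
    if (!backoffEnabled) {
      return Decision.TRY_TO_SCHEDULE;
    }
    if (alreadyInBackoff) {
      // Still within the backoff interval: skip without touching the counter.
      return Decision.SKIP_IN_BACKOFF;
    }
    if (missedOpportunities >= missedThreshold) {
      // Threshold reached: record the app in the backoff cache, reset the
      // counter, and skip it for the rest of the interval.
      return Decision.ENTER_BACKOFF_AND_SKIP;
    }
    return Decision.TRY_TO_SCHEDULE;
  }
}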
@@ -1302,6 +1362,9 @@ public CSAssignment assignContainers(Resource clusterResource,
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             parent.getQueuePath(), getQueuePath(),
             ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        // Reset missed scheduling opportunities after successfully allocating
+        // resources for the application.
+        application.resetAppMissedSchedulingOpportunities();
         return assignment;
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.OTHER) {
@@ -1309,6 +1372,8 @@ public CSAssignment assignContainers(Resource clusterResource,
             activitiesManager, application.getApplicationId(),
             ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         application.updateNodeInfoForAMDiagnostics(node);
+        // Add missed scheduling opportunities for the application
+        application.addAppMissedSchedulingOpportunities();
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.QUEUE_LIMIT) {
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@@ -1335,6 +1400,15 @@ public CSAssignment assignContainers(Resource clusterResource,
     return CSAssignment.NULL_ASSIGNMENT;
   }
 
+  public boolean isAppInBackoffState(ApplicationId appId) {
+    return appBackoffEnabled && appsInBackoffState.getIfPresent(appId) != null;
+  }
+
+  public boolean isAppShouldEnterBackoffState(FiCaSchedulerApp application) {
+    return appBackoffEnabled &&
+        application.getAppMissedSchedulingOpportunities() >= appBackoffMissedThreshold;
+  }
+
   @Override
   public boolean accept(Resource cluster,
       ResourceCommitRequest<FiCaSchedulerApp, RMContainer> request) {
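These hunks drive the counter from the assignment outcome: a successful allocation resets it, a SkippedType.OTHER skip increments it, and a QUEUE_LIMIT skip leaves it untouched. A self-contained sketch of that mapping, with illustrative names (AssignmentOutcome stands in for CSAssignment's skip information and is not part of the patch):

import java.util.concurrent.atomic.AtomicLong;

enum AssignmentOutcome { ALLOCATED, SKIPPED_OTHER, SKIPPED_QUEUE_LIMIT }

final class MissedOpportunityAccounting {
  private final AtomicLong missed = new AtomicLong();

  void onAssignment(AssignmentOutcome outcome) {
    switch (outcome) {
      case ALLOCATED:
        missed.set(0);              // reset after a successful allocation
        break;
      case SKIPPED_OTHER:
        missed.incrementAndGet();   // count a miss
        break;
      default:
        // QUEUE_LIMIT skips are not counted against the application.
        break;
    }
  }

  long missedSoFar() {
    return missed.get();
  }
}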
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ea5c892ce3e5b..f7506d983fa14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -76,6 +76,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getNodeLabelPrefix;
@@ -3036,4 +3037,131 @@ private String normalizePolicyName(String policyName) {
           "Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
     }
   }
+
+  /**
+   * Configuration key for enabling the backoff mechanism for a queue.
+   * When enabled, applications in the queue will be temporarily skipped
+   * if they fail to schedule tasks after a certain number of opportunities.
+   */
+  @Private
+  public static final String BACKOFF_ENABLED = "app-backoff.enabled";
+
+  /**
+   * Default value for enabling the backoff mechanism.
+   */
+  @Private
+  public static final boolean DEFAULT_BACKOFF_ENABLED = false;
+
+  /**
+   * Configuration key indicating the duration for which an application
+   * in backoff state will be skipped during the scheduling process.
+   */
+  @Private
+  public static final String APP_BACKOFF_INTERVAL_MS = "app-backoff.interval-ms";
+
+  /**
+   * Default value for the backoff duration in milliseconds.
+   */
+  @Private
+  public static final long DEFAULT_APP_BACKOFF_INTERVAL_MS = 3000L;
+
+  /**
+   * Configuration key for the threshold of missed scheduling opportunities
+   * before an application is put into backoff state.
+   */
+  @Private
+  public static final String APP_BACKOFF_MISSED_THRESHOLD =
+      "app-backoff.missed-threshold";
+
+  /**
+   * Default value for the missed opportunities threshold.
+   */
+  @Private
+  public static final long DEFAULT_APP_BACKOFF_MISSED_THRESHOLD = 3L;
+
+  /**
+   * Get the global default value for the backoff enabled setting.
+   * @return true if backoff is enabled, false otherwise
+   */
+  public boolean getGlobalAppBackoffEnabled() {
+    return getBoolean(PREFIX + BACKOFF_ENABLED, DEFAULT_BACKOFF_ENABLED);
+  }
+
+  /**
+   * Get the global default value for the backoff interval in milliseconds.
+   * @return the backoff interval in milliseconds
+   */
+  public long getGlobalAppBackoffIntervalMs() {
+    return getTimeDuration(PREFIX + APP_BACKOFF_INTERVAL_MS,
+        DEFAULT_APP_BACKOFF_INTERVAL_MS, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Get the global default value for the missed opportunities threshold.
+   * @return the missed opportunities threshold
+   */
+  public long getGlobalAppBackoffMissedThreshold() {
+    return getLong(PREFIX + APP_BACKOFF_MISSED_THRESHOLD,
+        DEFAULT_APP_BACKOFF_MISSED_THRESHOLD);
+  }
+
+  /**
+   * Check if app-backoff is enabled for a specific queue.
+   * @param queue the queue path
+   * @return true if app-backoff is enabled for the queue, false otherwise
+   */
+  public boolean isAppBackoffEnabled(QueuePath queue) {
+    return getBoolean(getQueuePrefix(queue) + BACKOFF_ENABLED,
+        getGlobalAppBackoffEnabled());
+  }
+
+  /**
+   * Get the app-backoff interval in milliseconds for a specific queue.
+   * @param queue the queue path
+   * @return the app-backoff interval in milliseconds
+   */
+  public long getAppBackoffIntervalMs(QueuePath queue) {
+    return getTimeDuration(getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS,
+        getGlobalAppBackoffIntervalMs(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Get the missed opportunities threshold for a specific queue.
+   * @param queue the queue path
+   * @return the missed opportunities threshold
+   */
+  public long getAppBackoffMissedThreshold(QueuePath queue) {
+    return getLong(getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD,
+        getGlobalAppBackoffMissedThreshold());
+  }
+
+  /**
+   * Set the app-backoff enabled flag for a specific queue (for testing).
+   * @param queue the queue path
+   * @param enabled the backoff enabled flag
+   */
+  @VisibleForTesting
+  public void setAppBackoffEnabled(QueuePath queue, boolean enabled) {
+    setBoolean(getQueuePrefix(queue) + BACKOFF_ENABLED, enabled);
+  }
+
+  /**
+   * Set the app-backoff interval in milliseconds for a specific queue
+   * (for testing).
+   * @param queue the queue path
+   * @param intervalMs the backoff interval in milliseconds
+   */
+  @VisibleForTesting
+  public void setAppBackoffIntervalMs(QueuePath queue, long intervalMs) {
+    setLong(getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS, intervalMs);
+  }
+
+  /**
+   * Set the app-backoff missed opportunities threshold for a specific queue
+   * (for testing).
+   * @param queue the queue path
+   * @param threshold the missed opportunities threshold
+   */
+  @VisibleForTesting
+  public void setAppBackoffMissedThreshold(QueuePath queue, long threshold) {
+    setLong(getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD, threshold);
+  }
 }
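All of the new keys resolve under the usual yarn.scheduler.capacity prefix, and per-queue values fall back to the global defaults returned by the getGlobal* accessors. Below is a sketch of setting them programmatically with the accessors added above; the wrapper class, the queue path root.a.a1, and the use of QueuePath's single-argument constructor are assumptions for illustration only.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;

public class AppBackoffConfigExample {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

    // Cluster-wide defaults (yarn.scheduler.capacity.app-backoff.*).
    conf.setBoolean(CapacitySchedulerConfiguration.PREFIX
        + CapacitySchedulerConfiguration.BACKOFF_ENABLED, true);
    conf.setLong(CapacitySchedulerConfiguration.PREFIX
        + CapacitySchedulerConfiguration.APP_BACKOFF_MISSED_THRESHOLD, 5L);
    conf.setLong(CapacitySchedulerConfiguration.PREFIX
        + CapacitySchedulerConfiguration.APP_BACKOFF_INTERVAL_MS, 5000L);

    // Per-queue overrides win over the global defaults; "root.a.a1" is an
    // example queue path, not one defined by the patch.
    QueuePath a1 = new QueuePath("root.a.a1");
    conf.setAppBackoffEnabled(a1, true);
    conf.setAppBackoffIntervalMs(a1, 10000L);
    conf.setAppBackoffMissedThreshold(a1, 50L);
  }
}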
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java
new file mode 100644
index 0000000000000..1affd6aa62c82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_MISSED_THRESHOLD;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.BACKOFF_ENABLED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_MISSED_THRESHOLD;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCapacitySchedulerAppBackoff {
+
+  @Test
+  public void testAppBackoffConfUpdate() throws Exception {
+    // Setup initial queue configuration
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setAppBackoffEnabled(A1, true);
+    conf.setAppBackoffIntervalMs(A1, 10000L);
+    conf.setAppBackoffMissedThreshold(A1, 50L);
+    conf.setAppBackoffEnabled(B2, true);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Verify initial configuration
+    assertTrue(cs.getConfiguration().isAppBackoffEnabled(A1));
+    assertEquals(10000L, cs.getConfiguration().getAppBackoffIntervalMs(A1));
+    assertEquals(50L, cs.getConfiguration().getAppBackoffMissedThreshold(A1));
+    assertTrue(cs.getConfiguration().isAppBackoffEnabled(B2));
+    assertEquals(DEFAULT_APP_BACKOFF_INTERVAL_MS,
+        cs.getConfiguration().getAppBackoffIntervalMs(B2));
+    assertEquals(DEFAULT_APP_BACKOFF_MISSED_THRESHOLD,
+        cs.getConfiguration().getAppBackoffMissedThreshold(B2));
+    assertFalse(cs.getConfiguration().isAppBackoffEnabled(A2));
+    assertFalse(cs.getConfiguration().isAppBackoffEnabled(A3));
+    assertFalse(cs.getConfiguration().isAppBackoffEnabled(B1));
+    assertFalse(cs.getConfiguration().isAppBackoffEnabled(B3));
+
+    // Update configuration: enable backoff globally
+    conf.setBoolean(PREFIX + BACKOFF_ENABLED, true);
+    conf.setLong(PREFIX + APP_BACKOFF_MISSED_THRESHOLD, 5L);
+    conf.setLong(PREFIX + APP_BACKOFF_INTERVAL_MS, 5000L);
+    // Disable it for A1
+    conf.setAppBackoffEnabled(A1, false);
+
+    // Reinitialize the scheduler with updated configuration
+    cs.reinitialize(conf, rm.getRMContext());
+
+    // Verify updated configuration
+    CapacitySchedulerConfiguration newConf = cs.getConfiguration();
+    assertTrue(newConf.isAppBackoffEnabled(B2));
+    assertEquals(5L, newConf.getAppBackoffMissedThreshold(B2));
+    assertEquals(5000L, newConf.getAppBackoffIntervalMs(B2));
+    assertFalse(newConf.isAppBackoffEnabled(A1));
+    assertTrue(newConf.isAppBackoffEnabled(A2));
+    assertTrue(newConf.isAppBackoffEnabled(A3));
+    assertTrue(newConf.isAppBackoffEnabled(B1));
+    assertTrue(newConf.isAppBackoffEnabled(B3));
+
+    rm.stop();
+  }
+
+  @Test
+  public void testSchedulingWithAppBackoffEnabled() throws Exception {
+    // Setup backoff conf for queue A1
+    long appBackoffIntervalMs = 100L;
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    enabledMultiNodesPlacement(conf);
+    conf.setAppBackoffEnabled(A1, true);
+    conf.setAppBackoffIntervalMs(A1, appBackoffIntervalMs);
+    conf.setAppBackoffMissedThreshold(A1, 3L);
+
+    // Register a node
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB);
+
+    // Submit an application in queue A1
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm)
+            .withAppName("app1")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue(A1.getLeafName())
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, data);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    // Submit a request that cannot be satisfied due to the
+    // placement-constraint condition
+    PlacementConstraint pc = targetIn("node",
+        allocationTag("hbase-master")).build();
+    SchedulingRequest schedulingRequest = SchedulingRequest.newInstance(
+        1, Priority.newInstance(1), ExecutionTypeRequest.newInstance(), null,
+        ResourceSizing.newInstance(1, Resource.newInstance(2 * GB, 1)), pc);
+    am.addSchedulingRequest(ImmutableList.of(schedulingRequest));
+    am.doHeartbeat();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    AbstractLeafQueue queueA1 =
+        (AbstractLeafQueue) cs.getQueue(A1.getLeafName());
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(am.getApplicationAttemptId());
+
+    // Simulate missed scheduling opportunities
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(
+          rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+    }
+    assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+    assertEquals(3L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+    // Make the app enter backoff state when it reaches the missed threshold
+    cs.handle(new NodeUpdateSchedulerEvent(
+        rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+    // Verify app is in backoff state
+    assertTrue(queueA1.isAppInBackoffState(app.getApplicationId()));
+    assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+    // Wait for the backoff interval to expire
+    GenericTestUtils.waitFor(
+        () -> !queueA1.isAppInBackoffState(app.getApplicationId()),
+        appBackoffIntervalMs, appBackoffIntervalMs * 2);
+
+    // Verify app is no longer in backoff state after the backoff interval
+    assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+
+    // Simulate another missed scheduling opportunity
+    cs.handle(new NodeUpdateSchedulerEvent(
+        rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+    assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+    assertEquals(1L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+    // Submit another request which can be allocated right away
+    am.allocate("*", 2 * GB, 1, new ArrayList<>());
+    cs.handle(new NodeUpdateSchedulerEvent(
+        rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+    // The new request should be allocated and the app should not be in
+    // backoff state
+    assertFalse(queueA1.isAppInBackoffState(schedulerApp.getApplicationId()));
+    assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+    rm.stop();
+  }
+
+  private void enabledMultiNodesPlacement(CapacitySchedulerConfiguration conf) {
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.setBoolean(MULTI_NODE_PLACEMENT_ENABLED, true);
+    conf.setBoolean(PREFIX + MULTI_NODE_PLACEMENT_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+            + ".resource-based.class";
+    conf.set(policyName, ResourceUsageMultiNodeLookupPolicy.class.getName());
+    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+        true);
+  }
+}