diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
new file mode 100644
index 000000000000..b34ec382f6a6
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.google.common.base.Preconditions;
+
+
+public class ForceCommitBatchConfig {
+  private final int _batchSize;
+  private final int _batchStatusCheckIntervalMs;
+  private final int _batchStatusCheckTimeoutMs;
+
+  private ForceCommitBatchConfig(int batchSize, int batchStatusCheckIntervalMs, int batchStatusCheckTimeoutMs) {
+    _batchSize = batchSize;
+    _batchStatusCheckIntervalMs = batchStatusCheckIntervalMs;
+    _batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs;
+  }
+
+  public static ForceCommitBatchConfig of(int batchSize, int batchStatusCheckIntervalSec,
+      int batchStatusCheckTimeoutSec) {
+    Preconditions.checkArgument(batchSize > 0, "Batch size should be greater than zero");
+    Preconditions.checkArgument(batchStatusCheckIntervalSec > 0,
+        "Batch status check interval should be greater than zero");
+    Preconditions.checkArgument(batchStatusCheckTimeoutSec > 0,
+        "Batch status check timeout should be greater than zero");
+    return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec * 1000, batchStatusCheckTimeoutSec * 1000);
+  }
+
+  public int getBatchSize() {
+    return _batchSize;
+  }
+
+  public int getBatchStatusCheckIntervalMs() {
+    return _batchStatusCheckIntervalMs;
+  }
+
+  public int getBatchStatusCheckTimeoutMs() {
+    return _batchStatusCheckTimeoutMs;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index f4a0e633a010..e69de66acba5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -34,6 +34,7 @@
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -169,19 +170,35 @@ public Map<String, String> forceCommit(
       @ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
       String partitionGroupIds,
       @ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
-      String consumingSegments, @Context HttpHeaders headers) {
+      String consumingSegments,
+      @ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
+      @QueryParam("batchSize") @DefaultValue(Integer.MAX_VALUE + "") int batchSize,
+      @ApiParam(value = "How often to check whether the current batch of segments have been successfully committed or"
+          + " not (default = 5)")
+      @QueryParam("batchStatusCheckIntervalSec") @DefaultValue("5") int batchStatusCheckIntervalSec,
+      @ApiParam(value = "Timeout based on which the controller will stop checking the forceCommit status of the batch"
+          + " of segments and throw an exception. (default = 180)")
+      @QueryParam("batchStatusCheckTimeoutSec") @DefaultValue("180") int batchStatusCheckTimeoutSec,
+      @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     if (partitionGroupIds != null && consumingSegments != null) {
       throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit",
           Response.Status.BAD_REQUEST);
     }
+    ForceCommitBatchConfig batchConfig;
+    try {
+      batchConfig = ForceCommitBatchConfig.of(batchSize, batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Invalid batch config", Response.Status.BAD_REQUEST, e);
+    }
     long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validateTable(tableNameWithType);
     Map<String, String> response = new HashMap<>();
     try {
       Set<String> consumingSegmentsForceCommitted =
-          _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments);
+          _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments,
+              batchConfig);
       response.put("forceCommitStatus", "SUCCESS");
       try {
         String jobId = UUID.randomUUID().toString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 2b9cf8f954ef..fc3172e09361 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -28,11 +28,15 @@
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
@@ -72,6 +76,7 @@
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -117,7 +122,9 @@
 import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptFailureException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1848,15 +1855,126 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, ForceCommitBatchConfig batchConfig) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
         segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+    int batchSize = batchConfig.getBatchSize();
+    if (batchSize >= targetConsumingSegments.size()) {
+      // No need to divide segments in batches.
+      sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+    } else {
+      List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      executor.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType, batchConfig));
+      executor.shutdown();
+    }
     return targetConsumingSegments;
   }
 
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit : segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
+      ForceCommitBatchConfig forceCommitBatchConfig) {
+    int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
+    int batchStatusCheckTimeoutMs = forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();
+
+    try {
+      Thread.sleep(batchStatusCheckIntervalMs);
+    } catch (InterruptedException e) {
+      LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e);
+      throw new RuntimeException(e);
+    }
+
+    int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs;
+    RetryPolicy retryPolicy = RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs);
+    Set<?>[] segmentsYetToBeCommitted = new Set[1];
+    try {
+      retryPolicy.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptFailureException e) {
+      String errorMsg = String.format(
+          "Exception occurred while waiting for the forceCommit of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s", segmentBatchToCommit, e.getAttempts(), segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    LOGGER.info("segmentBatch: {} successfully force committed", segmentBatchToCommit);
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments, int batchSize) {
+    int numSegments = targetConsumingSegments.size();
+    List<Set<String>> segmentBatchList = new ArrayList<>((numSegments + batchSize - 1) / batchSize);
+
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    Set<String> segmentsAdded = Sets.newHashSetWithExpectedSize(numSegments);
+    Set<String> currentBatch = Sets.newHashSetWithExpectedSize(batchSize);
+    Collection<Queue<String>> instanceSegmentsCollection = instanceToConsumingSegments.values();
+
+    while (!instanceSegmentsCollection.isEmpty()) {
+      Iterator<Queue<String>> instanceCollectionIterator = instanceSegmentsCollection.iterator();
+      // Pick segments in round-robin fashion to parallelize forceCommit across max servers
+      while (instanceCollectionIterator.hasNext()) {
+        Queue<String> consumingSegments = instanceCollectionIterator.next();
+        String segmentName = consumingSegments.poll();
+        if (consumingSegments.isEmpty()) {
+          instanceCollectionIterator.remove();
+        }
+        if (!segmentsAdded.add(segmentName)) {
+          // There might be a segment replica hosted on another instance added before
+          continue;
+        }
+        currentBatch.add(segmentName);
+        if (currentBatch.size() == batchSize) {
+          segmentBatchList.add(currentBatch);
+          currentBatch = Sets.newHashSetWithExpectedSize(batchSize);
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
+
+    for (String segmentName: targetConsumingSegments) {
+      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
+
+      for (Map.Entry<String, String> instanceToState : instanceToStateMap.entrySet()) {
+        String instance = instanceToState.getKey();
+        String state = instanceToState.getValue();
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);
+        }
+      }
+    }
+    return instanceToConsumingSegments;
+  }
+
   /**
    * Among all consuming segments, filter the ones that are in the given partitions or segments.
    */
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
new file mode 100644
index 000000000000..862f19218f90
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class ForceCommitBatchConfigTest {
+
+  @Test
+  public void testForceCommitBatchConfig() {
+    ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180);
+    assertEquals(forceCommitBatchConfig.getBatchSize(), Integer.MAX_VALUE);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 180000);
+
+    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 5, 180);
+    assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 180000);
+
+    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37);
+    assertEquals(forceCommitBatchConfig.getBatchSize(), 1);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 23000);
+    assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 37000);
+
+    assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(0, 5, 180));
+    assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(32, 0, 0));
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d5969e611f91..abcd75a2004a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -29,11 +29,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -1248,6 +1251,76 @@ public void testGetPartitionIds()
     Assert.assertEquals(partitionIds.size(), 2);
   }
 
+  @Test
+  public void testGetInstanceToConsumingSegments() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    IdealState idealState = mock(IdealState.class);
+    Map<String, Map<String, String>> map = Map.of(
+        "seg0", Map.of("i1", "CONSUMING", "i4", "ONLINE"),
+        "seg1", Map.of("i2", "CONSUMING"),
+        "seg2", Map.of("i3", "CONSUMING", "i2", "OFFLINE"),
+        "seg3", Map.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
+        "seg4", Map.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
+    );
+
+    ZNRecord znRecord = mock(ZNRecord.class);
+    when(znRecord.getMapFields()).thenReturn(map);
+    when(idealState.getRecord()).thenReturn(znRecord);
+    // Use TreeSet to ensure ordering
+    Set<String> targetConsumingSegment = new TreeSet<>(map.keySet());
+
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        realtimeSegmentManager.getInstanceToConsumingSegments(idealState, targetConsumingSegment);
+    assertEquals(instanceToConsumingSegments, Map.of(
+        "i1", new LinkedList<>(List.of("seg0", "seg4")),
+        "i2", new LinkedList<>(List.of("seg1", "seg3")),
+        "i3", new LinkedList<>(List.of("seg2", "seg3", "seg4")),
+        "i4", new LinkedList<>(List.of("seg3")),
+        "i5", new LinkedList<>(List.of("seg4"))
+    ));
+  }
+
+  @Test
+  public void getSegmentBatchList() {
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    IdealState idealState = mock(IdealState.class);
+
+    Map<String, Map<String, String>> map = Map.of(
+        "seg0", Map.of("i1", "CONSUMING", "i4", "ONLINE"),
+        "seg1", Map.of("i2", "CONSUMING"),
+        "seg2", Map.of("i3", "CONSUMING", "i2", "OFFLINE"),
+        "seg3", Map.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
+        "seg4", Map.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
+        "seg5", Map.of("i6", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
+        "seg6", Map.of("i7", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
+    );
+
+    ZNRecord znRecord = mock(ZNRecord.class);
+    when(znRecord.getMapFields()).thenReturn(map);
+    when(idealState.getRecord()).thenReturn(znRecord);
+    // Use TreeSet to ensure ordering
+    Set<String> targetConsumingSegment = new TreeSet<>(map.keySet());
+
+    List<Set<String>> segmentBatchList =
+        realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2);
+    assertEquals(segmentBatchList, List.of(
+        Set.of("seg0", "seg1"),
+        Set.of("seg2", "seg3"),
+        Set.of("seg4", "seg5"),
+        Set.of("seg6")
+    ));
+
+    segmentBatchList = realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 4);
+    assertEquals(segmentBatchList, List.of(
+        Set.of("seg0", "seg1", "seg2", "seg3"),
+        Set.of("seg4", "seg5", "seg6")
+    ));
+  }
+
   @Test
   public void getSegmentsYetToBeCommitted() {
     PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 17655506411b..02f8d1f659f6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -428,10 +428,22 @@ public void testForceCommit()
       throws Exception {
     Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
     String jobId = forceCommit(getTableName());
+    testForceCommitInternal(jobId, consumingSegments, 60000L);
+  }
+
+  @Test
+  public void testForceCommitInBatches()
+      throws Exception {
+    Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
+    String jobId = forceCommit(getTableName(), 1, 5, 210);
+    testForceCommitInternal(jobId, consumingSegments, 240000L);
+  }
+
+  private void testForceCommitInternal(String jobId, Set<String> consumingSegments, long timeoutMs) {
     Map<String, String> jobMetadata =
         _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
     assert jobMetadata != null;
-    assert jobMetadata.get("segmentsForceCommitted") != null;
+    assert jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -446,7 +458,7 @@ public void testForceCommit()
       } catch (Exception e) {
         return false;
       }
-    }, 60000L, "Error verifying force commit operation on table!");
+    }, timeoutMs, "Error verifying force commit operation on table!");
   }
 
   public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
@@ -492,6 +504,15 @@ private String forceCommit(String tableName)
     return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
   }
 
+  private String forceCommit(String tableName, int batchSize, int batchIntervalSec, int batchTimeoutSec)
+      throws Exception {
+    String response =
+        sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName) + "?batchSize=" + batchSize
+                + "&batchStatusCheckIntervalSec=" + batchIntervalSec + "&batchStatusCheckTimeoutSec=" + batchTimeoutSec,
+            null);
+    return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
+  }
+
   @Test
   @Override
   public void testHardcodedServerPartitionedSqlQueries()
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java
index 2f7904edc8fa..8ed1b5c2fa95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java
@@ -26,12 +26,29 @@
  * either operation throwing an exception or running out of attempts.
  */
 public class AttemptFailureException extends Exception {
+  private final int _attempts;
 
   public AttemptFailureException(String message) {
     super(message);
+    _attempts = 0;
+  }
+
+  public AttemptFailureException(String message, int attempts) {
+    super(message);
+    _attempts = attempts;
   }
 
   public AttemptFailureException(Throwable cause) {
     super(cause);
+    _attempts = 0;
+  }
+
+  public AttemptFailureException(Throwable cause, int attempts) {
+    super(cause);
+    _attempts = attempts;
+  }
+
+  public int getAttempts() {
+    return _attempts;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java
index c710aa1e72c3..7e4521b517c9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java
@@ -24,18 +24,11 @@
  */
 public class AttemptsExceededException extends AttemptFailureException {
 
-  private int _attempts = 0;
-
   public AttemptsExceededException(String message) {
     super(message);
   }
 
   public AttemptsExceededException(String message, int attempts) {
-    super(message);
-    _attempts = attempts;
-  }
-
-  public int getAttempts() {
-    return _attempts;
+    super(message, attempts);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java
index 23385e85bb2f..380bed123ffc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java
@@ -23,18 +23,11 @@
  */
 public class RetriableOperationException extends AttemptFailureException {
 
-  private int _attempts = 0;
-
-  public int getAttempts() {
-    return _attempts;
-  }
-
   public RetriableOperationException(Throwable cause) {
     super(cause);
   }
 
   public RetriableOperationException(Throwable cause, int attempts) {
-    super(cause);
-    _attempts = attempts;
+    super(cause, attempts);
   }
 }