Supports Force Committing Segments in Batches #14811

Open: wants to merge 73 commits into base: master (showing changes from 62 commits)

Commits (73):
eeb5be1
Supports batching in ForceCommit API
noob-se7en Jan 12, 2025
5f5a554
nit
noob-se7en Jan 12, 2025
ca5104a
Refactoring
noob-se7en Jan 14, 2025
434e8a3
nit
noob-se7en Jan 14, 2025
504f3c9
nit
noob-se7en Jan 14, 2025
987bb00
nit
noob-se7en Jan 14, 2025
ff25c5f
nit
noob-se7en Jan 14, 2025
e28ff47
nit
noob-se7en Jan 14, 2025
99a7cee
nit
noob-se7en Jan 14, 2025
255bc34
lint
noob-se7en Jan 14, 2025
3a9e41a
nit
noob-se7en Jan 14, 2025
b2eeb85
fixes lint
noob-se7en Jan 14, 2025
1782207
nit
noob-se7en Jan 14, 2025
90db3b8
Merge branch 'master' of github.com:Harnoor7/pinot into add_batch_for…
noob-se7en Jan 15, 2025
fa418b9
refactoring
noob-se7en Jan 15, 2025
470c6eb
refactoring
noob-se7en Jan 15, 2025
8de7bfc
fixes bug
noob-se7en Jan 15, 2025
4f2d4fc
nit
noob-se7en Jan 15, 2025
50af02e
nit
noob-se7en Jan 15, 2025
09d557e
nit
noob-se7en Jan 15, 2025
32b7fd5
nit
noob-se7en Jan 15, 2025
e334983
nit
noob-se7en Jan 15, 2025
1aecc5a
fix_bug
noob-se7en Jan 15, 2025
5be2722
Adds scheduling logic in controller
noob-se7en Jan 15, 2025
153a897
nit
noob-se7en Jan 15, 2025
430127d
fixes lint
noob-se7en Jan 15, 2025
f20948e
fixes bug
noob-se7en Jan 15, 2025
5012b5f
nit
noob-se7en Jan 15, 2025
c2312d2
nit
noob-se7en Jan 15, 2025
49474f5
fix bug
noob-se7en Jan 16, 2025
ab2220f
Updates foceCommit API to handle Pauseless
noob-se7en Jan 16, 2025
ed90f11
updates metadata
noob-se7en Jan 16, 2025
e88aa2a
fixes lint
noob-se7en Jan 16, 2025
8c8d8d3
adds tests
noob-se7en Jan 16, 2025
1be0316
saves 1 zk call
noob-se7en Jan 16, 2025
55fa6e2
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 16, 2025
3297ddd
updates log
noob-se7en Jan 16, 2025
748d0d3
Adds tests
noob-se7en Jan 16, 2025
095acc0
Addresses PR comments
noob-se7en Jan 17, 2025
36360b8
nit
noob-se7en Jan 17, 2025
d3d42ca
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
262bee0
pulls latest changes for pauseless
noob-se7en Jan 17, 2025
68cdc26
adds unit test
noob-se7en Jan 17, 2025
9f833c6
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
f5d68ae
addresses comment
noob-se7en Jan 17, 2025
bffab6d
Merge branch 'master' of github.com:apache/pinot into add_batch_force…
noob-se7en Jan 17, 2025
a1079c2
Addresses Pr comment
noob-se7en Jan 17, 2025
b8a2e7f
Merge branch 'master' of github.com:apache/pinot into update_forceCom…
noob-se7en Jan 17, 2025
5730a06
addresses PR comments
noob-se7en Jan 17, 2025
c8565d6
nit
noob-se7en Jan 17, 2025
165e7ab
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
0cab772
refactoring
noob-se7en Jan 17, 2025
de04824
Addresses PR comments
noob-se7en Jan 17, 2025
b95a2f6
nit
noob-se7en Jan 17, 2025
857dd6a
attempt to fix test
noob-se7en Jan 23, 2025
2f7e5d9
Merge branch 'master' of github.com:apache/pinot into update_forceCom…
noob-se7en Jan 23, 2025
0b64439
nit
noob-se7en Jan 23, 2025
9e3ddad
Attempts to fix test
noob-se7en Jan 23, 2025
01604e9
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 23, 2025
bb84ae2
Merge branch 'master' of github.com:apache/pinot into add_batch_force…
noob-se7en Jan 24, 2025
71f4ee1
Attempts to fix test
noob-se7en Jan 24, 2025
2408d13
attempt to fix test
noob-se7en Jan 24, 2025
ff67929
Addresses PR comments
noob-se7en Jan 28, 2025
80dda07
Adds timeout and interval query parameters in API
noob-se7en Jan 28, 2025
11299f4
nit
noob-se7en Jan 28, 2025
ad7aec0
fixes lint
noob-se7en Jan 29, 2025
7ea5535
Adds unit test
noob-se7en Jan 29, 2025
5ea7c3f
nit
noob-se7en Jan 29, 2025
6907c8f
Addresses PR comments
noob-se7en Jan 30, 2025
2a61ce4
attempts to fix test
noob-se7en Jan 30, 2025
445efbc
speeds up test
noob-se7en Jan 30, 2025
444fc49
Attempts to fix test
noob-se7en Jan 30, 2025
e7ab323
nit
noob-se7en Jan 30, 2025
@@ -169,19 +169,29 @@ public Map<String, String> forceCommit(
@ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
String partitionGroupIds,
@ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
String consumingSegments, @Context HttpHeaders headers) {
String consumingSegments,
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
@QueryParam("batchSize")
Integer batchSize, @Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
if (partitionGroupIds != null && consumingSegments != null) {
throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit",
Response.Status.BAD_REQUEST);
}
if (batchSize == null) {
batchSize = Integer.MAX_VALUE;
} else if (batchSize <= 0) {
throw new ControllerApplicationException(LOGGER, "Batch size should be greater than zero",
Response.Status.BAD_REQUEST);
}
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validateTable(tableNameWithType);
Map<String, String> response = new HashMap<>();
try {
Set<String> consumingSegmentsForceCommitted =
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments);
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments,
batchSize);
response.put("forceCommitStatus", "SUCCESS");
try {
String jobId = UUID.randomUUID().toString();
@@ -31,8 +31,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
@@ -117,7 +119,10 @@
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Contributor: Let's take the check interval from the REST API as well, because different use cases may want different intervals; we might also want to add a timeout and take that from the REST API too. The retry count can then be calculated from the timeout and the interval. We can provide default values (e.g. 5s interval, 3min timeout) in case they are not supplied. IMO a 15s interval is too long, because it means each batch waits at least 15s.

Contributor (author): Sure. (The default case of a 5s interval and 3min timeout might be a little expensive, since we are waiting for segment builds to complete, and there would be at most 36 ZK calls per batch.)
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.fixedDelayRetryPolicy(10, FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
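The review discussion above suggests deriving the retry count from a caller-supplied timeout and interval instead of hard-coding it. A minimal sketch of that arithmetic (the `attemptsFor` method and the default constants are illustrative assumptions, not PR code):

```java
// Sketch only: derive the fixed-delay retry count from a timeout and interval,
// as suggested in review. Names and defaults are assumptions.
public class RetryMath {
    static final long DEFAULT_INTERVAL_MS = 5_000L;   // reviewer's example default: 5s
    static final long DEFAULT_TIMEOUT_MS = 180_000L;  // reviewer's example default: 3min

    // Number of fixed-delay status checks that fit inside the timeout (at least one).
    static int attemptsFor(long timeoutMs, long intervalMs) {
        return (int) Math.max(1, timeoutMs / intervalMs);
    }

    public static void main(String[] args) {
        // 3min / 5s = 36 checks per batch, matching the author's ZK-call estimate.
        System.out.println(attemptsFor(DEFAULT_TIMEOUT_MS, DEFAULT_INTERVAL_MS)); // prints 36
    }
}
```

The resulting count would then feed `RetryPolicies.fixedDelayRetryPolicy(attempts, intervalMs)` in place of the hard-coded `(10, 15s)` policy.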

// TODO: make this configurable with default set to 10
/**
@@ -189,6 +197,7 @@ public class PinotLLCRealtimeSegmentManager {
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
private final ExecutorService _deepStoreUploadExecutor;
private final Set<String> _deepStoreUploadExecutorPendingSegments;
private final ExecutorService _forceCommitExecutorService;

private volatile boolean _isStopping = false;

@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
controllerConf.getDeepStoreRetryUploadParallelism()) : null;
_deepStoreUploadExecutorPendingSegments =
_isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
_forceCommitExecutorService = Executors.newFixedThreadPool(4);
Contributor: Having a fixed-size pool could actually cause problems when there are multiple force-commit requests. Since the thread is waiting most of the time, I'd suggest creating a single-thread pool per request, same as the last version. It is not on the query path, so the overhead should be fine.

}

public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
@@ -309,6 +319,8 @@ public void stop() {
LOGGER.error("Failed to close fileUploadDownloadClient.");
}
}

_forceCommitExecutorService.shutdown();
}

/**
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
@Nullable String segmentsToCommit) {
@Nullable String segmentsToCommit, int batchSize) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
segmentsToCommit);
sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);

_forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));

return targetConsumingSegments;
}

private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
Set<String> prevBatch = null;
for (Set<String> segmentBatchToCommit: segmentBatchList) {
if (prevBatch != null) {
waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
}
sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
prevBatch = segmentBatchToCommit;
}
}

private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {

try {
Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
Contributor: Why sleep at the start?

Contributor (author): This sleep saves ZooKeeper calls; at this point in the code it is almost certain that none of the segments have been committed yet. (We would need a new retry policy that supports an initial start delay.)
} catch (InterruptedException ignored) {
Contributor: Ignoring the interrupt could be risky (it holds a long-running thread). Let's wrap it in a RuntimeException and throw it; we may also log an error when catching it.
}

int attemptCount = 0;
Contributor: No need for this variable; both AttemptsExceededException and RetriableOperationException have a getAttempts method.
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Contributor: Suggested change:
- final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+ final Set<String>[] segmentsYetToBeCommitted = new Set[1];
try {
attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
return segmentsYetToBeCommitted[0].isEmpty();
});
} catch (AttemptsExceededException | RetriableOperationException e) {
String errorMsg = String.format(
"Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+ "segmentsYetToBeCommitted: %s",
segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
LOGGER.error(errorMsg, e);
throw new RuntimeException(e);
Contributor: If an exception is thrown, there's no need to log as well; add the errorMsg to the RuntimeException instead.
}
}
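The interrupt-handling concern raised in review could be addressed along these lines. This is a standalone sketch (class and method names are illustrative), assuming that failing the batch job fast on interrupt is acceptable:

```java
// Sketch: restore the interrupt flag and fail fast instead of swallowing
// InterruptedException. Names here are illustrative, not PR code.
public class InterruptSafeWait {
    static void sleepOrFail(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status visible to callers
            throw new RuntimeException("Interrupted while waiting for force-commit batch", e);
        }
    }

    public static void main(String[] args) {
        sleepOrFail(10); // completes normally when the thread is not interrupted
        Thread.currentThread().interrupt();
        try {
            sleepOrFail(10);
        } catch (RuntimeException expected) {
            System.out.println("interrupted: " + Thread.interrupted()); // prints interrupted: true
        }
    }
}
```

Re-setting the flag before throwing lets any outer executor or caller still observe that the thread was interrupted.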

@VisibleForTesting
List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
int batchSize) {
Map<String, Queue<String>> instanceToConsumingSegments =
getInstanceToConsumingSegments(idealState, targetConsumingSegments);

List<Set<String>> segmentBatchList = new ArrayList<>();
Set<String> currentBatch = new HashSet<>();
Set<String> segmentsAdded = new HashSet<>();
boolean segmentsRemaining = true;

while (segmentsRemaining) {
segmentsRemaining = false;
// pick segments in round-robin fashion to parallelize
Contributor: Smart!
// forceCommit across max servers
for (Queue<String> queue : instanceToConsumingSegments.values()) {
if (!queue.isEmpty()) {
Contributor: We can remove the queue once it is empty to avoid checking it again and again; you can use an iterator to remove the entry without an extra lookup.
segmentsRemaining = true;
String segmentName = queue.poll();
// there might be a segment replica hosted on
// another instance added before
if (segmentsAdded.contains(segmentName)) {
Contributor: We can save a lookup. Suggested change:
- if (segmentsAdded.contains(segmentName)) {
+ if (!segmentsAdded.add(segmentName)) {
continue;
}
currentBatch.add(segmentName);
segmentsAdded.add(segmentName);
if (currentBatch.size() == batchSize) {
segmentBatchList.add(currentBatch);
currentBatch = new HashSet<>();
}
}
}
}

if (!currentBatch.isEmpty()) {
segmentBatchList.add(currentBatch);
}
return segmentBatchList;
}
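For reference, the round-robin batching can be exercised standalone. This sketch folds in the two review ideas (iterator-based removal of empty queues, dedup via the return value of `Set.add`); types are simplified and names are illustrative:

```java
import java.util.*;

// Standalone sketch of round-robin batching over per-server segment queues,
// with the reviewers' suggestions applied. Not the PR's exact code.
public class SegmentBatcher {
    static List<Set<String>> batch(Map<String, Queue<String>> instanceToSegments, int batchSize) {
        List<Set<String>> batches = new ArrayList<>();
        Set<String> current = new HashSet<>();
        Set<String> seen = new HashSet<>();
        while (!instanceToSegments.isEmpty()) {
            // One round-robin pass: take at most one segment per server so each
            // batch spreads the commit load across as many servers as possible.
            Iterator<Queue<String>> it = instanceToSegments.values().iterator();
            while (it.hasNext()) {
                Queue<String> queue = it.next();
                if (queue.isEmpty()) {
                    it.remove(); // drop exhausted servers instead of re-checking them
                    continue;
                }
                String segment = queue.poll();
                if (!seen.add(segment)) {
                    continue; // a replica was already scheduled via another server
                }
                current.add(segment);
                if (current.size() == batchSize) {
                    batches.add(current);
                    current = new HashSet<>();
                }
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Queue<String>> m = new HashMap<>();
        m.put("i1", new LinkedList<>(List.of("seg0", "seg4")));
        m.put("i2", new LinkedList<>(List.of("seg1", "seg3")));
        m.put("i3", new LinkedList<>(List.of("seg2", "seg3", "seg4")));
        System.out.println(batch(m, 2).size()); // 5 distinct segments, batch size 2 -> prints 3
    }
}
```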

@VisibleForTesting
Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
Set<String> targetConsumingSegments) {
Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();

Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
if (!targetConsumingSegments.contains(segmentName)) {
Contributor: Let's loop over targetConsumingSegments instead of the ideal state. The ideal state should always contain targetConsumingSegments, because they were extracted from it.
continue;
}
Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
for (String instance : instanceToStateMap.keySet()) {
String state = instanceToStateMap.get(instance);
Contributor: Use entrySet() to reduce lookups.
if (state.equals(SegmentStateModel.CONSUMING)) {
instanceToConsumingSegments.compute(instance, (key, value) -> {
if (value == null) {
value = new LinkedList<>();
}
value.add(segmentName);
return value;
});
Contributor: Suggested change:
- instanceToConsumingSegments.compute(instance, (key, value) -> { ... });
+ instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);

}
}
}

return instanceToConsumingSegments;
}
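Applying both review suggestions for this method (iterate `entrySet()` rather than looking keys up, and build queues with `computeIfAbsent`), the inversion could read as follows. This is a standalone sketch with simplified types; the class and method names are assumptions:

```java
import java.util.*;

// Sketch of inverting segment -> (instance -> state) into
// instance -> queue of CONSUMING segments, per the review suggestions.
public class ConsumingSegmentIndex {
    static Map<String, Queue<String>> invert(Map<String, Map<String, String>> segToInstanceState) {
        Map<String, Queue<String>> out = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> seg : segToInstanceState.entrySet()) {
            // entrySet() gives key and value together, avoiding a get() per key
            for (Map.Entry<String, String> e : seg.getValue().entrySet()) {
                if ("CONSUMING".equals(e.getValue())) {
                    out.computeIfAbsent(e.getKey(), k -> new LinkedList<>()).add(seg.getKey());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> in = Map.of(
            "seg0", Map.of("i1", "CONSUMING", "i4", "ONLINE"),
            "seg1", Map.of("i2", "CONSUMING"));
        System.out.println(invert(in).get("i4")); // prints null: ONLINE replicas are skipped
    }
}
```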

/**
* Among all consuming segments, filter the ones that are in the given partitions or segments.
*/
@@ -19,6 +19,8 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
@@ -31,9 +33,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -1248,6 +1253,112 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

@Test
public void testGetInstanceToConsumingSegments() {
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
IdealState idealState = mock(IdealState.class);
Map<String, Map<String, String>> map = ImmutableMap.of(
"seg0", ImmutableMap.of("i1", "CONSUMING", "i4", "ONLINE"),
"seg1", ImmutableMap.of("i2", "CONSUMING"),
"seg2", ImmutableMap.of("i3", "CONSUMING", "i2", "OFFLINE"),
"seg3", ImmutableMap.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
"seg4", ImmutableMap.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
);

ZNRecord znRecord = mock(ZNRecord.class);
when(znRecord.getMapFields()).thenReturn(map);
when(idealState.getRecord()).thenReturn(znRecord);
Set<String> targetConsumingSegment = new HashSet<>(map.keySet());

Map<String, Queue<String>> instanceToConsumingSegments =
realtimeSegmentManager.getInstanceToConsumingSegments(idealState, targetConsumingSegment);
List<String> instanceList = ImmutableList.of("i1", "i2", "i3", "i4", "i5");

StringBuilder expectedSegNames = new StringBuilder();
for (String instanceName : instanceList) {
SortedSet<String> sortedSegNames = new TreeSet<>(instanceToConsumingSegments.get(instanceName));
expectedSegNames.append(sortedSegNames);
}
assert expectedSegNames.toString()
.equals("[seg0, seg4][seg1, seg3][seg2, seg3, seg4][seg3][seg4]");
}

@Test
public void getSegmentBatchList() {
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
IdealState idealState = mock(IdealState.class);

Map<String, Map<String, String>> map = ImmutableMap.of(
"seg0", ImmutableMap.of("i1", "CONSUMING", "i4", "ONLINE"),
"seg1", ImmutableMap.of("i2", "CONSUMING"),
"seg2", ImmutableMap.of("i3", "CONSUMING", "i2", "OFFLINE"),
"seg3", ImmutableMap.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"),
"seg4", ImmutableMap.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
"seg5", ImmutableMap.of("i6", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"),
"seg6", ImmutableMap.of("i7", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING")
);

ZNRecord znRecord = mock(ZNRecord.class);
when(znRecord.getMapFields()).thenReturn(map);
when(idealState.getRecord()).thenReturn(znRecord);
Set<String> targetConsumingSegment = new HashSet<>(map.keySet());

List<Set<String>> segmentBatchList =
realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2);

assert segmentBatchList.size() == 4;
Set<String> segmentsAdded = new HashSet<>();

for (Set<String> segmentBatch : segmentBatchList) {
assert segmentBatch.size() <= 2;
for (String segmentName : segmentBatch) {
assert !segmentsAdded.contains(segmentName);
segmentsAdded.add(segmentName);
}
}

Random random = new Random();
int numOfServers = 1 + random.nextInt(20);
int numOfSegments = Math.max(numOfServers, random.nextInt(500));
int rf = Math.min(numOfServers, random.nextInt(7));
int batchSize = random.nextInt(100);

map = new HashMap<>();
for (int segmentIndex = 0; segmentIndex < numOfSegments; segmentIndex++) {
String segmentName = "seg_" + segmentIndex;
Map<String, String> instanceToStateMap = new HashMap<>();
for (int rfIndex = 0; rfIndex < rf; rfIndex++) {
instanceToStateMap.put("i_" + random.nextInt(numOfServers), "CONSUMING");
}
map.put(segmentName, instanceToStateMap);
}

znRecord = mock(ZNRecord.class);
when(znRecord.getMapFields()).thenReturn(map);
when(idealState.getRecord()).thenReturn(znRecord);
targetConsumingSegment = new HashSet<>(map.keySet());

segmentBatchList = realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, batchSize);
int numBatchesExpected = (targetConsumingSegment.size() + batchSize - 1) / batchSize;

assert numBatchesExpected == segmentBatchList.size();

segmentsAdded = new HashSet<>();
for (Set<String> batch : segmentBatchList) {
assert batch.size() <= batchSize;
for (String segmentName : batch) {
assert !segmentsAdded.contains(segmentName);
segmentsAdded.add(segmentName);
}
}

assert segmentsAdded.equals(targetConsumingSegment);
}

@Test
public void getSegmentsYetToBeCommitted() {
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
@@ -428,10 +428,22 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
testForceCommitInternal(jobId, consumingSegments, 60000L);
}

@Test
public void testForceCommitInBatches()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName(), 1);
testForceCommitInternal(jobId, consumingSegments, 180000L);
}

private void testForceCommitInternal(String jobId, Set<String> consumingSegments, long timeoutMs) {
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
assert jobMetadata.get("segmentsForceCommitted") != null;
assert jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;

TestUtils.waitForCondition(aVoid -> {
try {
@@ -446,7 +458,7 @@ public void testForceCommit()
} catch (Exception e) {
return false;
}
}, 60000L, "Error verifying force commit operation on table!");
}, timeoutMs, "Error verifying force commit operation on table!");
}

public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
@@ -492,6 +504,13 @@ private String forceCommit(String tableName)
return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
}

private String forceCommit(String tableName, int batchSize)
throws Exception {
String response =
sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName) + "?batchSize=" + batchSize, null);
return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
}

@Test
@Override
public void testHardcodedServerPartitionedSqlQueries()