Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports Force Committing Segments in Batches #14811

Open
wants to merge 73 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
eeb5be1
Supports batching in ForceCommit API
noob-se7en Jan 12, 2025
5f5a554
nit
noob-se7en Jan 12, 2025
ca5104a
Refactoring
noob-se7en Jan 14, 2025
434e8a3
nit
noob-se7en Jan 14, 2025
504f3c9
nit
noob-se7en Jan 14, 2025
987bb00
nit
noob-se7en Jan 14, 2025
ff25c5f
nit
noob-se7en Jan 14, 2025
e28ff47
nit
noob-se7en Jan 14, 2025
99a7cee
nit
noob-se7en Jan 14, 2025
255bc34
lint
noob-se7en Jan 14, 2025
3a9e41a
nit
noob-se7en Jan 14, 2025
b2eeb85
fixes lint
noob-se7en Jan 14, 2025
1782207
nit
noob-se7en Jan 14, 2025
90db3b8
Merge branch 'master' of github.com:Harnoor7/pinot into add_batch_for…
noob-se7en Jan 15, 2025
fa418b9
refactoring
noob-se7en Jan 15, 2025
470c6eb
refactoring
noob-se7en Jan 15, 2025
8de7bfc
fixes bug
noob-se7en Jan 15, 2025
4f2d4fc
nit
noob-se7en Jan 15, 2025
50af02e
nit
noob-se7en Jan 15, 2025
09d557e
nit
noob-se7en Jan 15, 2025
32b7fd5
nit
noob-se7en Jan 15, 2025
e334983
nit
noob-se7en Jan 15, 2025
1aecc5a
fix_bug
noob-se7en Jan 15, 2025
5be2722
Adds scheduling logic in controller
noob-se7en Jan 15, 2025
153a897
nit
noob-se7en Jan 15, 2025
430127d
fixes lint
noob-se7en Jan 15, 2025
f20948e
fixes bug
noob-se7en Jan 15, 2025
5012b5f
nit
noob-se7en Jan 15, 2025
c2312d2
nit
noob-se7en Jan 15, 2025
49474f5
fix bug
noob-se7en Jan 16, 2025
ab2220f
Updates foceCommit API to handle Pauseless
noob-se7en Jan 16, 2025
ed90f11
updates metadata
noob-se7en Jan 16, 2025
e88aa2a
fixes lint
noob-se7en Jan 16, 2025
8c8d8d3
adds tests
noob-se7en Jan 16, 2025
1be0316
saves 1 zk call
noob-se7en Jan 16, 2025
55fa6e2
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 16, 2025
3297ddd
updates log
noob-se7en Jan 16, 2025
748d0d3
Adds tests
noob-se7en Jan 16, 2025
095acc0
Addresses PR comments
noob-se7en Jan 17, 2025
36360b8
nit
noob-se7en Jan 17, 2025
d3d42ca
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
262bee0
pulls latest changes for pauseless
noob-se7en Jan 17, 2025
68cdc26
adds unit test
noob-se7en Jan 17, 2025
9f833c6
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
f5d68ae
addresses comment
noob-se7en Jan 17, 2025
bffab6d
Merge branch 'master' of github.com:apache/pinot into add_batch_force…
noob-se7en Jan 17, 2025
a1079c2
Addresses Pr comment
noob-se7en Jan 17, 2025
b8a2e7f
Merge branch 'master' of github.com:apache/pinot into update_forceCom…
noob-se7en Jan 17, 2025
5730a06
addresses PR comments
noob-se7en Jan 17, 2025
c8565d6
nit
noob-se7en Jan 17, 2025
165e7ab
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 17, 2025
0cab772
refactoring
noob-se7en Jan 17, 2025
de04824
Addresses PR comments
noob-se7en Jan 17, 2025
b95a2f6
nit
noob-se7en Jan 17, 2025
857dd6a
attempt to fix test
noob-se7en Jan 23, 2025
2f7e5d9
Merge branch 'master' of github.com:apache/pinot into update_forceCom…
noob-se7en Jan 23, 2025
0b64439
nit
noob-se7en Jan 23, 2025
9e3ddad
Attempts to fix test
noob-se7en Jan 23, 2025
01604e9
Merge branch 'update_forceCommit_status' of github.com:Harnoor7/pinot…
noob-se7en Jan 23, 2025
bb84ae2
Merge branch 'master' of github.com:apache/pinot into add_batch_force…
noob-se7en Jan 24, 2025
71f4ee1
Attempts to fix test
noob-se7en Jan 24, 2025
2408d13
attempt to fix test
noob-se7en Jan 24, 2025
ff67929
Addresses PR comments
noob-se7en Jan 28, 2025
80dda07
Adds timeout and interval query parameters in API
noob-se7en Jan 28, 2025
11299f4
nit
noob-se7en Jan 28, 2025
ad7aec0
fixes lint
noob-se7en Jan 29, 2025
7ea5535
Adds unit test
noob-se7en Jan 29, 2025
5ea7c3f
nit
noob-se7en Jan 29, 2025
6907c8f
Addresses PR comments
noob-se7en Jan 30, 2025
2a61ce4
attempts to fix test
noob-se7en Jan 30, 2025
445efbc
speeds up test
noob-se7en Jan 30, 2025
444fc49
Attempts to fix test
noob-se7en Jan 30, 2025
e7ab323
nit
noob-se7en Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

public class ForceCommitBatchConfig {

private static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
private static final int DEFAULT_BATCH_STATUS_CHECK_INTERVAL_SEC = 5;
private static final int DEFAULT_BATCH_STATUS_CHECK_TIMEOUT_SEC = 180;

private final int _batchSize;
private final int _batchStatusCheckIntervalMs;
private final int _batchStatusCheckTimeoutMs;

private ForceCommitBatchConfig(Integer batchSize, Integer batchStatusCheckIntervalMs,
Integer batchStatusCheckTimeoutMs) {
_batchSize = batchSize;
_batchStatusCheckIntervalMs = batchStatusCheckIntervalMs;
_batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs;
}

public static ForceCommitBatchConfig of(Integer batchSize, Integer batchStatusCheckIntervalSec,
Integer batchStatusCheckTimeoutSec) {
if (batchSize == null) {
batchSize = DEFAULT_BATCH_SIZE;
} else if (batchSize <= 0) {
throw new IllegalArgumentException("Batch size should be greater than zero");
}

if (batchStatusCheckIntervalSec == null) {
batchStatusCheckIntervalSec = DEFAULT_BATCH_STATUS_CHECK_INTERVAL_SEC;
} else if (batchStatusCheckIntervalSec <= 0) {
throw new IllegalArgumentException("Batch status check interval should be greater than zero");
}

if (batchStatusCheckTimeoutSec == null) {
batchStatusCheckTimeoutSec = DEFAULT_BATCH_STATUS_CHECK_TIMEOUT_SEC;
} else if (batchStatusCheckTimeoutSec <= 0) {
throw new IllegalArgumentException("Batch status check timeout should be greater than zero");
}

return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec * 1000, batchStatusCheckTimeoutSec * 1000);
}

public int getBatchSize() {
return _batchSize;
}

public int getBatchStatusCheckIntervalMs() {
return _batchStatusCheckIntervalMs;
}

public int getBatchStatusCheckTimeoutMs() {
return _batchStatusCheckTimeoutMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,40 @@ public Map<String, String> forceCommit(
@ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
String partitionGroupIds,
@ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
String consumingSegments, @Context HttpHeaders headers) {
String consumingSegments,
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
@QueryParam("batchSize")
Integer batchSize,
@ApiParam(value = "How often to check whether the current batch of segments have been successfully committed or"
+ " not (default = 5)")
@QueryParam("batchStatusCheckIntervalSec")
Integer batchStatusCheckIntervalSec,
@ApiParam(value = "Timeout based on which the controller will stop checking the forceCommit status of the batch"
+ " of segments and throw an exception. (default = 180)")
@QueryParam("batchStatusCheckTimeoutSec")
Integer batchStatusCheckTimeoutSec,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
if (partitionGroupIds != null && consumingSegments != null) {
throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit",
Response.Status.BAD_REQUEST);
}
ForceCommitBatchConfig forceCommitBatchConfig;
try {
forceCommitBatchConfig =
ForceCommitBatchConfig.of(batchSize, batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Invalid batch config",
Response.Status.BAD_REQUEST);
}
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validateTable(tableNameWithType);
Map<String, String> response = new HashMap<>();
try {
Set<String> consumingSegmentsForceCommitted =
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments);
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments,
forceCommitBatchConfig);
response.put("forceCommitStatus", "SUCCESS");
try {
String jobId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -72,6 +76,7 @@
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
Expand Down Expand Up @@ -117,7 +122,10 @@
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -189,6 +197,7 @@ public class PinotLLCRealtimeSegmentManager {
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
private final ExecutorService _deepStoreUploadExecutor;
private final Set<String> _deepStoreUploadExecutorPendingSegments;
private final ExecutorService _forceCommitExecutorService;

private volatile boolean _isStopping = false;

Expand All @@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
controllerConf.getDeepStoreRetryUploadParallelism()) : null;
_deepStoreUploadExecutorPendingSegments =
_isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
_forceCommitExecutorService = Executors.newCachedThreadPool();
}

public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
Expand Down Expand Up @@ -309,6 +319,8 @@ public void stop() {
LOGGER.error("Failed to close fileUploadDownloadClient.");
}
}

_forceCommitExecutorService.shutdown();
}

/**
Expand Down Expand Up @@ -1848,15 +1860,130 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
@Nullable String segmentsToCommit) {
@Nullable String segmentsToCommit, ForceCommitBatchConfig forceCommitBatchConfig) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
segmentsToCommit);
sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

List<Set<String>> segmentBatchList =
getSegmentBatchList(idealState, targetConsumingSegments, forceCommitBatchConfig.getBatchSize());

_forceCommitExecutorService.submit(
() -> processBatchesSequentially(segmentBatchList, tableNameWithType, forceCommitBatchConfig));

return targetConsumingSegments;
}

private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType,
ForceCommitBatchConfig forceCommitBatchConfig) {
Set<String> prevBatch = null;
for (Set<String> segmentBatchToCommit : segmentBatchList) {
if (prevBatch != null) {
waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig);
}
sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
prevBatch = segmentBatchToCommit;
}
}

private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
ForceCommitBatchConfig forceCommitBatchConfig) {
int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
int batchStatusCheckTimeoutMs = forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

try {
Thread.sleep(batchStatusCheckIntervalMs);
} catch (InterruptedException e) {
LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e);
throw new RuntimeException(e);
}

int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs;
RetryPolicy retryPolicy =
RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs);
final Set<String>[] segmentsYetToBeCommitted = new Set[1];

try {
retryPolicy.attempt(() -> {
segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
return segmentsYetToBeCommitted[0].isEmpty();
});
} catch (AttemptsExceededException | RetriableOperationException e) {
int attemptCount;
if (e instanceof AttemptsExceededException) {
attemptCount = ((AttemptsExceededException) e).getAttempts();
} else {
attemptCount = ((RetriableOperationException) e).getAttempts();
}
String errorMsg = String.format(
"Exception occurred while waiting for the forceCommit of segments: %s, attempt count: %d, "
+ "segmentsYetToBeCommitted: %s",
segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
throw new RuntimeException(errorMsg, e);
}
}

@VisibleForTesting
List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
int batchSize) {
Map<String, Queue<String>> instanceToConsumingSegments =
getInstanceToConsumingSegments(idealState, targetConsumingSegments);

List<Set<String>> segmentBatchList = new ArrayList<>();
Set<String> currentBatch = new HashSet<>();
Set<String> segmentsAdded = new HashSet<>();
Collection<Queue<String>> instanceSegmentsCollection = instanceToConsumingSegments.values();

while (!instanceSegmentsCollection.isEmpty()) {
Iterator<Queue<String>> instanceCollectionIterator = instanceSegmentsCollection.iterator();
// pick segments in round-robin fashion to parallelize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smart!

// forceCommit across max servers
while (instanceCollectionIterator.hasNext()) {
Queue<String> consumingSegments = instanceCollectionIterator.next();
String segmentName = consumingSegments.poll();
if (consumingSegments.isEmpty()) {
instanceCollectionIterator.remove();
}
if (!segmentsAdded.add(segmentName)) {
// there might be a segment replica hosted on
// another instance added before
continue;
}
currentBatch.add(segmentName);
if (currentBatch.size() == batchSize) {
segmentBatchList.add(currentBatch);
currentBatch = new HashSet<>();
}
}
}

if (!currentBatch.isEmpty()) {
segmentBatchList.add(currentBatch);
}
return segmentBatchList;
}

@VisibleForTesting
Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
Set<String> targetConsumingSegments) {
Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();

for (String segmentName: targetConsumingSegments) {
Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);

for (Map.Entry<String, String> instanceToState : instanceToStateMap.entrySet()) {
String instance = instanceToState.getKey();
String state = instanceToState.getValue();
if (state.equals(SegmentStateModel.CONSUMING)) {
instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);
}
}
}
return instanceToConsumingSegments;
}

/**
* Among all consuming segments, filter the ones that are in the given partitions or segments.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.api.resources;

import org.testng.annotations.Test;

import static org.testng.Assert.assertThrows;


public class ForceCommitBatchConfigTest {

@Test
public void testForceCommitBatchConfig() {
ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(null, null, null);
assert Integer.MAX_VALUE == forceCommitBatchConfig.getBatchSize();
assert 5000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
assert 180000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

forceCommitBatchConfig = ForceCommitBatchConfig.of(1, null, null);
assert 1 == forceCommitBatchConfig.getBatchSize();
assert 5000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
assert 180000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37);
assert 1 == forceCommitBatchConfig.getBatchSize();
assert 23000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
assert 37000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(0, null, null));
assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(32, 0, null));
}
}
Loading
Loading