Supports Force Committing Segments in Batches #14811

Open
wants to merge 68 commits into master
Conversation

noob-se7en
Contributor

@noob-se7en noob-se7en commented Jan 14, 2025

Problem Statement
The Force Commit API can cause high ingestion lag and slower queries because it triggers a simultaneous commit of all consuming segments. This happens because:

  1. If N is the number of partition groups a server is consuming from, the API causes all N consuming segments to commit, so N consumer threads rush to acquire the segment build semaphore. If the semaphore allows only M permits, only M consuming segments can be in the segment build stage at a time while the remaining (N - M) consumer threads wait on the semaphore. Since those (N - M) consumer threads are blocked, consumption lag can become substantial.
  2. Since M consuming segments are built in parallel, queries can become slower on poorly sized servers due to the high memory consumption.

Solution
Adds optional batching parameters to the forceCommit API. If batchSize is not provided, it defaults to Integer.MAX_VALUE, i.e. all consuming segments are committed at once (the current behavior).

3 new query params added to the forceCommit API (an example request follows the list):

  • batchSize (integer, optional):
    Max number of consuming segments to commit at once.
    Default: Integer.MAX_VALUE
    Example: batchSize=100

  • batchStatusCheckIntervalSec (integer, optional):
    How often (in seconds) to check whether the current batch of segments has been successfully committed.
    Default: 5
    Example: batchStatusCheckIntervalSec=10

  • batchStatusCheckTimeoutSec (integer, optional):
    Timeout (in seconds) after which the controller stops checking the forceCommit status and throws an exception.
    Default: 180
    Example: batchStatusCheckTimeoutSec=300
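
For example, a batched force commit could be triggered like this (assuming the existing POST /tables/{tableName}/forceCommit endpoint; the table name and values are illustrative):

POST /tables/myTable_REALTIME/forceCommit?batchSize=100&batchStatusCheckIntervalSec=10&batchStatusCheckTimeoutSec=300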

@codecov-commenter

codecov-commenter commented Jan 14, 2025

Codecov Report

Attention: Patch coverage is 53.48837% with 40 lines in your changes missing coverage. Please review.

Project coverage is 63.68%. Comparing base (59551e4) to head (5ea7c3f).
Report is 1643 commits behind head on master.

Files with missing lines | Patch % | Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 45.76% | 28 Missing and 4 partials ⚠️
...ller/api/resources/PinotRealtimeTableResource.java | 0.00% | 6 Missing ⚠️
...ntroller/api/resources/ForceCommitBatchConfig.java | 90.47% | 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14811      +/-   ##
============================================
+ Coverage     61.75%   63.68%   +1.93%     
- Complexity      207     1471    +1264     
============================================
  Files          2436     2710     +274     
  Lines        133233   151974   +18741     
  Branches      20636    23468    +2832     
============================================
+ Hits          82274    96785   +14511     
- Misses        44911    47913    +3002     
- Partials       6048     7276    +1228     
Flag | Coverage Δ
custom-integration1 | 100.00% <ø> (+99.99%) ⬆️
integration | 100.00% <ø> (+99.99%) ⬆️
integration1 | 100.00% <ø> (+99.99%) ⬆️
integration2 | 0.00% <ø> (ø)
java-11 | 63.67% <53.48%> (+1.96%) ⬆️
java-21 | 63.57% <53.48%> (+1.95%) ⬆️
skip-bytebuffers-false | 63.68% <53.48%> (+1.93%) ⬆️
skip-bytebuffers-true | 63.56% <53.48%> (+35.83%) ⬆️
temurin | 63.68% <53.48%> (+1.93%) ⬆️
unittests | 63.68% <53.48%> (+1.93%) ⬆️
unittests1 | 56.21% <ø> (+9.32%) ⬆️
unittests2 | 34.04% <53.48%> (+6.31%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang
Contributor

This solves #11950

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Seems you are pushing the batch throttling to the server side. What will happen if some replicas decide to commit, and others get throttled? Even worse, could this cause deadlock?

@siddharthteotia
Contributor

Is it not possible to solve the problem on the controller / coordinate from the controller? Pushing this down to individual servers will likely lead to error-prone situations.

@noob-se7en
Contributor Author

@Jackie-Jiang I don't quite get what is meant by

and others get throttled?

Regarding deadlock or any edge case: the server will use the same logic that is used by /tables/forceCommitStatus/{jobId} to check the status of the batch, so there should be no deadlock.

sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
ExecutorService executorService = Executors.newFixedThreadPool(1);
Contributor

Do not create a new thread pool on every request

Contributor Author

ohh right!
Fixed (created a separate bounded ExecutorService for forceCommitAPI)

sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);

try {
Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
Contributor

Why sleep at start?

Contributor Author

This sleep is to save ZooKeeper calls, since at this point in the code it's almost certain that the segments are not yet committed.
(we would need a new retryPolicy with an initial start delay)

String state = instanceToStateMap.get(instance);
if (state.equals(SegmentStateModel.CONSUMING)) {
instanceToConsumingSegments.putIfAbsent(instance, new LinkedList<>());
instanceToConsumingSegments.get(instance).add(segmentName);
Contributor

@KKcorps KKcorps Jan 17, 2025

You can simplify this by using compute instead of putIfAbsent and get

List<Set<String>> segmentBatchList =
realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2);

// i1 = [seg0, seg4, seg5, seg6]
Contributor

remove these comments

@@ -170,19 +169,28 @@ public Map<String, String> forceCommit(
@ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
String partitionGroupIds,
@ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
String consumingSegments, @Context HttpHeaders headers) {
String consumingSegments,
@ApiParam(value = "Max number of consuming segments to commit at once") @QueryParam("batchSize")
Contributor

Mention in the description that the default is all consuming segments at once

@KKcorps
Contributor

KKcorps commented Jan 17, 2025

Please update PR description to reflect the change in API signature

@noob-se7en noob-se7en requested a review from KKcorps January 17, 2025 20:14
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Mostly good

@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
controllerConf.getDeepStoreRetryUploadParallelism()) : null;
_deepStoreUploadExecutorPendingSegments =
_isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
_forceCommitExecutorService = Executors.newFixedThreadPool(4);
Contributor

Having a fixed-size pool could actually cause problems when there are multiple force-commit requests. Since it is waiting most of the time, I'd actually suggest creating a single-thread pool for each request, same as the last version. It is not on the query path, so the overhead should be fine.
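
A minimal sketch of the per-request approach being suggested (the helper name waitForBatchesToComplete and its arguments are illustrative, not from the PR):

ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
  executorService.submit(() -> waitForBatchesToComplete(tableNameWithType, segmentBatchList));
} finally {
  // Stop accepting new tasks; the executor terminates once the submitted task finishes
  executorService.shutdown();
}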


Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
if (!targetConsumingSegments.contains(segmentName)) {
Contributor

Let's loop over targetConsumingSegments instead of the ideal state. The ideal state should always contain targetConsumingSegments because they are extracted from it.
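
A sketch of the loop driven by targetConsumingSegments instead (reusing the variable names from the snippet above):

for (String segmentName : targetConsumingSegments) {
  // Ideal state is expected to contain every target segment, so no containment check is needed
  Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
  // ... group the CONSUMING replicas by instance, as in the existing loop body
}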

Comment on lines 1965 to 1971
instanceToConsumingSegments.compute(instance, (key, value) -> {
if (value == null) {
value = new LinkedList<>();
}
value.add(segmentName);
return value;
});
Contributor

Suggested change
instanceToConsumingSegments.compute(instance, (key, value) -> {
if (value == null) {
value = new LinkedList<>();
}
value.add(segmentName);
return value;
});
instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);

Comment on lines 1962 to 1963
for (String instance : instanceToStateMap.keySet()) {
String state = instanceToStateMap.get(instance);
Contributor

Use entrySet() to reduce lookup
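
For example, an entrySet()-based form (assuming segmentName comes from the enclosing loop and using the computeIfAbsent variant suggested above):

for (Map.Entry<String, String> entry : instanceToStateMap.entrySet()) {
  // Reads the key and value from the entry, avoiding the extra get() per instance
  if (entry.getValue().equals(SegmentStateModel.CONSUMING)) {
    instanceToConsumingSegments.computeIfAbsent(entry.getKey(), k -> new LinkedList<>()).add(segmentName);
  }
}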


while (segmentsRemaining) {
segmentsRemaining = false;
// pick segments in round-robin fashion to parallelize
Contributor

Smart!

String segmentName = queue.poll();
// there might be a segment replica hosted on
// another instance added before
if (segmentsAdded.contains(segmentName)) {
Contributor

We can reduce a lookup by

Suggested change
if (segmentsAdded.contains(segmentName)) {
if (!segmentsAdded.add(segmentName)) {

// pick segments in round-robin fashion to parallelize
// forceCommit across max servers
for (Queue<String> queue : instanceToConsumingSegments.values()) {
if (!queue.isEmpty()) {
Contributor

We can remove the queue when it is empty to avoid checking it again and again. You may use an iterator to remove the entry without an extra lookup.
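
A sketch of one round-robin pass with iterator-based removal (currentBatch and segmentsAdded stand in for the batch being filled and the dedup set; this is an illustration, not the exact PR code):

Iterator<Queue<String>> queueIterator = instanceToConsumingSegments.values().iterator();
while (queueIterator.hasNext()) {
  Queue<String> queue = queueIterator.next();
  if (queue.isEmpty()) {
    // Drop exhausted queues so later passes no longer visit them
    queueIterator.remove();
    continue;
  }
  String segmentName = queue.poll();
  // A replica of this segment may already have been added via another instance
  if (segmentsAdded.add(segmentName)) {
    currentBatch.add(segmentName);
  }
}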


try {
Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException ignored) {
Contributor

Ignoring the interrupt could be risky (it can hold a long-running thread). Let's wrap it in a RuntimeException and throw it. We may log an error when catching it.
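
A sketch of the suggested handling (restoring the interrupt flag is a common convention added here, not something the comment spells out):

try {
  Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException e) {
  // Optionally log an error here, restore the interrupt flag, and fail fast instead of silently continuing to wait
  Thread.currentThread().interrupt();
  throw new RuntimeException("Interrupted while waiting for force-committed segments", e);
}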

}

int attemptCount = 0;
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Contributor

Suggested change
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
final Set<String>[] segmentsYetToBeCommitted = new Set[1];

@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Contributor

Let's take the check interval from the REST API as well, because different use cases might want different intervals; we might also want to add a timeout and take that from the REST API too. The retry count can be calculated from the timeout and the interval.
We can provide default values (e.g. 5s, 3min) in case they are not provided. IMO a 15s interval is too long because it means for each batch we will wait at least 15s.
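
A sketch of deriving the attempt count from the two request parameters (names are illustrative):

int checkIntervalMs = batchStatusCheckIntervalSec * 1000;
long timeoutMs = batchStatusCheckTimeoutSec * 1000L;
// With the suggested defaults (5s interval, 3min timeout) this allows at most 36 status checks per batch
int maxAttempts = (int) ((timeoutMs + checkIntervalMs - 1) / checkIntervalMs);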

Contributor Author

Sure.
(The default case (5s interval, 3m timeout) might be a little expensive, as we are waiting for segment builds to complete and there will be at most 36 ZK calls per batch.)
