Supports Force Committing Segments in Batches #14811
base: master
Conversation
Codecov Report
Additional details and impacted files (coverage diff):
## master #14811 +/- ##
============================================
+ Coverage 61.75% 63.68% +1.93%
- Complexity 207 1471 +1264
============================================
Files 2436 2710 +274
Lines 133233 151974 +18741
Branches 20636 23468 +2832
============================================
+ Hits 82274 96785 +14511
- Misses 44911 47913 +3002
- Partials 6048 7276 +1228
This solves #11950
Seems you are pushing the batch throttling to the server side. What will happen if some replicas decide to commit, and others get throttled? Even worse, could this cause deadlock?
Is it not possible to solve the problem on the controller / coordinate from the controller? Pushing this down to the individual servers will likely lead to error-prone situations.
@Jackie-Jiang I don't quite get what is meant by
Regarding deadlock or any edge case: the server will use the same logic which is used
sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
ExecutorService executorService = Executors.newFixedThreadPool(1);
Do not create a new thread pool on every request.
ohh right!
Fixed (created a separate bounded ExecutorService for forceCommitAPI)
sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);

try {
  Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
Why sleep at start?
This sleep is to save ZooKeeper calls: at this point in the code it is almost certain that all segments are yet to be committed.
(We need a new retryPolicy with an initial start delay.)
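A minimal sketch of a poller with an initial delay, in the spirit of the retryPolicy mentioned above; the class and method names are illustrative, not the PR's actual code:

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public final class DelayedPoller {
  // Sleeps BEFORE the first check: right after the force-commit message is sent,
  // it is almost certain that no segment has committed yet, so an immediate
  // ZooKeeper read would be wasted.
  public static boolean pollWithInitialDelay(BooleanSupplier allSegmentsCommitted, long intervalMs, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    do {
      TimeUnit.MILLISECONDS.sleep(intervalMs);
      if (allSegmentsCommitted.getAsBoolean()) {
        return true;
      }
    } while (System.currentTimeMillis() < deadline);
    return false;
  }
}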
String state = instanceToStateMap.get(instance);
if (state.equals(SegmentStateModel.CONSUMING)) {
  instanceToConsumingSegments.putIfAbsent(instance, new LinkedList<>());
  instanceToConsumingSegments.get(instance).add(segmentName);
You can simplify this by using compute instead of putIfAbsent and get.
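A small standalone illustration of the suggestion (hypothetical names, not the PR's code):

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public final class ComputeVsPutIfAbsent {
  public static void main(String[] args) {
    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();

    // Two lookups: putIfAbsent followed by get
    instanceToConsumingSegments.putIfAbsent("server-1", new LinkedList<>());
    instanceToConsumingSegments.get("server-1").add("seg-0");

    // Single call: computeIfAbsent creates the queue only when missing and returns it
    instanceToConsumingSegments.computeIfAbsent("server-1", k -> new LinkedList<>()).add("seg-1");

    System.out.println(instanceToConsumingSegments);  // {server-1=[seg-0, seg-1]}
  }
}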
List<Set<String>> segmentBatchList =
    realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2);

// i1 = [seg0, seg4, seg5, seg6]
remove these comments
@@ -170,19 +169,28 @@ public Map<String, String> forceCommit(
    @ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions")
        String partitionGroupIds,
    @ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments")
        String consumingSegments, @Context HttpHeaders headers) {
        String consumingSegments,
    @ApiParam(value = "Max number of consuming segments to commit at once") @QueryParam("batchSize")
Mention in the description that the default is to commit all consuming segments at once.
Please update the PR description to reflect the change in API signature.
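A hedged sketch of how the parameter description might spell out that default; the exact wording, and whether @DefaultValue is used at all, may differ in the actual change:

import javax.ws.rs.DefaultValue;
import javax.ws.rs.QueryParam;
import io.swagger.annotations.ApiParam;

public class ForceCommitParams {
  // Illustrative only: the description calls out that omitting batchSize means
  // all consuming segments are committed in a single batch.
  public void forceCommit(
      @ApiParam(value = "Max number of consuming segments to commit at once "
          + "(default: all consuming segments are committed in one batch)")
      @QueryParam("batchSize") @DefaultValue("2147483647") int batchSize) {
    // ...
  }
}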
Mostly good
@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
        controllerConf.getDeepStoreRetryUploadParallelism()) : null;
    _deepStoreUploadExecutorPendingSegments =
        _isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
    _forceCommitExecutorService = Executors.newFixedThreadPool(4);
Having a fixed-size pool could actually cause problems when there are multiple force-commit requests. Since the thread is waiting most of the time, I'd actually suggest creating a single-thread pool for each request, same as the last version. It is not the query path, so the overhead should be fine.
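A minimal sketch of the suggested per-request executor, assuming a hypothetical batchCommitTask that performs the batched force commit:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ForceCommitExample {
  // Spawns a dedicated single-thread executor for one force-commit request and
  // shuts it down when the work is submitted, instead of sharing a fixed pool
  // across requests (which could starve concurrent force-commit calls, since
  // the worker spends most of its time waiting).
  public static void submitBatchCommit(Runnable batchCommitTask) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.submit(batchCommitTask);
    } finally {
      executor.shutdown();  // lets the submitted task finish, then frees the thread
    }
  }
}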
Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
  if (!targetConsumingSegments.contains(segmentName)) {
Let's loop over targetConsumingSegments instead of the ideal state. The ideal state should always contain targetConsumingSegments because they are extracted from the ideal state.
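A minimal sketch of the suggested direction, assuming idealStateMap is the segment-to-(instance, state) map taken from the ideal state and consumingState is the CONSUMING state label; names are illustrative:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public final class ConsumingSegmentGrouper {
  // Groups consuming segments by hosting instance, iterating the target segments
  // directly (each is guaranteed to have an ideal-state entry because they were
  // extracted from it) instead of scanning every segment in the ideal state.
  static Map<String, Queue<String>> groupByInstance(Map<String, Map<String, String>> idealStateMap,
      Set<String> targetConsumingSegments, String consumingState) {
    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
    for (String segmentName : targetConsumingSegments) {
      Map<String, String> instanceToStateMap = idealStateMap.get(segmentName);
      for (Map.Entry<String, String> entry : instanceToStateMap.entrySet()) {
        if (consumingState.equals(entry.getValue())) {
          instanceToConsumingSegments.computeIfAbsent(entry.getKey(), k -> new LinkedList<>()).add(segmentName);
        }
      }
    }
    return instanceToConsumingSegments;
  }
}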
instanceToConsumingSegments.compute(instance, (key, value) -> {
  if (value == null) {
    value = new LinkedList<>();
  }
  value.add(segmentName);
  return value;
});
Suggested change:
instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);
for (String instance : instanceToStateMap.keySet()) {
  String state = instanceToStateMap.get(instance);
Use entrySet() to reduce lookup.
while (segmentsRemaining) {
  segmentsRemaining = false;
  // pick segments in round-robin fashion to parallelize
Smart!
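For reference, a self-contained sketch of the round-robin batching idea being praised here (illustrative, not the PR's exact code): each pass takes at most one segment from every instance's queue, so each batch spreads the force-commit work across as many servers as possible.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public final class RoundRobinBatcher {
  static List<Set<String>> toBatches(Map<String, Queue<String>> instanceToConsumingSegments, int batchSize) {
    List<Set<String>> batches = new ArrayList<>();
    Set<String> segmentsAdded = new HashSet<>();  // a segment replica may appear under several instances
    Set<String> currentBatch = new HashSet<>();
    boolean segmentsRemaining = true;
    while (segmentsRemaining) {
      segmentsRemaining = false;
      // one round-robin pass: take at most one segment per instance
      for (Queue<String> queue : instanceToConsumingSegments.values()) {
        if (queue.isEmpty()) {
          continue;
        }
        segmentsRemaining = true;
        String segmentName = queue.poll();
        if (!segmentsAdded.add(segmentName)) {
          continue;  // already placed via another replica
        }
        currentBatch.add(segmentName);
        if (currentBatch.size() == batchSize) {
          batches.add(currentBatch);
          currentBatch = new HashSet<>();
        }
      }
    }
    if (!currentBatch.isEmpty()) {
      batches.add(currentBatch);
    }
    return batches;
  }
}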
String segmentName = queue.poll();
// there might be a segment replica hosted on
// another instance added before
if (segmentsAdded.contains(segmentName)) {
We can reduce a lookup by the suggested change below: replace
if (segmentsAdded.contains(segmentName)) {
with
if (!segmentsAdded.add(segmentName)) {
// pick segments in round-robin fashion to parallelize
// forceCommit across max servers
for (Queue<String> queue : instanceToConsumingSegments.values()) {
  if (!queue.isEmpty()) {
We can remove the queue when it is empty to avoid checking it again and again. You may use an iterator to remove the entry without an extra lookup.
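A small sketch of the iterator-based cleanup, assuming queues is the values view of instanceToConsumingSegments (removing from that view also removes the corresponding map entry); names are illustrative:

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;

public final class QueueDrainExample {
  // One round-robin pass that removes queues as soon as they are drained, so
  // later passes no longer iterate over (and re-check) empty queues.
  static void drainOnePass(Collection<Queue<String>> queues, Collection<String> currentBatch) {
    Iterator<Queue<String>> iterator = queues.iterator();
    while (iterator.hasNext()) {
      Queue<String> queue = iterator.next();
      String segmentName = queue.poll();
      if (segmentName != null) {
        currentBatch.add(segmentName);
      }
      if (queue.isEmpty()) {
        iterator.remove();  // drop the drained queue without an extra map lookup
      }
    }
  }
}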
try {
  Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException ignored) {
Ignoring the interrupt could be risky (holding a long-running thread). Let's wrap it in a RuntimeException and throw it. We may log an error when catching it.
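A minimal sketch of the interrupt handling being asked for (illustrative method, not the PR's exact code):

public final class InterruptibleWait {
  // Instead of swallowing an interrupt (which keeps a long-running thread
  // going), restore the interrupt flag and surface the failure to the caller.
  static void waitBeforeNextCheck(long intervalMs) {
    try {
      Thread.sleep(intervalMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt status
      throw new RuntimeException("Interrupted while waiting for force-commit batch to complete", e);
    }
  }
}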
}

int attemptCount = 0;
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Suggested change:
final Set<String>[] segmentsYetToBeCommitted = new Set[1];
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

  // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
  private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Let's take the check interval also from the REST API, because different use cases might want different intervals; we might also want to add a TIMEOUT and take that from the REST API as well. The retry count can be calculated from the timeout and interval.
We can provide default values (e.g. 5s, 3min) for them in case they are not provided. IMO a 15s interval is too long, because it means for each batch we will wait at least 15s.
Sure.
(The default case (5s interval, 3m timeout) might be a little expensive, as we are waiting for segment builds to complete and there will be at most 36 calls to ZK per batch.)
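For clarity, a tiny sketch of deriving the retry count from the timeout and interval, matching the arithmetic above (180s / 5s = 36 checks per batch); names are illustrative:

public final class ForceCommitTimings {
  // With the defaults discussed here (180s timeout, 5s interval) this yields
  // 36 status checks per batch, i.e. at most 36 ZK reads per batch.
  static int maxStatusCheckAttempts(int batchStatusCheckTimeoutSec, int batchStatusCheckIntervalSec) {
    return (int) Math.ceil((double) batchStatusCheckTimeoutSec / batchStatusCheckIntervalSec);
  }

  public static void main(String[] args) {
    System.out.println(maxStatusCheckAttempts(180, 5));   // 36
    System.out.println(maxStatusCheckAttempts(300, 10));  // 30
  }
}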
Problem Statement
The Force Commit API can cause high ingestion lag and slower queries because it triggers a simultaneous segment commit for all consuming segments. This happens because:
Solution
Adds an optional integer parameter, batchSize, to the forceCommit API (default value = Integer.MAX_VALUE, i.e. commit as many segments as possible at once if no batchSize is provided).

3 new query params added to the forceCommit API:

batchSize (integer, optional):
Max number of consuming segments to commit at once.
Default: Integer.MAX_VALUE
Example: batchSize=100

batchStatusCheckIntervalSec (integer, optional):
How often (in seconds) to check whether the current batch of segments has been successfully committed.
Default: 5
Example: batchStatusCheckIntervalSec=10

batchStatusCheckTimeoutSec (integer, optional):
Timeout (in seconds) after which the controller stops checking the forceCommit status and throws an exception.
Default: 180
Example: batchStatusCheckTimeoutSec=300
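An example invocation, assuming the controller's existing forceCommit endpoint at POST /tables/{tableName}/forceCommit; the host, port, and table name below are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class ForceCommitClient {
  public static void main(String[] args) throws Exception {
    // Placeholder controller host/port and table name; the new query params are
    // simply appended to the existing forceCommit resource.
    String url = "http://localhost:9000/tables/myTable_REALTIME/forceCommit"
        + "?batchSize=100&batchStatusCheckIntervalSec=10&batchStatusCheckTimeoutSec=300";

    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}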