Supports Force Committing Segments in Batches #14811
base: master
Changes from 51 commits
@@ -31,8 +31,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
@@ -117,7 +119,10 @@
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

  // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
  private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Review comment: Let's take the check interval from the REST API as well, because different use cases might want different intervals; we might also want to add a TIMEOUT and take that from the REST API too. The retry count can be calculated from the timeout and the interval.
Reply: Sure.
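A rough sketch of that suggestion, assuming the timeout and check interval both arrive with the force-commit REST request; the helper name and parameters below are illustrative, not the PR's final API:

```java
// Hypothetical helper: both values would come from the force-commit REST request.
private RetryPolicy buildForceCommitRetryPolicy(long timeoutMs, long checkIntervalMs) {
  // Retry count is derived from the timeout and the polling interval (at least one attempt).
  int maxAttempts = (int) Math.max(1, timeoutMs / checkIntervalMs);
  return RetryPolicies.fixedDelayRetryPolicy(maxAttempts, checkIntervalMs);
}
```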
  private static final RetryPolicy DEFAULT_RETRY_POLICY =
      RetryPolicies.fixedDelayRetryPolicy(10, FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);

  // TODO: make this configurable with default set to 10
  /**
@@ -1848,15 +1856,114 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
   * @return the set of consuming segments for which commit was initiated
   */
  public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
      @Nullable String segmentsToCommit) {
      @Nullable String segmentsToCommit, int batchSize) {
    IdealState idealState = getIdealState(tableNameWithType);
    Set<String> allConsumingSegments = findConsumingSegments(idealState);
    Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
        segmentsToCommit);
    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
    ExecutorService executorService = Executors.newFixedThreadPool(1);
Review comment: Do not create a new thread pool on every request.
Reply: Ohh right!
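One way to address this, as a minimal sketch: hold a single long-lived executor as a field of PinotLLCRealtimeSegmentManager instead of building one per forceCommit call. The field name and thread name below are assumptions, not code from the PR:

```java
// Hypothetical field: one shared, single-threaded executor used by all force-commit requests,
// created once with the manager and shut down when the controller stops.
private final ExecutorService _forceCommitExecutor =
    Executors.newSingleThreadExecutor(r -> new Thread(r, "force-commit-batch-executor"));
```

forceCommit would then submit its batches to this field and skip the per-request shutdown().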

    try {
      for (Set<String> segmentBatchToCommit : segmentBatchList) {
        executorService.submit(() -> executeBatch(tableNameWithType, segmentBatchToCommit));
      }
    } finally {
      executorService.shutdown();
    }

    return targetConsumingSegments;
  }
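For context, a standalone sketch (not Pinot code) of why a single-threaded executor gives the intended batching behaviour: batches submitted in order run strictly one after another, so a batch only starts once the previous one has finished its wait-and-verify work.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchOrderingSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // Segment names here are made up for illustration.
    List<Set<String>> segmentBatchList = List.of(Set.of("seg__0__10"), Set.of("seg__1__10"));
    try {
      for (Set<String> batch : segmentBatchList) {
        // With a single worker thread, each submitted batch runs only after the previous one completes.
        executor.submit(() -> System.out.println("Force committing batch: " + batch));
      }
    } finally {
      executor.shutdown();  // already-submitted tasks still run to completion
    }
  }
}
```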

  private void executeBatch(String tableNameWithType, Set<String> segmentBatchToCommit) {
    sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);

    try {
      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
Review comment: Why sleep at the start?
Reply: This sleep is to save ZooKeeper calls; at this point in the code it is almost certain that all segments are yet to be committed.
    } catch (InterruptedException ignored) {
Review comment: Ignoring the interrupt could be risky (holding a long-running thread). Let's wrap it as a
    }
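A minimal sketch of how the wait could handle interruption without swallowing it, per the comment above. Restoring the interrupt flag and rethrowing is one common pattern, offered here as an assumption rather than the reviewer's exact intended fix:

```java
try {
  Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException e) {
  // Restore the interrupt flag so callers/executors can observe it, then bail out of the batch.
  Thread.currentThread().interrupt();
  throw new RuntimeException("Interrupted while waiting for force-commit of batch: " + segmentBatchToCommit, e);
}
```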

    int attemptCount = 0;
Review comment: No need for this variable. Both AttemptsExceededException and RetriableOperationException have a getAttempts method.
    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Review comment: Suggested change

    try {
      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
        return segmentsYetToBeCommitted[0].isEmpty();
      });
    } catch (AttemptsExceededException | RetriableOperationException e) {
      String errorMsg = String.format(
          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
              + "segmentsYetToBeCommitted: %s",
          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
      LOGGER.error(errorMsg, e);
      throw new RuntimeException(e);
Review comment: If an exception is thrown, there's no need to log. Add the errorMsg to the runtime exception.
    }
  }
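A sketch of how the retry/catch portion could look after folding in the three comments above: drop the attemptCount variable, replace the single-element array holder, and carry the message on the exception instead of logging. The AtomicReference is my own substitution, since the body of the "Suggested change" above was lost; this is an illustration of the suggestions, not the code that ended up in the PR:

```java
// Sketch only. Assumes: import java.util.concurrent.atomic.AtomicReference;
AtomicReference<Set<String>> segmentsYetToBeCommitted = new AtomicReference<>(new HashSet<>());
try {
  DEFAULT_RETRY_POLICY.attempt(() -> {
    segmentsYetToBeCommitted.set(getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit));
    return segmentsYetToBeCommitted.get().isEmpty();
  });
} catch (AttemptsExceededException | RetriableOperationException e) {
  // Attempt count comes from the exception itself (both types expose getAttempts per the comment above),
  // and the message rides on the exception instead of a separate log line.
  int attempts = e instanceof AttemptsExceededException ? ((AttemptsExceededException) e).getAttempts()
      : ((RetriableOperationException) e).getAttempts();
  throw new RuntimeException(String.format(
      "Failed to force-commit batch of segments: %s, attempt count: %d, segmentsYetToBeCommitted: %s",
      segmentBatchToCommit, attempts, segmentsYetToBeCommitted.get()), e);
}
```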

  @VisibleForTesting
  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
      int batchSize) {
    Map<String, Queue<String>> instanceToConsumingSegments =
        getInstanceToConsumingSegments(idealState, targetConsumingSegments);

    List<Set<String>> segmentBatchList = new ArrayList<>();
    Set<String> currentBatch = new HashSet<>();
    Set<String> segmentsAdded = new HashSet<>();
    boolean segmentsRemaining = true;

    while (segmentsRemaining) {
      segmentsRemaining = false;
      // pick segments in round-robin fashion to parallelize
Review comment: Smart!
      // forceCommit across max servers
      for (Queue<String> queue : instanceToConsumingSegments.values()) {
        if (!queue.isEmpty()) {
Review comment: We can remove the queue when it is empty to avoid checking it again and again. You may use an iterator to remove the entry without an extra lookup.
          segmentsRemaining = true;
          String segmentName = queue.poll();
          // there might be a segment replica hosted on
          // another instance added before
          if (segmentsAdded.contains(segmentName)) {
Review comment: We can reduce a lookup by
Suggested change
            continue;
          }
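The body of the suggestion above was lost; one common idiom that matches the intent of "reduce a lookup" (my assumption, not necessarily the reviewer's exact change) is to rely on Set.add returning false for duplicates:

```java
// Set.add returns false if the segment was already picked up via another replica,
// so one call replaces the separate contains() + add() pair.
if (!segmentsAdded.add(segmentName)) {
  continue;
}
currentBatch.add(segmentName);
```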
          currentBatch.add(segmentName);
          segmentsAdded.add(segmentName);
          if (currentBatch.size() == batchSize) {
            segmentBatchList.add(currentBatch);
            currentBatch = new HashSet<>();
          }
        }
      }
    }

    if (!currentBatch.isEmpty()) {
      segmentBatchList.add(currentBatch);
    }
    return segmentBatchList;
  }
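A sketch of how the round-robin loop might look with the iterator-based cleanup suggested above, where an exhausted server's queue is removed as soon as it runs dry. Identifiers match the diff, but this is an illustration rather than the final PR code; the trailing flush of the last partial batch from the original method still applies after the loop:

```java
// Assumes: import java.util.Iterator;
// Round-robin over servers; an exhausted queue is removed via the iterator,
// so later passes only visit servers that still have consuming segments left.
while (!instanceToConsumingSegments.isEmpty()) {
  Iterator<Map.Entry<String, Queue<String>>> iterator = instanceToConsumingSegments.entrySet().iterator();
  while (iterator.hasNext()) {
    Queue<String> queue = iterator.next().getValue();
    String segmentName = queue.poll();
    if (queue.isEmpty()) {
      iterator.remove();
    }
    if (segmentName == null || !segmentsAdded.add(segmentName)) {
      continue;  // replica already scheduled through another server
    }
    currentBatch.add(segmentName);
    if (currentBatch.size() == batchSize) {
      segmentBatchList.add(currentBatch);
      currentBatch = new HashSet<>();
    }
  }
}
```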

  @VisibleForTesting
  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
      Set<String> targetConsumingSegments) {
    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();

    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
      if (!targetConsumingSegments.contains(segmentName)) {
Review comment: Let's loop over
        continue;
      }
      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
      for (String instance : instanceToStateMap.keySet()) {
        String state = instanceToStateMap.get(instance);
Review comment: Use
        if (state.equals(SegmentStateModel.CONSUMING)) {
          instanceToConsumingSegments.putIfAbsent(instance, new LinkedList<>());
          instanceToConsumingSegments.get(instance).add(segmentName);
Review comment: You can simplify this by using
        }
      }
    }

    return instanceToConsumingSegments;
  }
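A sketch of the same method with the kind of cleanups the partially lost comments above appear to point at: iterating over map entries instead of keySet plus get, and computeIfAbsent instead of putIfAbsent plus get. The exact suggestions were elided, so treat this as one plausible reading rather than the PR's final code:

```java
@VisibleForTesting
Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
    Set<String> targetConsumingSegments) {
  Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
  for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
    String segmentName = entry.getKey();
    if (!targetConsumingSegments.contains(segmentName)) {
      continue;
    }
    for (Map.Entry<String, String> instanceState : entry.getValue().entrySet()) {
      if (SegmentStateModel.CONSUMING.equals(instanceState.getValue())) {
        // computeIfAbsent creates the queue once and returns it, avoiding the putIfAbsent + get pair.
        instanceToConsumingSegments.computeIfAbsent(instanceState.getKey(), k -> new LinkedList<>())
            .add(segmentName);
      }
    }
  }
  return instanceToConsumingSegments;
}
```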

  /**
   * Among all consuming segments, filter the ones that are in the given partitions or segments.
   */
@@ -2009,4 +2116,20 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
  URI createSegmentPath(String rawTableName, String segmentName) {
    return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
  }

  public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
    Set<String> segmentsYetToBeCommitted = new HashSet<>();
    for (String segmentName : segmentsToCheck) {
      SegmentZKMetadata segmentZKMetadata =
          _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
      if (segmentZKMetadata == null) {
        // Segment is deleted. No need to track this segment among segments yetToBeCommitted.
        continue;
      }
      if (!(segmentZKMetadata.getStatus().equals(Status.DONE))) {
        segmentsYetToBeCommitted.add(segmentName);
      }
    }
    return segmentsYetToBeCommitted;
  }
}
Review comment: Mention in the description that the default is to commit all consuming segments at once.
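If the batch size is exposed as a REST query parameter, its description could spell out that default. The declaration below is purely illustrative; the actual parameter name, annotations, and default in the PR's REST resource may differ:

```java
// Hypothetical parameter declaration for the force-commit endpoint; names and defaults are illustrative only.
@QueryParam("batchSize")
@DefaultValue("2147483647")  // Integer.MAX_VALUE, i.e. force-commit all consuming segments in one batch
@ApiParam(value = "Max number of consuming segments to force-commit at once (default: all consuming segments)")
int batchSize
```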