-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updates forceCommit APIs to handle Pauseless #14828
Changes from 5 commits
ab2220f
ed90f11
e88aa2a
8c8d8d3
1be0316
095acc0
36360b8
68cdc26
b8a2e7f
5730a06
c8565d6
857dd6a
2f7e5d9
0b64439
9e3ddad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long | |
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); | ||
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, | ||
JsonUtils.objectToString(consumingSegmentsCommitted)); | ||
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, | ||
JsonUtils.objectToString(consumingSegmentsCommitted)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (minor) This shouldn't be needed. Adding it will actually add overhead for one extra parsing |
||
return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); | ||
} | ||
|
||
public void updateForceCommitJobMetadata(String forceCommitJobId, Set<String> segmentsYetToBeCommitted, | ||
Map<String, String> controllerJobZKMetadata) { | ||
addControllerJobToZK(forceCommitJobId, | ||
controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { | ||
String existingSegmentsYetToBeCommittedString = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add some comments describing why we want to perform this check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually this method is only useful for a rare edge case when two async forceCommitStatus APIs are running. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. have removed this method |
||
prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); | ||
if (existingSegmentsYetToBeCommittedString == null) { | ||
return true; | ||
} | ||
try { | ||
Set<String> existingSegmentsYetToBeCommitted = | ||
JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class); | ||
return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size(); | ||
} catch (JsonProcessingException e) { | ||
return false; | ||
} | ||
} | ||
); | ||
} | ||
|
||
/** | ||
* Adds a new job metadata for controller job like table rebalance or reload into ZK | ||
* @param jobId job's UUID | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo | |
URI createSegmentPath(String rawTableName, String segmentName) { | ||
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); | ||
} | ||
|
||
public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) { | ||
Set<String> segmentsYetToBeCommitted = new HashSet<>(); | ||
for (String segmentName: segmentsToCheck) { | ||
SegmentZKMetadata segmentZKMetadata = | ||
_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); | ||
if (segmentZKMetadata == null) { | ||
continue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (minor) Add some comment about this behavior. We are counting deleted segment as not need to be committed |
||
} | ||
if (!segmentZKMetadata.getStatus().isCompleted()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isCompleted should return true only when the stratus is Uploaded or Done. Raising a PR along with few minor improvement wrt pauseless, |
||
segmentsYetToBeCommitted.add(segmentName); | ||
} | ||
} | ||
return segmentsYetToBeCommitted; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
import org.apache.helix.model.ExternalView; | ||
import org.apache.helix.model.IdealState; | ||
import org.apache.pinot.common.metadata.ZKMetadataProvider; | ||
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; | ||
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; | ||
import org.apache.pinot.common.utils.FileUploadDownloadClient; | ||
import org.apache.pinot.common.utils.HashUtil; | ||
|
@@ -427,6 +428,14 @@ public void testForceCommit() | |
throws Exception { | ||
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); | ||
String jobId = forceCommit(getTableName()); | ||
Map<String, String> jobMetadata = | ||
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); | ||
assert jobMetadata != null; | ||
assert jobMetadata.get("segmentsForceCommitted") != null; | ||
assert jobMetadata.get("segmentsYetToBeCommitted") != null; | ||
Set<String> allSegments = JsonUtils.stringToObject(jobMetadata.get("segmentsForceCommitted"), HashSet.class); | ||
Set<String> segmentsPending = JsonUtils.stringToObject(jobMetadata.get("segmentsYetToBeCommitted"), HashSet.class); | ||
assert segmentsPending.size() <= allSegments.size(); | ||
|
||
TestUtils.waitForCondition(aVoid -> { | ||
try { | ||
|
@@ -462,6 +471,16 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId) | |
|
||
assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); | ||
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); | ||
|
||
assert jobStatus.get("segmentsForceCommitted") != null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use CommonConstants variable rather than hardcode |
||
assert jobStatus.get("segmentsYetToBeCommitted") != null; | ||
|
||
Set<String> allSegments = JsonUtils.stringToObject(jobStatus.get("segmentsForceCommitted").asText(), HashSet.class); | ||
Set<String> segmentsPending = | ||
JsonUtils.stringToObject(jobStatus.get("segmentsYetToBeCommitted").asText(), HashSet.class); | ||
assert segmentsPending.size() <= allSegments.size(); | ||
assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); | ||
|
||
return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@noob-se7en iiuc the objective of introducing this field is to reduce the number of segmentZKMetadata that we will be iterating over.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it server any other purpose ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Just to reduce the ZK calls only.