Updates forceCommit APIs to handle Pauseless #14828

Merged
merged 15 commits into from
Jan 24, 2025
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -222,18 +221,29 @@ public JsonNode getForceCommitJobStatus(
String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class);
Set<String> onlineSegmentsForTable =
_pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);

Set<String> segmentsYetToBeCommitted = new HashSet<>();
consumingSegmentCommitted.forEach(segmentName -> {
if (!onlineSegmentsForTable.contains(segmentName)) {
segmentsYetToBeCommitted.add(segmentName);
}
});
Set<String> segmentsToCheck;
String segmentsPendingToBeComittedString =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);

if (segmentsPendingToBeComittedString != null) {
segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class);
} else {
segmentsToCheck = consumingSegmentCommitted;
}

Set<String> segmentsYetToBeCommitted =
Contributor

@9aman 9aman Jan 23, 2025

@noob-se7en iiuc the objective of introducing this field is to reduce the number of segmentZKMetadata that we will be iterating over.

Contributor

Does it serve any other purpose?

Contributor Author

Yes, just to reduce the ZK calls.

_pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck);

controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(segmentsYetToBeCommitted));

if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) {
_pinotHelixResourceManager.updateForceCommitJobMetadata(forceCommitJobId, segmentsYetToBeCommitted,
controllerJobZKMetadata);
}

Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size());
return JsonUtils.objectToJsonNode(result);
}
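
The narrowing behavior discussed above (each status call checks only the segments still recorded as pending, and writes the job metadata back only when that set shrinks) can be sketched in isolation. This is a minimal in-memory sketch, not Pinot code: `ForceCommitStatusSketch`, `pollStatus`, and the `isCommitted` predicate are hypothetical names, and a plain `Map` stands in for the job's ZK metadata.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the controller-side status check; not Pinot code.
final class ForceCommitStatusSketch {
  static final String ALL_KEY = "segmentsForceCommitted";
  static final String PENDING_KEY = "segmentsYetToBeCommitted";

  // In-memory stand-in for the job's ZK metadata.
  private final Map<String, Set<String>> jobMetadata = new HashMap<>();
  private int metadataWrites = 0;

  ForceCommitStatusSketch(Set<String> forceCommitted) {
    jobMetadata.put(ALL_KEY, new LinkedHashSet<>(forceCommitted));
  }

  // isCommitted is an assumed callback that represents one per-segment metadata lookup.
  Set<String> pollStatus(Predicate<String> isCommitted) {
    // Fall back to the full force-committed list when no pending list was stored yet.
    Set<String> toCheck = jobMetadata.getOrDefault(PENDING_KEY, jobMetadata.get(ALL_KEY));
    Set<String> stillPending = new LinkedHashSet<>();
    for (String segment : toCheck) {
      if (!isCommitted.test(segment)) {
        stillPending.add(segment);
      }
    }
    // Persist only when progress was made, so an unchanged poll costs no write.
    if (stillPending.size() < toCheck.size()) {
      jobMetadata.put(PENDING_KEY, stillPending);
      metadataWrites++;
    }
    return stillPending;
  }

  int metadataWrites() {
    return metadataWrites;
  }
}
```

With three force-committed segments, a poll after two of them commit performs three lookups and one write; the next poll performs a single lookup, which is the reduction in ZK calls the review thread refers to.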
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
Contributor

(minor) This shouldn't be needed. Adding it actually adds the overhead of one extra parse.

return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT);
}

public void updateForceCommitJobMetadata(String forceCommitJobId, Set<String> segmentsYetToBeCommitted,
Map<String, String> controllerJobZKMetadata) {
addControllerJobToZK(forceCommitJobId,
controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> {
String existingSegmentsYetToBeCommittedString =
Contributor

Please add a comment describing why we want to perform this check.

Contributor Author

Actually, this method is only useful for a rare edge case where two async forceCommitStatus API calls are running.
But we can remove it, as that will save the overhead of one extra parse on each API call.

Contributor Author

Have removed this method.

prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);
if (existingSegmentsYetToBeCommittedString == null) {
return true;
}
try {
Set<String> existingSegmentsYetToBeCommitted =
JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class);
return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size();
} catch (JsonProcessingException e) {
return false;
}
}
);
}
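
The edge case this guard targets (two concurrent status calls where a slower one could overwrite a shorter pending list with a longer, stale one) can be illustrated with a standalone sketch. It is an assumption-laden illustration rather than the PR's code: `PendingListGuardSketch` and `updateIfSmaller` are hypothetical names, and a `ConcurrentHashMap` stands in for the ZK-backed job metadata and its updater check.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of an "only shrink the pending list" guard; not Pinot code.
final class PendingListGuardSketch {
  // Stand-in for job metadata stored in ZK, keyed by job id.
  private final Map<String, Set<String>> pendingByJob = new ConcurrentHashMap<>();

  /** Stores newPending only if nothing is stored yet or it is strictly smaller; returns true if stored. */
  boolean updateIfSmaller(String jobId, Set<String> newPending) {
    boolean[] updated = {false};
    // compute() runs the check and the write atomically for this key.
    pendingByJob.compute(jobId, (id, existing) -> {
      if (existing == null || newPending.size() < existing.size()) {
        updated[0] = true;
        return new HashSet<>(newPending);
      }
      // A stale caller with an equal or larger set leaves the stored value untouched.
      return existing;
    });
    return updated[0];
  }

  Set<String> pending(String jobId) {
    return pendingByJob.get(jobId);
  }
}
```

The trade-off raised in the thread still applies: the check costs one extra read and parse of the stored list on every status call, which is why the author chose to drop it.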

/**
* Adds a new job metadata for controller job like table rebalance or reload into ZK
* @param jobId job's UUID
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
}

public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
Set<String> segmentsYetToBeCommitted = new HashSet<>();
for (String segmentName: segmentsToCheck) {
SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
if (segmentZKMetadata == null) {
continue;
Contributor

(minor) Add a comment about this behavior: we are counting a deleted segment as not needing to be committed.

}
if (!segmentZKMetadata.getStatus().isCompleted()) {
Contributor

We cannot use isCompleted() here. We should explicitly check for it to be DONE.

@9aman @KKcorps Currently COMMITTING is also counted as completed; is this expected?

Contributor

isCompleted should return true only when the status is Uploaded or Done. Raising a PR along with a few minor improvements w.r.t. pauseless.

segmentsYetToBeCommitted.add(segmentName);
}
}
return segmentsYetToBeCommitted;
}
}
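
The two review points on this method (a segment with no ZK metadata counts as already handled, and COMMITTING should not count as committed) can be sketched as follows. This is a simplified sketch under stated assumptions: `PendingSegmentsSketch` is a hypothetical class, its `Status` enum is a reduced stand-in rather than Pinot's actual status type, and a `Map` replaces the per-segment ZK metadata lookup.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the suggested explicit DONE check; not Pinot code.
final class PendingSegmentsSketch {
  // Reduced stand-in for the segment status; the real enum may have more values.
  enum Status { IN_PROGRESS, COMMITTING, DONE }

  // zkStatusBySegment stands in for per-segment ZK metadata; a missing key means no metadata exists.
  static Set<String> segmentsYetToBeCommitted(Map<String, Status> zkStatusBySegment,
      Set<String> segmentsToCheck) {
    Set<String> pending = new LinkedHashSet<>();
    for (String segment : segmentsToCheck) {
      Status status = zkStatusBySegment.get(segment);
      if (status == null) {
        // No metadata: the segment was deleted, so there is nothing left to commit.
        continue;
      }
      // Compare against DONE explicitly so that COMMITTING is still reported as pending.
      if (status != Status.DONE) {
        pending.add(segment);
      }
    }
    return pending;
  }
}
```

Under these assumptions a COMMITTING segment stays in the pending set until its status becomes DONE, which matters for pauseless tables where a segment can sit in COMMITTING for a while.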
@@ -41,6 +41,7 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
@@ -427,6 +428,14 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
assert jobMetadata.get("segmentsForceCommitted") != null;
assert jobMetadata.get("segmentsYetToBeCommitted") != null;
Set<String> allSegments = JsonUtils.stringToObject(jobMetadata.get("segmentsForceCommitted"), HashSet.class);
Set<String> segmentsPending = JsonUtils.stringToObject(jobMetadata.get("segmentsYetToBeCommitted"), HashSet.class);
assert segmentsPending.size() <= allSegments.size();

TestUtils.waitForCondition(aVoid -> {
try {
@@ -462,6 +471,16 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId)

assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");

assert jobStatus.get("segmentsForceCommitted") != null;
Contributor

Use the CommonConstants variable rather than hardcoding the string.

assert jobStatus.get("segmentsYetToBeCommitted") != null;

Set<String> allSegments = JsonUtils.stringToObject(jobStatus.get("segmentsForceCommitted").asText(), HashSet.class);
Set<String> segmentsPending =
JsonUtils.stringToObject(jobStatus.get("segmentsYetToBeCommitted").asText(), HashSet.class);
assert segmentsPending.size() <= allSegments.size();
assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();

return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}

@@ -994,6 +994,7 @@ public static class ControllerJob {
public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName";
// Force commit job ZK props
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted";
}

// prefix for scheduler related features, e.g. query accountant