Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates forceCommit APIs to handle Pauseless #14828

Merged
merged 15 commits into from
Jan 24, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -222,15 +221,26 @@ public JsonNode getForceCommitJobStatus(
String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class);
Set<String> onlineSegmentsForTable =
_pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);

Set<String> segmentsYetToBeCommitted = new HashSet<>();
consumingSegmentCommitted.forEach(segmentName -> {
if (!onlineSegmentsForTable.contains(segmentName)) {
segmentsYetToBeCommitted.add(segmentName);
}
});
Set<String> segmentsToCheck;
String segmentsPendingToBeComittedString =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);

if (segmentsPendingToBeComittedString != null) {
segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class);
} else {
segmentsToCheck = consumingSegmentCommitted;
}

Set<String> segmentsYetToBeCommitted =
Copy link
Contributor

@9aman 9aman Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noob-se7en IIUC, the objective of introducing this field is to reduce the number of SegmentZKMetadata entries that we will be iterating over.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it serve any other purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is just to reduce the number of ZK calls.

_pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck);

if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) {
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(segmentsYetToBeCommitted));
_pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata,
ControllerJobType.FORCE_COMMIT, prev -> true);
}

Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,4 +2009,20 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
/**
 * Builds the URI for a segment by combining the data directory returned by
 * {@code _controllerConf.getDataDir()}, the raw table name, and the segment name
 * after it has been passed through {@code URIUtils.encode}.
 *
 * @param rawTableName table name used as the middle path component
 * @param segmentName segment name; encoded before being appended as the last component
 * @return the URI produced by {@code URIUtils.getUri} for the three components
 */
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
}

public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
Set<String> segmentsYetToBeCommitted = new HashSet<>();
for (String segmentName: segmentsToCheck) {
SegmentZKMetadata segmentZKMetadata =
_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
if (segmentZKMetadata == null) {
// Segment is deleted. No need to track this segment among segments yetToBeCommitted.
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Add a comment about this behavior: we are treating a deleted segment as one that does not need to be committed.

}
if (!(segmentZKMetadata.getStatus().equals(Status.DONE))) {
segmentsYetToBeCommitted.add(segmentName);
}
}
return segmentsYetToBeCommitted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -1247,6 +1248,36 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

@Test
public void getSegmentsYetToBeCommitted() {
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);

SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class);
when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE);

SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class);
when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED);

SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class);
when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add one for Status.COMMITTING


SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class);
when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING);

when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone);
when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone);
when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded);
when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress);
when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null);
when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting);

Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5");
Set<String> segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck);
assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted);
}

//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
Expand Down Expand Up @@ -427,6 +428,10 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
assert jobMetadata.get("segmentsForceCommitted") != null;

TestUtils.waitForCondition(aVoid -> {
try {
Expand Down Expand Up @@ -462,6 +467,20 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId)

assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");

assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null;

Set<String> allSegments = JsonUtils.stringToObject(
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class);
Set<String> segmentsPending = new HashSet<>();
for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) {
segmentsPending.add(element.asText());
}

assert segmentsPending.size() <= allSegments.size();
assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();

return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ public static class ControllerJob {
public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName";
// Force commit job ZK props
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted";
}

// prefix for scheduler related features, e.g. query accountant
Expand Down
Loading