Skip to content

Commit

Permalink
Updates forceCommit APIs to handle Pauseless (#14828)
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en authored Jan 24, 2025
1 parent e0ec848 commit c9e08f3
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -222,15 +221,26 @@ public JsonNode getForceCommitJobStatus(
String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class);
Set<String> onlineSegmentsForTable =
_pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);

Set<String> segmentsYetToBeCommitted = new HashSet<>();
consumingSegmentCommitted.forEach(segmentName -> {
if (!onlineSegmentsForTable.contains(segmentName)) {
segmentsYetToBeCommitted.add(segmentName);
}
});
Set<String> segmentsToCheck;
String segmentsPendingToBeComittedString =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);

if (segmentsPendingToBeComittedString != null) {
segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class);
} else {
segmentsToCheck = consumingSegmentCommitted;
}

Set<String> segmentsYetToBeCommitted =
_pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck);

if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) {
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(segmentsYetToBeCommitted));
_pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata,
ControllerJobType.FORCE_COMMIT, prev -> true);
}

Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,4 +2009,20 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
/**
 * Builds the URI of a segment from the controller's configured data directory, the raw table
 * name, and the encoded segment name.
 *
 * @param rawTableName table name used as the path component after the data directory
 * @param segmentName segment name; it is passed through {@code URIUtils.encode} before use
 * @return the URI produced by {@code URIUtils.getUri} for those three components
 */
URI createSegmentPath(String rawTableName, String segmentName) {
  String dataDir = _controllerConf.getDataDir();
  String encodedSegmentName = URIUtils.encode(segmentName);
  return URIUtils.getUri(dataDir, rawTableName, encodedSegmentName);
}

/**
 * Returns the subset of {@code segmentsToCheck} whose ZK metadata exists and whose status is not
 * {@code Status.DONE}.
 *
 * <p>Segments for which no ZK metadata is found are treated as deleted and are left out of the
 * result.
 *
 * @param tableNameWithType table name handed to the segment ZK metadata lookup
 * @param segmentsToCheck names of the segments to inspect
 * @return a newly created set with the names of the segments that are not yet in DONE status
 */
public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
  Set<String> pendingSegments = new HashSet<>();
  for (String segment : segmentsToCheck) {
    SegmentZKMetadata zkMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
    // Missing metadata means the segment was deleted, so there is nothing left to track for it.
    boolean pending = zkMetadata != null && !zkMetadata.getStatus().equals(Status.DONE);
    if (pending) {
      pendingSegments.add(segment);
    }
  }
  return pendingSegments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -1247,6 +1248,36 @@ public void testGetPartitionIds()
Assert.assertEquals(partitionIds.size(), 2);
}

/**
 * Verifies that {@code getSegmentsYetToBeCommitted} reports only segments whose ZK metadata exists
 * and whose status is something other than DONE.
 */
@Test
public void getSegmentsYetToBeCommitted() {
  PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
  FakePinotLLCRealtimeSegmentManager realtimeSegmentManager =
      new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);

  // One mocked metadata object per status exercised by the test.
  SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE);

  SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED);

  SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS);

  SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class);
  when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING);

  // s0 and s3 are DONE, s1 has no metadata (deleted), s2/s4/s5 are in non-DONE states.
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null);
  when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting);

  Set<String> segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5");
  Set<String> segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck);

  // Use the test framework's assertion instead of the `assert` keyword: the keyword is skipped
  // when the JVM runs without -ea and gives no actual/expected output on failure.
  Assert.assertEquals(segmentsYetToBeCommitted, ImmutableSet.of("s2", "s4", "s5"));
}

//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
Expand Down Expand Up @@ -427,12 +428,18 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
assert jobMetadata.get("segmentsForceCommitted") != null;

TestUtils.waitForCondition(aVoid -> {
try {
if (isForceCommitJobCompleted(jobId)) {
assertTrue(_controllerStarter.getHelixResourceManager()
.getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments));
for (String segmentName : consumingSegments) {
assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager()
.getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus());
}
return true;
}
return false;
Expand Down Expand Up @@ -462,6 +469,20 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId)

assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");

assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null;

Set<String> allSegments = JsonUtils.stringToObject(
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class);
Set<String> segmentsPending = new HashSet<>();
for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) {
segmentsPending.add(element.asText());
}

assert segmentsPending.size() <= allSegments.size();
assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();

return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
Expand Down Expand Up @@ -395,12 +396,18 @@ public void testForceCommit()
throws Exception {
Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
String jobId = forceCommit(getTableName());
Map<String, String> jobMetadata =
_helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT);
assert jobMetadata != null;
assert jobMetadata.get("segmentsForceCommitted") != null;

TestUtils.waitForCondition(aVoid -> {
try {
if (isForceCommitJobCompleted(jobId)) {
assertTrue(_controllerStarter.getHelixResourceManager()
.getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments));
for (String segmentName : consumingSegments) {
assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager()
.getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus());
}
return true;
}
return false;
Expand Down Expand Up @@ -430,6 +437,20 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId)

assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId);
assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT");

assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null;
assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null;

Set<String> allSegments = JsonUtils.stringToObject(
jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class);
Set<String> segmentsPending = new HashSet<>();
for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) {
segmentsPending.add(element.asText());
}

assert segmentsPending.size() <= allSegments.size();
assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size();

return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ public static class ControllerJob {
public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName";
// Force commit job ZK props
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted";
}

// prefix for scheduler related features, e.g. query accountant
Expand Down

0 comments on commit c9e08f3

Please sign in to comment.