From ab2220ff60aceee69b9649bd2f844e5b1d7cb098 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 16 Jan 2025 17:04:31 +0530 Subject: [PATCH 01/13] Updates foceCommit API to handle Pauseless --- .../resources/PinotRealtimeTableResource.java | 40 ++++++++++++++----- .../helix/core/PinotHelixResourceManager.java | 2 + .../PinotLLCRealtimeSegmentManager.java | 15 +++++++ .../pinot/spi/utils/CommonConstants.java | 1 + 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 2ab15427f754..e74a76acc0c8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.api.resources; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; @@ -29,7 +30,6 @@ import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -222,18 +222,38 @@ public JsonNode getForceCommitJobStatus( String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); Set consumingSegmentCommitted = JsonUtils.stringToObject( controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class); - Set onlineSegmentsForTable = - _pinotHelixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false); - Set segmentsYetToBeCommitted = new HashSet<>(); - consumingSegmentCommitted.forEach(segmentName -> { - if (!onlineSegmentsForTable.contains(segmentName)) { - segmentsYetToBeCommitted.add(segmentName); - } - }); + Set segmentsToCheck; + String segmentsPendingToBeComittedString = + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); + + if (segmentsPendingToBeComittedString != null) { + segmentsToCheck = JsonUtils.stringToObject(segmentsPendingToBeComittedString, Set.class); + } else { + segmentsToCheck = consumingSegmentCommitted; + } + + Set segmentsYetToBeCommitted = + _pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck); + + _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, + controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { + String existingSegmentsYetToBeCommittedString = + prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); + if (existingSegmentsYetToBeCommittedString == null) { + return true; + } + try { + Set existingSegmentsYetToBeCommitted = + JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class); + return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size(); + } catch (JsonProcessingException e) { + return false; + } + } + ); Map result = new HashMap<>(controllerJobZKMetadata); - result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted); result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size()); return JsonUtils.objectToJsonNode(result); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 77762c2ee8fb..3fa293b99ff4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2249,6 +2249,8 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, JsonUtils.objectToString(consumingSegmentsCommitted)); + jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, + JsonUtils.objectToString(consumingSegmentsCommitted)); return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 4ba7cd2208e6..cbfd6d7d81ff 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo URI createSegmentPath(String rawTableName, String segmentName) { return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); } + + public Set getSegmentsYetToBeCommitted(String tableNameWithType, Set segmentsToCheck) { + Set segmentsYetToBeCommitted = new HashSet<>(); + for (String segmentName: segmentsToCheck) { + SegmentZKMetadata segmentZKMetadata = + _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); + if (segmentZKMetadata == null) { + continue; + } + if (!segmentZKMetadata.getStatus().isCompleted()) { + segmentsYetToBeCommitted.add(segmentName); + } + } + return segmentsYetToBeCommitted; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 67bd6191c38c..b852cd954c3c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -994,6 +994,7 @@ public static class ControllerJob { public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName"; // Force commit job ZK props public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted"; + public static final String CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST = "segmentsYetToBeCommitted"; } // prefix for scheduler related features, e.g. query accountant From ed90f11f964ba1aab8b27706d967ad517b8e76f7 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 16 Jan 2025 17:09:58 +0530 Subject: [PATCH 02/13] updates metadata --- .../controller/api/resources/PinotRealtimeTableResource.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index e74a76acc0c8..bb15dcff2eb4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -236,6 +236,9 @@ public JsonNode getForceCommitJobStatus( Set segmentsYetToBeCommitted = _pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck); + controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, + JsonUtils.objectToString(segmentsYetToBeCommitted)); + _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { String existingSegmentsYetToBeCommittedString = From e88aa2a8aa5f81f1d9b2b8d5b1251f5bc21a9589 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 16 Jan 2025 17:23:25 +0530 Subject: [PATCH 03/13] fixes lint --- .../resources/PinotRealtimeTableResource.java | 19 ++---------------- .../helix/core/PinotHelixResourceManager.java | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index bb15dcff2eb4..2c9daf52b253 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.controller.api.resources; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; @@ -239,22 +238,8 @@ public JsonNode getForceCommitJobStatus( controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, JsonUtils.objectToString(segmentsYetToBeCommitted)); - _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, - controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { - String existingSegmentsYetToBeCommittedString = - prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); - if (existingSegmentsYetToBeCommittedString == null) { - return true; - } - try { - Set existingSegmentsYetToBeCommitted = - JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class); - return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size(); - } catch (JsonProcessingException e) { - return false; - } - } - ); + _pinotHelixResourceManager.updateForceCommitJobMetadata(forceCommitJobId, segmentsYetToBeCommitted, + controllerJobZKMetadata); Map result = new HashMap<>(controllerJobZKMetadata); result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 3fa293b99ff4..3f88da10c514 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2254,6 +2254,26 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); } + public void updateForceCommitJobMetadata(String forceCommitJobId, Set segmentsYetToBeCommitted, + Map controllerJobZKMetadata) { + addControllerJobToZK(forceCommitJobId, + controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { + String existingSegmentsYetToBeCommittedString = + prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); + if (existingSegmentsYetToBeCommittedString == null) { + return true; + } + try { + Set existingSegmentsYetToBeCommitted = + JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class); + return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size(); + } catch (JsonProcessingException e) { + return false; + } + } + ); + } + /** * Adds a new job metadata for controller job like table rebalance or reload into ZK * @param jobId job's UUID From 8c8d8d3661629381bb32e63b83233daceac0af59 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 16 Jan 2025 18:03:30 +0530 Subject: [PATCH 04/13] adds tests --- .../LLCRealtimeClusterIntegrationTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 78b34fc5633b..d99b8ad9e343 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.HashUtil; @@ -427,6 +428,14 @@ public void testForceCommit() throws Exception { Set consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); String jobId = forceCommit(getTableName()); + Map jobMetadata = + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); + assert jobMetadata != null; + assert jobMetadata.get("segmentsForceCommitted") != null; + assert jobMetadata.get("segmentsYetToBeCommitted") != null; + Set allSegments = JsonUtils.stringToObject(jobMetadata.get("segmentsForceCommitted"), HashSet.class); + Set segmentsPending = JsonUtils.stringToObject(jobMetadata.get("segmentsYetToBeCommitted"), HashSet.class); + assert segmentsPending.size() <= allSegments.size(); TestUtils.waitForCondition(aVoid -> { try { @@ -462,6 +471,16 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId) assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); + + assert jobStatus.get("segmentsForceCommitted") != null; + assert jobStatus.get("segmentsYetToBeCommitted") != null; + + Set allSegments = JsonUtils.stringToObject(jobStatus.get("segmentsForceCommitted").asText(), HashSet.class); + Set segmentsPending = + JsonUtils.stringToObject(jobStatus.get("segmentsYetToBeCommitted").asText(), HashSet.class); + assert segmentsPending.size() <= allSegments.size(); + assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); + return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; } From 1be03161e046374fbf62fb00807551b581327b4c Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 16 Jan 2025 19:55:34 +0530 Subject: [PATCH 05/13] saves 1 zk call --- .../api/resources/PinotRealtimeTableResource.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 2c9daf52b253..99c7f6f8a0cc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -238,8 +238,10 @@ public JsonNode getForceCommitJobStatus( controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, JsonUtils.objectToString(segmentsYetToBeCommitted)); - _pinotHelixResourceManager.updateForceCommitJobMetadata(forceCommitJobId, segmentsYetToBeCommitted, - controllerJobZKMetadata); + if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) { + _pinotHelixResourceManager.updateForceCommitJobMetadata(forceCommitJobId, segmentsYetToBeCommitted, + controllerJobZKMetadata); + } Map result = new HashMap<>(controllerJobZKMetadata); result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size()); From 095acc0cca8c7304e5f9eb3f4ba73843b20d6a41 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Fri, 17 Jan 2025 15:33:01 +0530 Subject: [PATCH 06/13] Addresses PR comments --- .../resources/PinotRealtimeTableResource.java | 10 ++++----- .../helix/core/PinotHelixResourceManager.java | 22 ------------------- .../PinotLLCRealtimeSegmentManager.java | 3 ++- .../LLCRealtimeClusterIntegrationTest.java | 11 +++++----- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index 99c7f6f8a0cc..f4a0e633a010 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -235,15 +235,15 @@ public JsonNode getForceCommitJobStatus( Set segmentsYetToBeCommitted = _pinotLLCRealtimeSegmentManager.getSegmentsYetToBeCommitted(tableNameWithType, segmentsToCheck); - controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, - JsonUtils.objectToString(segmentsYetToBeCommitted)); - if (segmentsYetToBeCommitted.size() < segmentsToCheck.size()) { - _pinotHelixResourceManager.updateForceCommitJobMetadata(forceCommitJobId, segmentsYetToBeCommitted, - controllerJobZKMetadata); + controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, + JsonUtils.objectToString(segmentsYetToBeCommitted)); + _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata, + ControllerJobType.FORCE_COMMIT, prev -> true); } Map result = new HashMap<>(controllerJobZKMetadata); + result.put("segmentsYetToBeCommitted", segmentsYetToBeCommitted); result.put("numberOfSegmentsYetToBeCommitted", segmentsYetToBeCommitted.size()); return JsonUtils.objectToJsonNode(result); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 3f88da10c514..77762c2ee8fb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2249,31 +2249,9 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, JsonUtils.objectToString(consumingSegmentsCommitted)); - jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, - JsonUtils.objectToString(consumingSegmentsCommitted)); return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); } - public void updateForceCommitJobMetadata(String forceCommitJobId, Set segmentsYetToBeCommitted, - Map controllerJobZKMetadata) { - addControllerJobToZK(forceCommitJobId, - controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> { - String existingSegmentsYetToBeCommittedString = - prevJobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST); - if (existingSegmentsYetToBeCommittedString == null) { - return true; - } - try { - Set existingSegmentsYetToBeCommitted = - JsonUtils.stringToObject(existingSegmentsYetToBeCommittedString, Set.class); - return segmentsYetToBeCommitted.size() < existingSegmentsYetToBeCommitted.size(); - } catch (JsonProcessingException e) { - return false; - } - } - ); - } - /** * Adds a new job metadata for controller job like table rebalance or reload into ZK * @param jobId job's UUID diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index cbfd6d7d81ff..3e986486d313 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1896,9 +1896,10 @@ public Set getSegmentsYetToBeCommitted(String tableNameWithType, Set allSegments = JsonUtils.stringToObject(jobMetadata.get("segmentsForceCommitted"), HashSet.class); - Set segmentsPending = JsonUtils.stringToObject(jobMetadata.get("segmentsYetToBeCommitted"), HashSet.class); - assert segmentsPending.size() <= allSegments.size(); TestUtils.waitForCondition(aVoid -> { try { @@ -476,8 +472,11 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId) assert jobStatus.get("segmentsYetToBeCommitted") != null; Set allSegments = JsonUtils.stringToObject(jobStatus.get("segmentsForceCommitted").asText(), HashSet.class); - Set segmentsPending = - JsonUtils.stringToObject(jobStatus.get("segmentsYetToBeCommitted").asText(), HashSet.class); + Set segmentsPending = new HashSet<>(); + for (JsonNode element: jobStatus.get("segmentsYetToBeCommitted")) { + segmentsPending.add(element.asText()); + } + assert segmentsPending.size() <= allSegments.size(); assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); From 36360b8d8c51886b876ac7062d47e72dc6b8c09c Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Fri, 17 Jan 2025 15:35:14 +0530 Subject: [PATCH 07/13] nit --- .../helix/core/realtime/PinotLLCRealtimeSegmentManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 3e986486d313..2c5fa7719e4f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1899,7 +1899,7 @@ public Set getSegmentsYetToBeCommitted(String tableNameWithType, Set Date: Fri, 17 Jan 2025 16:00:09 +0530 Subject: [PATCH 08/13] adds unit test --- .../PinotLLCRealtimeSegmentManagerTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index dbe640d36400..e17b78013273 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.helix.core.realtime; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; @@ -1247,6 +1248,33 @@ public void testGetPartitionIds() Assert.assertEquals(partitionIds.size(), 2); } + @Test + public void getSegmentsYetToBeCommitted() { + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager realtimeSegmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + + SegmentZKMetadata mockSegmentZKMetadataDone = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataDone.getStatus()).thenReturn(Status.DONE); + + SegmentZKMetadata mockSegmentZKMetadataUploaded = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataUploaded.getStatus()).thenReturn(Status.UPLOADED); + + SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS); + + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null); + + Set segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4"); + Set segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck); + + assert ImmutableSet.of("s2", "s4").equals(segmentsYetToBeCommitted); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// From 5730a067b80377f26307e0aad7e4c33581d87118 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Fri, 17 Jan 2025 16:35:59 +0530 Subject: [PATCH 09/13] addresses PR comments --- .../realtime/PinotLLCRealtimeSegmentManagerTest.java | 9 ++++++--- .../tests/LLCRealtimeClusterIntegrationTest.java | 9 +++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index e17b78013273..aca080313cec 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1263,16 +1263,19 @@ public void getSegmentsYetToBeCommitted() { SegmentZKMetadata mockSegmentZKMetadataInProgress = mock(SegmentZKMetadata.class); when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS); + SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class); + when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.COMMITTING); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s2")).thenReturn(mockSegmentZKMetadataUploaded); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s4")).thenReturn(mockSegmentZKMetadataInProgress); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s1")).thenReturn(null); + when(mockHelixResourceManager.getSegmentZKMetadata("test", "s5")).thenReturn(mockSegmentZKMetadataInCommitting); - Set segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4"); + Set segmentsToCheck = ImmutableSet.of("s0", "s1", "s2", "s3", "s4", "s5"); Set segmentsYetToBeCommitted = realtimeSegmentManager.getSegmentsYetToBeCommitted("test", segmentsToCheck); - - assert ImmutableSet.of("s2", "s4").equals(segmentsYetToBeCommitted); + assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted); } ////////////////////////////////////////////////////////////////////////////////// diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index eaad9d991a3e..c045beb5a7ac 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -468,12 +468,13 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId) assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); - assert jobStatus.get("segmentsForceCommitted") != null; - assert jobStatus.get("segmentsYetToBeCommitted") != null; + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null; + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null; - Set allSegments = JsonUtils.stringToObject(jobStatus.get("segmentsForceCommitted").asText(), HashSet.class); + Set allSegments = JsonUtils.stringToObject( + jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class); Set segmentsPending = new HashSet<>(); - for (JsonNode element: jobStatus.get("segmentsYetToBeCommitted")) { + for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) { segmentsPending.add(element.asText()); } From c8565d6ce52edfc9a1dda2aa7b51f13ae7799d5b Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Fri, 17 Jan 2025 16:37:30 +0530 Subject: [PATCH 10/13] nit --- .../helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index aca080313cec..d5969e611f91 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1264,7 +1264,7 @@ public void getSegmentsYetToBeCommitted() { when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.IN_PROGRESS); SegmentZKMetadata mockSegmentZKMetadataInCommitting = mock(SegmentZKMetadata.class); - when(mockSegmentZKMetadataInProgress.getStatus()).thenReturn(Status.COMMITTING); + when(mockSegmentZKMetadataInCommitting.getStatus()).thenReturn(Status.COMMITTING); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s0")).thenReturn(mockSegmentZKMetadataDone); when(mockHelixResourceManager.getSegmentZKMetadata("test", "s3")).thenReturn(mockSegmentZKMetadataDone); From 857dd6a28ec3932187f2279a08efa0719971af27 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 23 Jan 2025 15:59:01 +0530 Subject: [PATCH 11/13] attempt to fix test --- .../integration/tests/LLCRealtimeClusterIntegrationTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index c045beb5a7ac..50c6f1bdcd1f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -438,6 +438,10 @@ public void testForceCommit() if (isForceCommitJobCompleted(jobId)) { assertTrue(_controllerStarter.getHelixResourceManager() .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments)); + for (String segmentName : consumingSegments) { + assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager() + .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus()); + } return true; } return false; From 0b64439746c28e44dc2cb19dfb832373063e0964 Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 23 Jan 2025 16:06:36 +0530 Subject: [PATCH 12/13] nit --- .../integration/tests/LLCRealtimeClusterIntegrationTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 50c6f1bdcd1f..17655506411b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -436,8 +436,6 @@ public void testForceCommit() TestUtils.waitForCondition(aVoid -> { try { if (isForceCommitJobCompleted(jobId)) { - assertTrue(_controllerStarter.getHelixResourceManager() - .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments)); for (String segmentName : consumingSegments) { assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager() .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus()); From 9e3ddadf8c2f1354d96a4e6c7263bbb8686b007e Mon Sep 17 00:00:00 2001 From: Harnoor7 Date: Thu, 23 Jan 2025 18:51:02 +0530 Subject: [PATCH 13/13] Attempts to fix test --- ...CRealtimeKafka3ClusterIntegrationTest.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java index dce404d64db5..e61cb07c69e6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.HashUtil; @@ -395,12 +396,18 @@ public void testForceCommit() throws Exception { Set consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); String jobId = forceCommit(getTableName()); + Map jobMetadata = + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); + assert jobMetadata != null; + assert jobMetadata.get("segmentsForceCommitted") != null; TestUtils.waitForCondition(aVoid -> { try { if (isForceCommitJobCompleted(jobId)) { - assertTrue(_controllerStarter.getHelixResourceManager() - .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegments)); + for (String segmentName : consumingSegments) { + assertEquals(CommonConstants.Segment.Realtime.Status.DONE, _controllerStarter.getHelixResourceManager() + .getSegmentZKMetadata(getTableName() + "_REALTIME", segmentName).getStatus()); + } return true; } return false; @@ -430,6 +437,20 @@ public boolean isForceCommitJobCompleted(String forceCommitJobId) assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); + + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null; + assert jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST) != null; + + Set allSegments = JsonUtils.stringToObject( + jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST).asText(), HashSet.class); + Set segmentsPending = new HashSet<>(); + for (JsonNode element : jobStatus.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST)) { + segmentsPending.add(element.asText()); + } + + assert segmentsPending.size() <= allSegments.size(); + assert jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == segmentsPending.size(); + return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; }