diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 7a276d625412..24db1f9ede3c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -282,6 +282,16 @@ public static class UpsertCompactMergeTask { */ public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask"; + /** + * maximum size of output segments to produce + */ + public static final String OUTPUT_SEGMENT_MAX_SIZE_KEY = "outputSegmentMaxSize"; + + /** + * default output segment size + */ + public static final String DEFAULT_OUTPUT_SEGMENT_MAX_SIZE = "200MB"; + /** * default maximum number of segments to process in a single task */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index ae3a4aa0d847..dd7bf283532a 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -47,6 +47,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,11 +64,14 @@ public static class SegmentMergerMetadata { private final SegmentZKMetadata _segmentZKMetadata; private final long _validDocIds; private final long _invalidDocIds; + private final double _segmentSizeInBytes; - SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds) { + SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds, + double segmentSizeInBytes) { _segmentZKMetadata = segmentZKMetadata; _validDocIds = validDocIds; _invalidDocIds = invalidDocIds; + _segmentSizeInBytes = segmentSizeInBytes; } public SegmentZKMetadata getSegmentZKMetadata() { @@ -81,6 +85,10 @@ public long getValidDocIds() { public long getInvalidDocIds() { return _invalidDocIds; } + + public double getSegmentSizeInBytes() { + return _segmentSizeInBytes; + } } public static class SegmentSelectionResult { @@ -226,6 +234,27 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map> validDocIdsMetadataInfoMap, Set alreadyMergedSegments) { Map> segmentsEligibleForCompactMerge = new HashMap<>(); Set segmentsForDeletion = new HashSet<>(); + + // task config thresholds + long validDocsThreshold = Long.parseLong( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, + String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT))); + long maxRecordsPerTask = Long.parseLong( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY, + String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK))); + long maxNumSegments = Long.parseLong( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY, + String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK))); + // default to Long.MAX_VALUE to avoid size-based compaction by default + long outputSegmentMaxSizeInBytes = Long.MAX_VALUE; + try { + outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY, + MinionConstants.UpsertCompactMergeTask.DEFAULT_OUTPUT_SEGMENT_MAX_SIZE)); + } catch (Exception e) { + LOGGER.warn("Invalid value for outputSegmentMaxSizeInBytes, defaulting to Long.MAX_VALUE", e); + } + for (String segmentName : validDocIdsMetadataInfoMap.keySet()) { // check if segment is part of completed segments if (!candidateSegmentsMap.containsKey(segmentName)) { @@ -237,6 +266,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map new ArrayList<>()) - .add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs)); + .add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs, + expectedSegmentSizeAfterCompaction)); } break; } @@ -277,17 +309,6 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map> entry : segmentsEligibleForCompactMerge.entrySet()) { int partitionID = entry.getKey(); List segments = entry.getValue(); - // task config thresholds - // TODO add output segment size as one of the thresholds - long validDocsThreshold = Long.parseLong( - taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, - String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT))); - long maxRecordsPerTask = Long.parseLong( - taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY, - String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK))); - long maxNumSegments = Long.parseLong( - taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY, - String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK))); // List to store groups for the current partition List> groups = new ArrayList<>(); @@ -296,18 +317,22 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map tas Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format("'enableSnapshot' from UpsertConfig must be enabled for %s", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); + // check valid task config for maxOutputSegmentSize + if (taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY)) { + DataSizeUtils.toBytes(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY)); + } } @VisibleForTesting diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index 5556ac53cd20..3709392e76e0 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -221,13 +221,13 @@ public void testGetDownloadUrl() { // single segment segmentMergerMetadataList = - List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10)); + List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000)); Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0"); // multiple segments segmentMergerMetadataList = Arrays.asList( - new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10), - new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20) + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000), + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20, 100000) ); Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0,fs://testTable__1"); @@ -241,13 +241,13 @@ public void testGetSegmentCrcList() { // single segment segmentMergerMetadataList = - List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10)); + List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000)); Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000"); // multiple segments segmentMergerMetadataList = Arrays.asList( - new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10), - new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20) + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000), + new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20, 100000) ); Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000"); }