Skip to content

Commit

Permalink
Config for max output segment size in UpsertCompactMerge task (#14742)
Browse files Browse the repository at this point in the history
* Config for max output segment size in UpsertCompactMerge task

* address comments
  • Loading branch information
tibrewalpratik17 authored Jan 7, 2025
1 parent e5d2417 commit 4588f8c
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,16 @@ public static class UpsertCompactMergeTask {
*/
public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask";

/**
* maximum size of output segments to produce
*/
public static final String OUTPUT_SEGMENT_MAX_SIZE_KEY = "outputSegmentMaxSize";

/**
* default output segment size
*/
public static final String DEFAULT_OUTPUT_SEGMENT_MAX_SIZE = "200MB";

/**
* default maximum number of segments to process in a single task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,11 +64,14 @@ public static class SegmentMergerMetadata {
private final SegmentZKMetadata _segmentZKMetadata;
private final long _validDocIds;
private final long _invalidDocIds;
private final double _segmentSizeInBytes;

SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds) {
SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds,
double segmentSizeInBytes) {
_segmentZKMetadata = segmentZKMetadata;
_validDocIds = validDocIds;
_invalidDocIds = invalidDocIds;
_segmentSizeInBytes = segmentSizeInBytes;
}

public SegmentZKMetadata getSegmentZKMetadata() {
Expand All @@ -81,6 +85,10 @@ public long getValidDocIds() {
public long getInvalidDocIds() {
return _invalidDocIds;
}

public double getSegmentSizeInBytes() {
return _segmentSizeInBytes;
}
}

public static class SegmentSelectionResult {
Expand Down Expand Up @@ -226,6 +234,27 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, Set<String> alreadyMergedSegments) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge = new HashMap<>();
Set<String> segmentsForDeletion = new HashSet<>();

// task config thresholds
long validDocsThreshold = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
long maxRecordsPerTask = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
long maxNumSegments = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
// default to Long.MAX_VALUE to avoid size-based compaction by default
long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
try {
outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY,
MinionConstants.UpsertCompactMergeTask.DEFAULT_OUTPUT_SEGMENT_MAX_SIZE));
} catch (Exception e) {
LOGGER.warn("Invalid value for outputSegmentMaxSizeInBytes, defaulting to Long.MAX_VALUE", e);
}

for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
// check if segment is part of completed segments
if (!candidateSegmentsMap.containsKey(segmentName)) {
Expand All @@ -237,6 +266,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();

// Skip segments if the crc from zk metadata and server does not match. They may be getting reloaded.
if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
Expand All @@ -260,8 +290,10 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
continue;
}
double expectedSegmentSizeAfterCompaction = (segmentSizeInBytes * totalValidDocs * 1.0) / totalDocs;
segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new ArrayList<>())
.add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs));
.add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs,
expectedSegmentSizeAfterCompaction));
}
break;
}
Expand All @@ -277,17 +309,6 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry : segmentsEligibleForCompactMerge.entrySet()) {
int partitionID = entry.getKey();
List<SegmentMergerMetadata> segments = entry.getValue();
// task config thresholds
// TODO add output segment size as one of the thresholds
long validDocsThreshold = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
long maxRecordsPerTask = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
long maxNumSegments = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));

// List to store groups for the current partition
List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
Expand All @@ -296,18 +317,22 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
// variables to maintain current group sum
long currentValidDocsSum = 0;
long currentTotalDocsSum = 0;
double currentOutputSegmentSizeInBytes = 0.0;

for (SegmentMergerMetadata segment : segments) {
long validDocs = segment.getValidDocIds();
long invalidDocs = segment.getInvalidDocIds();
double expectedSegmentSizeInBytes = segment.getSegmentSizeInBytes();

// Check if adding this segment would keep the validDocs sum within the threshold
if (currentValidDocsSum + validDocs <= validDocsThreshold && currentGroup.size() < maxNumSegments
&& currentTotalDocsSum + validDocs + invalidDocs < maxRecordsPerTask) {
&& currentTotalDocsSum + validDocs + invalidDocs < maxRecordsPerTask
&& currentOutputSegmentSizeInBytes + expectedSegmentSizeInBytes < outputSegmentMaxSizeInBytes) {
// Add the segment to the current group
currentGroup.add(segment);
currentValidDocsSum += validDocs;
currentTotalDocsSum += validDocs + invalidDocs;
currentOutputSegmentSizeInBytes += expectedSegmentSizeInBytes;
} else {
// Finalize the current group and start a new one
if (!currentGroup.isEmpty()) {
Expand All @@ -319,6 +344,7 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
currentGroup.add(segment);
currentValidDocsSum = validDocs;
currentTotalDocsSum = validDocs + invalidDocs;
currentOutputSegmentSizeInBytes = expectedSegmentSizeInBytes;
}
}
// Add the last group
Expand Down Expand Up @@ -408,6 +434,10 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
Preconditions.checkState(upsertConfig.isEnableSnapshot(),
String.format("'enableSnapshot' from UpsertConfig must be enabled for %s",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
// check valid task config for maxOutputSegmentSize
if (taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY)) {
DataSizeUtils.toBytes(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY));
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ public void testGetDownloadUrl() {

// single segment
segmentMergerMetadataList =
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000));
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0");

// multiple segments
segmentMergerMetadataList = Arrays.asList(
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20, 100000)
);
Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
"fs://testTable__0,fs://testTable__1");
Expand All @@ -241,13 +241,13 @@ public void testGetSegmentCrcList() {

// single segment
segmentMergerMetadataList =
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000));
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000");

// multiple segments
segmentMergerMetadataList = Arrays.asList(
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20)
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10, 100000),
new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20, 100000)
);
Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000");
}
Expand Down

0 comments on commit 4588f8c

Please sign in to comment.