Skip to content

Commit

Permalink
Remove default use of outputSegmentMaxSize in UpsertCompactMerge task
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Jan 7, 2025
1 parent 4588f8c commit 10226ac
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,6 @@ public static class UpsertCompactMergeTask {
*/
public static final String OUTPUT_SEGMENT_MAX_SIZE_KEY = "outputSegmentMaxSize";

/**
* default output segment size
*/
public static final String DEFAULT_OUTPUT_SEGMENT_MAX_SIZE = "200MB";

/**
* default maximum number of segments to process in a single task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
Set<String> alreadyMergedSegments = getAlreadyMergedSegments(allSegments);

SegmentSelectionResult segmentSelectionResult =
processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap, validDocIdsMetadataList, alreadyMergedSegments);
processValidDocIdsMetadata(tableNameWithType, taskConfigs, candidateSegmentsMap, validDocIdsMetadataList,
alreadyMergedSegments);

if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
Expand Down Expand Up @@ -229,8 +230,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
}

@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
Map<String, SegmentZKMetadata> candidateSegmentsMap,
public static SegmentSelectionResult processValidDocIdsMetadata(String tableNameWithType,
Map<String, String> taskConfigs, Map<String, SegmentZKMetadata> candidateSegmentsMap,
Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, Set<String> alreadyMergedSegments) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge = new HashMap<>();
Set<String> segmentsForDeletion = new HashSet<>();
Expand All @@ -245,14 +246,22 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
long maxNumSegments = Long.parseLong(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));

// default to Long.MAX_VALUE to avoid size-based compaction by default
long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
try {
outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes(
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY,
MinionConstants.UpsertCompactMergeTask.DEFAULT_OUTPUT_SEGMENT_MAX_SIZE));
if (taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY)) {
String configuredOutputSegmentMaxSize =
taskConfigs.get(MinionConstants.UpsertCompactMergeTask.OUTPUT_SEGMENT_MAX_SIZE_KEY);
LOGGER.info("Configured outputSegmentMaxSizeInByte: {} for {}", configuredOutputSegmentMaxSize,
tableNameWithType);
outputSegmentMaxSizeInBytes = DataSizeUtils.toBytes(configuredOutputSegmentMaxSize);
} else {
LOGGER.info("No configured outputSegmentMaxSizeInByte for {}, defaulting to Long.MAX_VALUE", tableNameWithType);
}
} catch (Exception e) {
LOGGER.warn("Invalid value for outputSegmentMaxSizeInBytes, defaulting to Long.MAX_VALUE", e);
LOGGER.warn("Invalid value outputSegmentMaxSizeInBytes configured for {}, defaulting to Long.MAX_VALUE",
tableNameWithType, e);
}

for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
Expand Down

0 comments on commit 10226ac

Please sign in to comment.