From 720a2cc54c2ee55da8534bbd74de756490da5f29 Mon Sep 17 00:00:00 2001 From: tibrewalpratik Date: Fri, 10 Jan 2025 12:58:50 +0100 Subject: [PATCH] Enable UpsertCompactMergeTask with enableDeletedKeysCompactionConsistency config --- .../pinot/segment/local/utils/TableConfigUtils.java | 11 +++++++---- .../segment/local/utils/TableConfigUtilsTest.java | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 141e0c280a93..b6d8c17a9fdf 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -112,6 +112,7 @@ private TableConfigUtils() { // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}. private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask"; + private static final String UPSERT_COMPACT_MERGE_TASK_TYPE = "UpsertCompactMergeTask"; // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. @@ -760,11 +761,13 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) Preconditions.checkState(upsertConfig.isEnableSnapshot(), "enableDeletedKeysCompactionConsistency should exist with enableSnapshot for upsert table"); - // enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask + // enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask / UpsertCompactMergeTask TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - Preconditions.checkState( - taskConfig != null && taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE), - "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask for upsert table"); + Preconditions.checkState(taskConfig != null + && (taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE) + || taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACT_MERGE_TASK_TYPE)), + "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask" + + " / UpsertCompactMergeTask for upsert table"); } if (upsertConfig.getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 72a17ee7d1c6..88691dd8c15f 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -2067,7 +2067,7 @@ public void testValidateUpsertConfig() { "enableDeletedKeysCompactionConsistency should exist with enableSnapshot for upsert table"); } - // test enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask + // test enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask / UpsertCompactMerge task upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); upsertConfig.setEnableDeletedKeysCompactionConsistency(true); upsertConfig.setDeletedKeysTTL(100); @@ -2080,7 +2080,8 @@ public void testValidateUpsertConfig() { TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema); } catch (IllegalStateException e) { Assert.assertEquals(e.getMessage(), - "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask for upsert table"); + "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask " + + "/ UpsertCompactMergeTask for upsert table"); } }