From 6b1ccc5c70cbd8ab6533780b1171416e1ae30c1a Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Wed, 8 Jan 2025 08:24:47 -0800 Subject: [PATCH] Add Schema to the PinotTaskGenerator validateTaskConfigs method call (#14683) --- .../resources/PinotTableRestletResource.java | 12 ++++++----- .../TableConfigsRestletResource.java | 4 ++-- .../minion/generator/PinotTaskGenerator.java | 4 +++- .../controller/util/TaskConfigUtils.java | 5 +++-- .../controller/util/TaskConfigUtilsTest.java | 11 +++++----- ...ealtimeToOfflineSegmentsTaskGenerator.java | 6 +++--- .../UpsertCompactionTaskGenerator.java | 3 ++- .../UpsertCompactMergeTaskGenerator.java | 3 ++- ...imeToOfflineSegmentsTaskGeneratorTest.java | 20 +++++++++---------- .../UpsertCompactionTaskGeneratorTest.java | 15 ++++++++------ .../UpsertCompactMergeTaskGeneratorTest.java | 19 ++++++++++-------- 11 files changed, 58 insertions(+), 44 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 8c67df32b36e..638849df4603 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -211,6 +211,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr, Pair> tableConfigAndUnrecognizedProperties; TableConfig tableConfig; String tableNameWithType; + Schema schema; try { tableConfigAndUnrecognizedProperties = JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigStr, TableConfig.class); @@ -224,7 +225,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr, ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, httpHeaders, AccessType.CREATE, Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER); - Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager, tableConfig, schema, Collections.emptyMap()); @@ -239,7 +240,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr, TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig); - TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip); } catch (Exception e) { throw new InvalidTableConfigException(e); } @@ -481,6 +482,7 @@ public ConfigSuccessResponse updateTableConfig( Pair> tableConfigAndUnrecognizedProperties; TableConfig tableConfig; String tableNameWithType; + Schema schema; try { tableConfigAndUnrecognizedProperties = JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigString, TableConfig.class); @@ -497,7 +499,7 @@ public ConfigSuccessResponse updateTableConfig( Response.Status.BAD_REQUEST); } - Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); TableConfigUtils.validate(tableConfig, schema, typesToSkip); } catch (Exception e) { String msg = String.format("Invalid table config: %s with error: %s", tableName, e.getMessage()); @@ -514,7 +516,7 @@ public ConfigSuccessResponse updateTableConfig( TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas()); TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize()); checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig); - TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip); } catch (Exception e) { throw new InvalidTableConfigException(e); } @@ -575,7 +577,7 @@ private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, @Nulla throw new SchemaNotFoundException("Got empty schema"); } TableConfigUtils.validate(tableConfig, schema, typesToSkip); - TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip); ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode(); if (tableConfig.getTableType() == TableType.OFFLINE) { tableConfigValidateStr.set(TableType.OFFLINE.name(), tableConfig.toJsonNode()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java index 5d55df609590..82a9f164eafa 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java @@ -462,7 +462,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl "Name in 'offline' table config: %s must be equal to 'tableName': %s", offlineRawTableName, rawTableName); TableConfigUtils.validateTableName(offlineTableConfig); TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip); - TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), _pinotTaskManager, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), schema, _pinotTaskManager, typesToSkip); } if (realtimeTableConfig != null) { String realtimeRawTableName = DatabaseUtils.translateTableName( @@ -471,7 +471,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl "Name in 'realtime' table config: %s must be equal to 'tableName': %s", realtimeRawTableName, rawTableName); TableConfigUtils.validateTableName(realtimeTableConfig); TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip); - TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), _pinotTaskManager, typesToSkip); + TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), schema, _pinotTaskManager, typesToSkip); } if (offlineTableConfig != null && realtimeTableConfig != null) { TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java index 9be76f253d6a..8d5d9bedcc2c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java @@ -25,6 +25,7 @@ import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -103,8 +104,9 @@ default String getMinionInstanceTag(TableConfig tableConfig) { /** * Performs task type specific validations for the given task type. * @param tableConfig The table configuration that is getting added/updated/validated. + * @param schema The schema of the table. * @param taskConfigs The task type specific task configuration to be validated. */ - default void validateTaskConfigs(TableConfig tableConfig, Map taskConfigs) { + default void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map taskConfigs) { } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java index 059908ea8db1..53ba30093472 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java @@ -26,6 +26,7 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.data.Schema; import org.quartz.CronScheduleBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class TaskConfigUtils { private TaskConfigUtils() { } - public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager pinotTaskManager, + public static void validateTaskConfigs(TableConfig tableConfig, Schema schema, PinotTaskManager pinotTaskManager, String validationTypesToSkip) { if (tableConfig == null || tableConfig.getTaskConfig() == null) { return; @@ -59,7 +60,7 @@ public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager if (taskGenerator != null) { Map taskConfigs = taskConfigEntry.getValue(); doCommonTaskValidations(tableConfig, taskType, taskConfigs); - taskGenerator.validateTaskConfigs(tableConfig, taskConfigs); + taskGenerator.validateTaskConfigs(tableConfig, schema, taskConfigs); } else { throw new RuntimeException(String.format("Task generator not found for task type: %s, while validating table " + "configs for table: %s", taskType, tableConfig.getTableName())); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java index 000bf9826ca3..6d4753fed826 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java @@ -30,6 +30,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.mockito.Mockito; import org.testng.Assert; @@ -64,7 +65,7 @@ public List generateTasks(List tableConfigs) { } @Override - public void validateTaskConfigs(TableConfig tableConfig, Map taskConfigs) { + public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map taskConfigs) { throw new RuntimeException("TableConfig validation failed"); } }; @@ -73,22 +74,22 @@ public void validateTaskConfigs(TableConfig tableConfig, Map tas when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry); } - @Test (expectedExceptions = RuntimeException.class) + @Test(expectedExceptions = RuntimeException.class) public void testValidateTableTaskConfigsValidationException() { TableTaskConfig tableTaskConfig = new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10 * ? * * *"))); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build(); - TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null); + TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null); } - @Test (expectedExceptions = RuntimeException.class) + @Test(expectedExceptions = RuntimeException.class) public void testValidateTableTaskConfigsUnknownTaskType() { TableTaskConfig tableTaskConfig = new TableTaskConfig(ImmutableMap.of("otherTask", ImmutableMap.of("schedule", "0 */10 * ? * * *"))); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build(); - TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null); + TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null); } @Test diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java index 73ff19ebef9f..128610ae6411 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java @@ -321,7 +321,7 @@ private long getWatermarkMs(String realtimeTableName, List co } @Override - public void validateTaskConfigs(TableConfig tableConfig, Map taskConfigs) { + public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map taskConfigs) { // check table is not upsert Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE, "RealtimeToOfflineTask doesn't support upsert table!"); @@ -336,8 +336,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Map tas Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name()) .contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name()) .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!"); - - Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig); + // check schema is not null + Preconditions.checkNotNull(schema, "Schema should not be null!"); // check no mis-configured columns Set columnNames = schema.getColumnNames(); for (Map.Entry entry : taskConfigs.entrySet()) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 77aaefc069d3..6be851682bc4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -45,6 +45,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; @@ -289,7 +290,7 @@ public static int getMaxTasks(String taskType, String tableNameWithType, Map taskConfigs) { + public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map taskConfigs) { // check table is realtime Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, "UpsertCompactionTask only supports realtime tables!"); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java index b15bff86980f..3c3df0bd4d39 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -47,6 +47,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.slf4j.Logger; @@ -425,7 +426,7 @@ protected static Set getAlreadyMergedSegments(List al } @Override - public void validateTaskConfigs(TableConfig tableConfig, Map taskConfigs) { + public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map taskConfigs) { // check table is realtime Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, String.format("%s only supports realtime tables!", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java index 49a9fd8d57d3..754f7224a248 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java @@ -541,7 +541,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); // validate valid config - taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig); // invalid Upsert config with RealtimeToOfflineTask tableConfig = @@ -550,7 +550,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't support upsert table")); @@ -564,7 +564,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, invalidPeriodConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, invalidPeriodConfig); Assert.fail(); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().contains("Invalid time spec")); @@ -578,7 +578,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, invalidMergeType); + taskGenerator.validateTaskConfigs(tableConfig, schema, invalidMergeType); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("MergeType must be one of")); @@ -592,7 +592,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, invalidColumnConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, invalidColumnConfig); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("not found in schema")); @@ -606,7 +606,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, invalidAggConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAggConfig); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); @@ -620,7 +620,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); try { - taskGenerator.validateTaskConfigs(tableConfig, invalidAgg2Config); + taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAgg2Config); Assert.fail(); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("has invalid aggregate type")); @@ -633,7 +633,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { new TableTaskConfig( ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); - taskGenerator.validateTaskConfigs(tableConfig, validAggConfig); + taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig); // valid agg HashMap validAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig); @@ -642,7 +642,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() { new TableTaskConfig( ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, "SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build(); - taskGenerator.validateTaskConfigs(tableConfig, validAgg2Config); + taskGenerator.validateTaskConfigs(tableConfig, schema, validAgg2Config); } private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status, long startTime, long endTime, @@ -659,7 +659,7 @@ private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status private IdealState getIdealState(String tableName, List segmentNames) { IdealState idealState = new IdealState(tableName); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - for (String segmentName: segmentNames) { + for (String segmentName : segmentNames) { idealState.setPartitionState(segmentName, "Server_0", "ONLINE"); } return idealState; diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java index 1204c5ae5f37..f4a31c180b0d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; @@ -327,7 +328,7 @@ public void testUpsertCompactionTaskConfig() { .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) .build(); - _taskGenerator.validateTaskConfigs(tableConfig, upsertCompactionTaskConfig); + _taskGenerator.validateTaskConfigs(tableConfig, new Schema(), upsertCompactionTaskConfig); // test with invalidRecordsThresholdPercents as 0 Map upsertCompactionTaskConfig1 = ImmutableMap.of("invalidRecordsThresholdPercent", "0"); @@ -335,7 +336,7 @@ public void testUpsertCompactionTaskConfig() { .setUpsertConfig(upsertConfig) .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig1))) .build(); - _taskGenerator.validateTaskConfigs(zeroPercentTableConfig, upsertCompactionTaskConfig1); + _taskGenerator.validateTaskConfigs(zeroPercentTableConfig, new Schema(), upsertCompactionTaskConfig1); // test with invalid invalidRecordsThresholdPercents as -1 and 110 Map upsertCompactionTaskConfig2 = ImmutableMap.of("invalidRecordsThresholdPercent", "-1"); @@ -344,14 +345,16 @@ public void testUpsertCompactionTaskConfig() { .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig2))) .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(negativePercentTableConfig, upsertCompactionTaskConfig2)); + () -> _taskGenerator.validateTaskConfigs(negativePercentTableConfig, new Schema(), + upsertCompactionTaskConfig2)); Map upsertCompactionTaskConfig3 = ImmutableMap.of("invalidRecordsThresholdPercent", "110"); TableConfig hundredTenPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig3))) .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(hundredTenPercentTableConfig, upsertCompactionTaskConfig3)); + () -> _taskGenerator.validateTaskConfigs(hundredTenPercentTableConfig, new Schema(), + upsertCompactionTaskConfig3)); // test with invalid invalidRecordsThresholdCount Map upsertCompactionTaskConfig4 = ImmutableMap.of("invalidRecordsThresholdCount", "0"); @@ -360,7 +363,7 @@ public void testUpsertCompactionTaskConfig() { .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig4))) .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(invalidCountTableConfig, upsertCompactionTaskConfig4)); + () -> _taskGenerator.validateTaskConfigs(invalidCountTableConfig, new Schema(), upsertCompactionTaskConfig4)); // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount Map upsertCompactionTaskConfig5 = ImmutableMap.of("bufferTimePeriod", "5d"); @@ -369,7 +372,7 @@ public void testUpsertCompactionTaskConfig() { .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig5))) .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(invalidTableConfig, upsertCompactionTaskConfig5)); + () -> _taskGenerator.validateTaskConfigs(invalidTableConfig, new Schema(), upsertCompactionTaskConfig5)); } private Map getCompactionConfigs(String invalidRecordsThresholdPercent, diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java index 3709392e76e0..7e4fbda5f563 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java @@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; @@ -96,11 +97,11 @@ public void testUpsertCompactMergeTaskConfig() { ImmutableMap.of("bufferTimePeriod", "5d"); TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig( - new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, - upsertCompactMergeTaskConfig))) - .build(); + new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, + upsertCompactMergeTaskConfig))) + .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, upsertCompactMergeTaskConfig)); + () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, new Schema(), upsertCompactMergeTaskConfig)); // check with non-upsert REALTIME table TableConfig nonUpsertRealtimetableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) @@ -109,7 +110,8 @@ public void testUpsertCompactMergeTaskConfig() { .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig, upsertCompactMergeTaskConfig)); + () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig, new Schema(), + upsertCompactMergeTaskConfig)); // check with snapshot disabled TableConfig disabledSnapshotTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) @@ -118,7 +120,8 @@ public void testUpsertCompactMergeTaskConfig() { upsertCompactMergeTaskConfig))) .build(); Assert.assertThrows(IllegalStateException.class, - () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig, upsertCompactMergeTaskConfig)); + () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig, new Schema(), + upsertCompactMergeTaskConfig)); // valid table configs UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); @@ -128,13 +131,13 @@ public void testUpsertCompactMergeTaskConfig() { .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, upsertCompactMergeTaskConfig))) .build(); - _taskGenerator.validateTaskConfigs(validTableConfig, upsertCompactMergeTaskConfig); + _taskGenerator.validateTaskConfigs(validTableConfig, new Schema(), upsertCompactMergeTaskConfig); // invalid buffer time period Map upsertCompactMergeTaskConfig1 = ImmutableMap.of("bufferTimePeriod", "5hd"); Assert.assertThrows(IllegalArgumentException.class, - () -> _taskGenerator.validateTaskConfigs(validTableConfig, upsertCompactMergeTaskConfig1)); + () -> _taskGenerator.validateTaskConfigs(validTableConfig, new Schema(), upsertCompactMergeTaskConfig1)); } @Test