Skip to content

Commit

Permalink
Add Schema to the PinotTaskGenerator validateTaskConfigs method call (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Jan 8, 2025
1 parent 0600185 commit 6b1ccc5
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
Pair<TableConfig, Map<String, Object>> tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigStr, TableConfig.class);
Expand All @@ -224,7 +225,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, httpHeaders,
AccessType.CREATE, Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER);

Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);

TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager, tableConfig, schema, Collections.emptyMap());

Expand All @@ -239,7 +240,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig);
TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
Expand Down Expand Up @@ -481,6 +482,7 @@ public ConfigSuccessResponse updateTableConfig(
Pair<TableConfig, Map<String, Object>> tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigString, TableConfig.class);
Expand All @@ -497,7 +499,7 @@ public ConfigSuccessResponse updateTableConfig(
Response.Status.BAD_REQUEST);
}

Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
} catch (Exception e) {
String msg = String.format("Invalid table config: %s with error: %s", tableName, e.getMessage());
Expand All @@ -514,7 +516,7 @@ public ConfigSuccessResponse updateTableConfig(
TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig);
TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
Expand Down Expand Up @@ -575,7 +577,7 @@ private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, @Nulla
throw new SchemaNotFoundException("Got empty schema");
}
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
if (tableConfig.getTableType() == TableType.OFFLINE) {
tableConfigValidateStr.set(TableType.OFFLINE.name(), tableConfig.toJsonNode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
"Name in 'offline' table config: %s must be equal to 'tableName': %s", offlineRawTableName, rawTableName);
TableConfigUtils.validateTableName(offlineTableConfig);
TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), _pinotTaskManager, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), schema, _pinotTaskManager, typesToSkip);
}
if (realtimeTableConfig != null) {
String realtimeRawTableName = DatabaseUtils.translateTableName(
Expand All @@ -471,7 +471,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
"Name in 'realtime' table config: %s must be equal to 'tableName': %s", realtimeRawTableName, rawTableName);
TableConfigUtils.validateTableName(realtimeTableConfig);
TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), _pinotTaskManager, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), schema, _pinotTaskManager, typesToSkip);
}
if (offlineTableConfig != null && realtimeTableConfig != null) {
TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;


Expand Down Expand Up @@ -103,8 +104,9 @@ default String getMinionInstanceTag(TableConfig tableConfig) {
/**
* Performs task type specific validations for the given task type.
* @param tableConfig The table configuration that is getting added/updated/validated.
* @param schema The schema of the table.
* @param taskConfigs The task type specific task configuration to be validated.
*/
default void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
default void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.data.Schema;
import org.quartz.CronScheduleBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,7 +41,7 @@ public class TaskConfigUtils {
private TaskConfigUtils() {
}

public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager pinotTaskManager,
public static void validateTaskConfigs(TableConfig tableConfig, Schema schema, PinotTaskManager pinotTaskManager,
String validationTypesToSkip) {
if (tableConfig == null || tableConfig.getTaskConfig() == null) {
return;
Expand All @@ -59,7 +60,7 @@ public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager
if (taskGenerator != null) {
Map<String, String> taskConfigs = taskConfigEntry.getValue();
doCommonTaskValidations(tableConfig, taskType, taskConfigs);
taskGenerator.validateTaskConfigs(tableConfig, taskConfigs);
taskGenerator.validateTaskConfigs(tableConfig, schema, taskConfigs);
} else {
throw new RuntimeException(String.format("Task generator not found for task type: %s, while validating table "
+ "configs for table: %s", taskType, tableConfig.getTableName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -64,7 +65,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
throw new RuntimeException("TableConfig validation failed");
}
};
Expand All @@ -73,22 +74,22 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry);
}

@Test (expectedExceptions = RuntimeException.class)
@Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsValidationException() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null);
}

@Test (expectedExceptions = RuntimeException.class)
@Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsUnknownTaskType() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of("otherTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> co
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is not upsert
Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
"RealtimeToOfflineTask doesn't support upsert table!");
Expand All @@ -336,8 +336,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name())
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name())
.toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");

Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
// check schema is not null
Preconditions.checkNotNull(schema, "Schema should not be null!");
// check no mis-configured columns
Set<String> columnNames = schema.getColumnNames();
for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -289,7 +290,7 @@ public static int getMaxTasks(String taskType, String tableNameWithType, Map<Str
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
"UpsertCompactionTask only supports realtime tables!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -425,7 +426,7 @@ protected static Set<String> getAlreadyMergedSegments(List<SegmentZKMetadata> al
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
String.format("%s only supports realtime tables!", MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
"SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();

// validate valid config
taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig);

// invalid Upsert config with RealtimeToOfflineTask
tableConfig =
Expand All @@ -550,7 +550,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
"SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't support upsert table"));
Expand All @@ -564,7 +564,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, invalidPeriodConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, invalidPeriodConfig);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("Invalid time spec"));
Expand All @@ -578,7 +578,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, invalidMergeType);
taskGenerator.validateTaskConfigs(tableConfig, schema, invalidMergeType);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("MergeType must be one of"));
Expand All @@ -592,7 +592,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, invalidColumnConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, invalidColumnConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("not found in schema"));
Expand All @@ -606,7 +606,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, invalidAggConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAggConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
Expand All @@ -620,7 +620,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
taskGenerator.validateTaskConfigs(tableConfig, invalidAgg2Config);
taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAgg2Config);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
Expand All @@ -633,7 +633,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
taskGenerator.validateTaskConfigs(tableConfig, validAggConfig);
taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig);

// valid agg
HashMap<String, String> validAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig);
Expand All @@ -642,7 +642,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
taskGenerator.validateTaskConfigs(tableConfig, validAgg2Config);
taskGenerator.validateTaskConfigs(tableConfig, schema, validAgg2Config);
}

private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status, long startTime, long endTime,
Expand All @@ -659,7 +659,7 @@ private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status
private IdealState getIdealState(String tableName, List<String> segmentNames) {
IdealState idealState = new IdealState(tableName);
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
for (String segmentName: segmentNames) {
for (String segmentName : segmentNames) {
idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
}
return idealState;
Expand Down
Loading

0 comments on commit 6b1ccc5

Please sign in to comment.