Skip task validation during table creation with schema #14683

Merged
@@ -211,6 +211,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
Pair<TableConfig, Map<String, Object>> tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
+ Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigStr, TableConfig.class);
@@ -224,7 +225,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, httpHeaders,
AccessType.CREATE, Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER);

- Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);

TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager, tableConfig, schema, Collections.emptyMap());

@@ -239,7 +240,7 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
@@ -481,6 +482,7 @@ public ConfigSuccessResponse updateTableConfig(
Pair<TableConfig, Map<String, Object>> tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
+ Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigString, TableConfig.class);
@@ -497,7 +499,7 @@ public ConfigSuccessResponse updateTableConfig(
Response.Status.BAD_REQUEST);
}

- Schema schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
} catch (Exception e) {
String msg = String.format("Invalid table config: %s with error: %s", tableName, e.getMessage());
@@ -514,7 +516,7 @@ public ConfigSuccessResponse updateTableConfig(
TableConfigUtils.ensureMinReplicas(tableConfig, _controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType), tableConfig);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
@@ -575,7 +577,7 @@ private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, @Nulla
throw new SchemaNotFoundException("Got empty schema");
}
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip);
ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
if (tableConfig.getTableType() == TableType.OFFLINE) {
tableConfigValidateStr.set(TableType.OFFLINE.name(), tableConfig.toJsonNode());
@@ -462,7 +462,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
"Name in 'offline' table config: %s must be equal to 'tableName': %s", offlineRawTableName, rawTableName);
TableConfigUtils.validateTableName(offlineTableConfig);
TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), _pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), schema, _pinotTaskManager, typesToSkip);
}
if (realtimeTableConfig != null) {
String realtimeRawTableName = DatabaseUtils.translateTableName(
@@ -471,7 +471,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
"Name in 'realtime' table config: %s must be equal to 'tableName': %s", realtimeRawTableName, rawTableName);
TableConfigUtils.validateTableName(realtimeTableConfig);
TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), _pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), schema, _pinotTaskManager, typesToSkip);
}
if (offlineTableConfig != null && realtimeTableConfig != null) {
TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig);
@@ -25,6 +25,7 @@
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;


@@ -103,8 +104,9 @@ default String getMinionInstanceTag(TableConfig tableConfig) {
/**
* Performs task type specific validations for the given task type.
* @param tableConfig The table configuration that is getting added/updated/validated.
+ * @param schema The schema of the table.
* @param taskConfigs The task type specific task configuration to be validated.
*/
- default void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+ default void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
}
}
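The schema parameter added to this hook lets a task generator run column-level checks without looking the schema up itself, as the RealtimeToOfflineSegmentsTaskGenerator change further down illustrates. Below is a minimal sketch (not part of this PR) of a custom generator using the new signature; the class name and the "aggregationColumn" task property are hypothetical.

import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

// Declared abstract so the sketch can focus on the validation hook; a real generator
// would also implement the remaining interface methods such as generateTasks(List<TableConfig>).
public abstract class ExampleTaskGenerator implements PinotTaskGenerator {

  @Override
  public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
    // The schema is now supplied by the caller (TaskConfigUtils.validateTaskConfigs),
    // so the generator no longer fetches it from the Helix resource manager.
    Preconditions.checkNotNull(schema, "Schema should not be null!");
    String column = taskConfigs.get("aggregationColumn");
    if (column != null) {
      // Column-level validation can now run when the table is created or updated.
      Preconditions.checkState(schema.getColumnNames().contains(column),
          "Column %s not found in schema of table %s", column, tableConfig.getTableName());
    }
  }
}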
@@ -26,6 +26,7 @@
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+ import org.apache.pinot.spi.data.Schema;
import org.quartz.CronScheduleBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class TaskConfigUtils {
private TaskConfigUtils() {
}

- public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager pinotTaskManager,
+ public static void validateTaskConfigs(TableConfig tableConfig, Schema schema, PinotTaskManager pinotTaskManager,
String validationTypesToSkip) {
if (tableConfig == null || tableConfig.getTaskConfig() == null) {
return;
@@ -59,7 +60,7 @@ public static void validateTaskConfigs(TableConfig tableConfig, PinotTaskManager
if (taskGenerator != null) {
Map<String, String> taskConfigs = taskConfigEntry.getValue();
doCommonTaskValidations(tableConfig, taskType, taskConfigs);
- taskGenerator.validateTaskConfigs(tableConfig, taskConfigs);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, taskConfigs);
} else {
throw new RuntimeException(String.format("Task generator not found for task type: %s, while validating table "
+ "configs for table: %s", taskType, tableConfig.getTableName()));
@@ -30,6 +30,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
+ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -64,7 +65,7 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
}

@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
throw new RuntimeException("TableConfig validation failed");
}
};
@@ -73,22 +74,22 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry);
}

- @Test (expectedExceptions = RuntimeException.class)
+ @Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsValidationException() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE, ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
- TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null);
}

- @Test (expectedExceptions = RuntimeException.class)
+ @Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsUnknownTaskType() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of("otherTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
- TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(), _mockTaskManager, null);
}

@Test
@@ -321,7 +321,7 @@ private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> co
}

@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is not upsert
Preconditions.checkState(tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE,
"RealtimeToOfflineTask doesn't support upsert table!");
@@ -336,8 +336,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> tas
Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(), MergeType.ROLLUP.name(), MergeType.DEDUP.name())
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, MergeType.CONCAT.name())
.toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP, DEDUP]!");

- Schema schema = _clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+ // check schema is not null
+ Preconditions.checkNotNull(schema, "Schema should not be null!");
// check no mis-configured columns
Set<String> columnNames = schema.getColumnNames();
for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
@@ -45,6 +45,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
@@ -289,7 +290,7 @@ public static int getMaxTasks(String taskType, String tableNameWithType, Map<Str
}

@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
"UpsertCompactionTask only supports realtime tables!");
@@ -47,6 +47,7 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
@@ -425,7 +426,7 @@ protected static Set<String> getAlreadyMergedSegments(List<SegmentZKMetadata> al
}

@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
String.format("%s only supports realtime tables!", MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
@@ -541,7 +541,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
"SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();

// validate valid config
- taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig);

// invalid Upsert config with RealtimeToOfflineTask
tableConfig =
@@ -550,7 +550,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", realtimeToOfflineTaskConfig,
"SegmentGenerationAndPushTask", segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, realtimeToOfflineTaskConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, realtimeToOfflineTaskConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't support upsert table"));
@@ -564,7 +564,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidPeriodConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidPeriodConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidPeriodConfig);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("Invalid time spec"));
@@ -578,7 +578,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidMergeType);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidMergeType);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("MergeType must be one of"));
@@ -592,7 +592,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidColumnConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidColumnConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidColumnConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("not found in schema"));
@@ -606,7 +606,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidAggConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAggConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
@@ -620,7 +620,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidAgg2Config);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAgg2Config);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
@@ -633,7 +633,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
- taskGenerator.validateTaskConfigs(tableConfig, validAggConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig);

// valid agg
HashMap<String, String> validAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig);
@@ -642,7 +642,7 @@ public void testRealtimeToOfflineSegmentsTaskConfig() {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
- taskGenerator.validateTaskConfigs(tableConfig, validAgg2Config);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, validAgg2Config);
}

private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status, long startTime, long endTime,
@@ -659,7 +659,7 @@ private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status status
private IdealState getIdealState(String tableName, List<String> segmentNames) {
IdealState idealState = new IdealState(tableName);
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- for (String segmentName: segmentNames) {
+ for (String segmentName : segmentNames) {
idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
}
return idealState;