Skip to content

Commit

Permalink
[Feature][Engine] Support execute job without view permission (datava…
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 committed Sep 1, 2024
1 parent 27c74b3 commit c1ade10
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ default String getCreateTableAsSelectStatement(String srcTable, String targetDat
return String.format("CREATE TABLE %s.%s AS SELECT * FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), quoteIdentifier(srcTable));
}

default String getCreateTableAsSelectStatementFromSql(String srcTable, String targetDatabase, String targetTable) {
return String.format("CREATE TABLE %s.%s AS SELECT t.* FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), srcTable);
}

default String getCreateTableStatement(String table, List<StructField> fields, TypeConverter typeConverter) {
if (CollectionUtils.isNotEmpty(fields)) {
String columns = fields.stream().map(field -> {
Expand All @@ -127,6 +131,10 @@ default String getInsertAsSelectStatement(String srcTable, String targetDatabase
return String.format("INSERT INTO %s.%s SELECT * FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), quoteIdentifier(srcTable));
}

default String getInsertAsSelectStatementFromSql(String srcTable, String targetDatabase, String targetTable) {
return String.format("INSERT INTO %s.%s SELECT t.* FROM %s", quoteIdentifier(targetDatabase), quoteIdentifier(targetTable), srcTable);
}

String getErrorDataScript(Map<String, String> configMap);

String getValidateResultDataScript(Map<String, String> configMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,4 @@ protected InputParam getDatabaseInput(boolean isEn) {
isEn ? "please enter database" : "请填入数据库", 1, null,
null);
}

@Override
protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图(设置为false时无法导出错误数据)",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"true");

list.add(enableExternalCatalog);
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,16 @@ protected InputParam getInputParam(String field, String title, String placeholde
}

protected List<PluginParams> getOtherParams(boolean isEn) {
return Collections.emptyList();

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"false");

list.add(enableExternalCatalog);
return list;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,4 @@ protected InputParam getDatabaseInput(boolean isEn) {
isEn ? "please enter database" : "请填入数据库", 1, null,
null);
}

@Override
protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图(设置为false时无法导出错误数据)",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"true");

list.add(enableExternalCatalog);
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ public void buildTransformConfigs() {
invalidateItemCanOutput &= sqlMetric.isInvalidateItemsCanOutput();
metricInputParameter.put(INVALIDATE_ITEM_CAN_OUTPUT, String.valueOf(invalidateItemCanOutput));

boolean isEnableExternalCatalog = true;
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableExternalCatalog = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableExternalCatalog) {
if (isEnableUseView) {
// generate invalidate item execute sql
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
Expand All @@ -237,16 +237,30 @@ public void buildTransformConfigs() {
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
} else {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
} else {
// generate actual value execute sql
ExecuteSql actualValueExecuteSql = sqlMetric.getActualValue(metricInputParameter);
if (actualValueExecuteSql != null) {
actualValueExecuteSql.setResultTable(sqlMetric.getActualValue(metricInputParameter).getResultTable());
MetricParserUtils.setTransformerConfig(
metricInputParameter,
transformConfigs,
actualValueExecuteSql,
TransformType.ACTUAL_VALUE.getDescription());
metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

import io.datavines.common.config.SinkConfig;
import io.datavines.common.config.enums.SinkType;
import io.datavines.common.entity.ConnectorParameter;
import io.datavines.common.entity.ExecuteSql;
import io.datavines.common.entity.MappingColumn;
import io.datavines.common.entity.job.BaseJobParameter;
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.ConnectorFactory;
import io.datavines.engine.config.MetricParserUtils;
import io.datavines.metric.api.ExpectedValue;
import io.datavines.metric.api.SqlMetric;
import io.datavines.spi.PluginLoader;
import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -35,8 +37,6 @@
import java.util.List;
import java.util.Map;

import static io.datavines.common.CommonConstants.DATABASE2;
import static io.datavines.common.CommonConstants.TABLE2;
import static io.datavines.common.ConfigConstants.*;

/**
Expand Down Expand Up @@ -125,7 +125,24 @@ public void buildSinkConfigs() throws DataVinesException {
connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName());
connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR));
connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME));
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableUseView) {
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
} else {
String metricType = parameter.getMetricType();
SqlMetric sqlMetric = PluginLoader
.getPluginLoader(SqlMetric.class)
.getNewPlugin(metricType);
MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo);
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t");
}
}
connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT));
// use to get source type converter in sink
connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import io.datavines.common.config.SinkConfig;
import io.datavines.common.config.enums.SinkType;
import io.datavines.common.entity.ConnectorParameter;
import io.datavines.common.entity.ExecuteSql;
import io.datavines.common.entity.job.BaseJobParameter;
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.JSONUtils;
import io.datavines.common.utils.ParameterUtils;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.ConnectorFactory;
import io.datavines.engine.config.MetricParserUtils;
import io.datavines.metric.api.ExpectedValue;
import io.datavines.metric.api.SqlMetric;
import io.datavines.spi.PluginLoader;
import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -94,7 +97,26 @@ public void buildSinkConfigs() throws DataVinesException {
connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName());
connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR));
connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME));
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
boolean isEnableUseView = false;
if (metricInputParameter.get(ENABLE_USE_VIEW) != null) {
isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW));
}

if (isEnableUseView) {
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE));
} else {
String metricType = parameter.getMetricType();
SqlMetric sqlMetric = PluginLoader
.getPluginLoader(SqlMetric.class)
.getNewPlugin(metricType);
MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo);
if (sqlMetric.getInvalidateItems(metricInputParameter) != null) {
ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter);
connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t");
}
}

connectorParameterMap.put(ENABLE_USE_VIEW, isEnableUseView);
connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT));
// use to get source type converter in sink
connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,22 @@ private void sinkErrorDataToDataSource() {
}

String srcConnectorType = config.getString(SRC_CONNECTOR_TYPE);
boolean isEnableUseView = config.getBoolean(ENABLE_USE_VIEW);
ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(srcConnectorType);
Dialect dialect = connectorFactory.getDialect();
if (!checkTableExist(getConnectionHolder().getConnection(),
dialect.quoteIdentifier(targetDatabase)+"."+dialect.quoteIdentifier(targetTable), dialect)) {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatement(sourceTable, targetDatabase, targetTable));
if (isEnableUseView) {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatement(sourceTable, targetDatabase, targetTable));
} else {
sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable));
}
} else {
// drop data and insert new data
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatement(sourceTable, targetDatabase, targetTable));
if (isEnableUseView) {
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatement(sourceTable, targetDatabase, targetTable));
} else {
sourceConnectionStatement.execute(dialect.getInsertAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable));
}
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public ResultList execute(Connection connection, Config config, LocalRuntimeEnvi
statement = connection.createStatement();
env.setCurrentStatement(statement);
resultSet = statement.executeQuery(sql);
ResultList resultList = SqlUtils.getListFromResultSet(resultSet, SqlUtils.getQueryFromsAndJoins(sql));
return resultList;
return SqlUtils.getListFromResultSet(resultSet, SqlUtils.getQueryFromsAndJoins(sql));
} finally {
SqlUtils.closeResultSet(resultSet);
SqlUtils.closeStatement(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public ExecuteSql getActualValue(Map<String,String> inputParameter) {
inputParameter.put(ACTUAL_TABLE, inputParameter.get(TABLE));
String actualAggregateSql = inputParameter.get(ACTUAL_AGGREGATE_SQL);
if (StringUtils.isNotEmpty(actualAggregateSql)) {
actualAggregateSql = actualAggregateSql.replace("as actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
if (actualAggregateSql.contains("as actual_value")) {
actualAggregateSql = actualAggregateSql.replace("as actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
} else if (actualAggregateSql.contains("AS actual_value")) {
actualAggregateSql = actualAggregateSql.replace("AS actual_value", "as actual_value_" + inputParameter.get(METRIC_UNIQUE_KEY));
}
}
return new ExecuteSql(actualAggregateSql, inputParameter.get(TABLE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public ExecuteSql getActualValue(Map<String,String> inputParameter) {
return executeSql;
}

@Override
public ExecuteSql getDirectActualValue(Map<String, String> inputParameter) {
String uniqueKey = inputParameter.get(METRIC_UNIQUE_KEY);
ExecuteSql executeSql = new ExecuteSql();
executeSql.setResultTable("invalidate_count_" + uniqueKey);
executeSql.setSql("select count(1) as actual_value_" + uniqueKey + " from ( " + invalidateItemsSql.toString() + " ) t");
executeSql.setErrorOutput(false);
return executeSql;
}

@Override
public List<DataVinesDataType> suitableType() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) {
JobExecutionInfo jobExecutionInfo = new JobExecutionInfo(
id, submitJob.getName(),
submitJob.getEngineType(), JSONUtils.toJsonString(submitJob.getEngineParameter()),
submitJob.getErrorDataStorageType(), JSONUtils.toJsonString(submitJob.getErrorDataStorageParameter()), submitJob.getName()+"_"+ id,
submitJob.getErrorDataStorageType(), JSONUtils.toJsonString(submitJob.getErrorDataStorageParameter()), submitJob.getName() + "_" + id,
submitJob.getValidateResultDataStorageType(), JSONUtils.toJsonString(submitJob.getValidateResultDataStorageParameter()),
submitJob.getParameter());

Expand Down
8 changes: 8 additions & 0 deletions datavines-ui/Editor/hooks/useNotRequiredRule/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { useIntl } from 'react-intl';

export default () => {
const intl = useIntl();
return [{
required: false
}];
};

0 comments on commit c1ade10

Please sign in to comment.