From 65b1959c7005d121d89cf0a0e5130f0c7560e089 Mon Sep 17 00:00:00 2001
From: GSHF <18663587295@sohu.com>
Date: Sun, 29 Dec 2024 10:01:34 +0800
Subject: [PATCH] refactor: restructure Flink engine modules and update UI

1. Split Flink engine into separate modules:
- datavines-engine-flink-config
- datavines-engine-flink-jdbc
- datavines-engine-flink-transform
2. Update UI components:
- Add FlinkConfiguration component
- Update CreateConfig to support Flink configuration
- Update localization files
---
.../datavines-engine-flink/HEADER | 14 ++
.../datavines-engine-flink-config/pom.xml | 57 +++++
.../flink/config/FlinkConfiguration.java | 55 +++++
.../config/BaseFlinkConfigurationBuilder.java | 227 ------------------
.../flink/config/FlinkEngineConfig.java | 87 -------
.../FlinkSingleTableConfigurationBuilder.java | 140 -----------
.../flink/config/FlinkSinkSqlBuilder.java | 62 -----
.../engine/flink/sink/FlinkJdbcSink.java | 160 ------------
.../engine/flink/source/FlinkJdbcSource.java | 160 ------------
.../datavines-engine-flink-jdbc/pom.xml | 58 +++++
.../engine/flink/jdbc/sink/JdbcSink.java | 122 ++++++++++
.../engine/flink/jdbc/source/JdbcSource.java | 108 +++++++++
.../datavines-engine-flink-transform/pom.xml | 51 ++++
.../flink/transform/FlinkSqlTransform.java | 93 +++----
.../datavines-engine-flink/delete_dirs.java | 44 ++++
.../datavines-engine-flink/pom.xml | 3 +
datavines-ui/src/locale/en_US.ts | 5 +-
datavines-ui/src/locale/zh_CN.ts | 5 +-
.../src/view/Main/Config/CreateConfig.tsx | 29 ++-
.../view/Main/Config/FlinkConfiguration.tsx | 84 +++++++
20 files changed, 664 insertions(+), 900 deletions(-)
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/HEADER
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkConfiguration.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/sink/FlinkJdbcSink.java
delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/source/FlinkJdbcSource.java
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/pom.xml
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/sink/JdbcSink.java
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/source/JdbcSource.java
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/pom.xml
rename datavines-engine/datavines-engine-plugins/datavines-engine-flink/{datavines-engine-flink-core => datavines-engine-flink-transform}/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java (53%)
create mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-flink/delete_dirs.java
create mode 100644 datavines-ui/src/view/Main/Config/FlinkConfiguration.tsx
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/HEADER b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/HEADER
new file mode 100644
index 000000000..8853bce32
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/HEADER
@@ -0,0 +1,14 @@
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
new file mode 100644
index 000000000..d0db0541d
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+
+ datavines-engine-flink
+ io.datavines
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ datavines-engine-flink-config
+
+
+
+ io.datavines
+ datavines-engine-core
+ ${project.version}
+
+
+
+ io.datavines
+ datavines-engine-api
+ ${project.version}
+
+
+
+ io.datavines
+ datavines-engine-flink-api
+ ${project.version}
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+
+
+
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkConfiguration.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkConfiguration.java
new file mode 100644
index 000000000..e2f024a18
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-config/src/main/java/io/datavines/engine/flink/config/FlinkConfiguration.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.config;
+
+import io.datavines.common.config.BaseConfig;
+
+public class FlinkConfiguration extends BaseConfig {
+
+ private String jobName;
+ private String checkpointPath;
+ private int checkpointInterval = 10000; // default 10s
+
+ @Override
+ public String getType() {
+ return "FLINK";
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public String getCheckpointPath() {
+ return checkpointPath;
+ }
+
+ public void setCheckpointPath(String checkpointPath) {
+ this.checkpointPath = checkpointPath;
+ }
+
+ public int getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public void setCheckpointInterval(int checkpointInterval) {
+ this.checkpointInterval = checkpointInterval;
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java
deleted file mode 100644
index f6e7df009..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/BaseFlinkConfigurationBuilder.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.config;
-
-import io.datavines.common.config.EnvConfig;
-import io.datavines.common.config.SinkConfig;
-import io.datavines.common.config.SourceConfig;
-import io.datavines.common.config.enums.SinkType;
-import io.datavines.common.config.enums.SourceType;
-import io.datavines.common.entity.ConnectorParameter;
-import io.datavines.common.entity.job.BaseJobParameter;
-import io.datavines.common.exception.DataVinesException;
-import io.datavines.common.utils.JSONUtils;
-import io.datavines.common.utils.StringUtils;
-import io.datavines.engine.common.utils.ParserUtils;
-import io.datavines.engine.config.BaseJobConfigurationBuilder;
-import io.datavines.connector.api.ConnectorFactory;
-import io.datavines.spi.PluginLoader;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static io.datavines.common.CommonConstants.*;
-import static io.datavines.common.ConfigConstants.*;
-import static io.datavines.common.ConfigConstants.TABLE;
-
-/**
- *
- *
- * @author dataVines
- * @since 2021-07-01
- */
-@Slf4j
-public abstract class BaseFlinkConfigurationBuilder extends BaseJobConfigurationBuilder {
-
- @Override
- protected EnvConfig getEnvConfig() {
- EnvConfig envConfig = new EnvConfig();
- envConfig.setEngine(jobExecutionInfo.getEngineType());
- Map configMap = envConfig.getConfig();
- if (configMap == null) {
- configMap = new HashMap<>();
- }
-
- ConnectorParameter connectorParameter = jobExecutionParameter.getConnectorParameter();
- String srcConnectorType = "";
- if (connectorParameter != null) {
- srcConnectorType = connectorParameter.getType();
- }
-
- ConnectorParameter connectorParameter2 = jobExecutionParameter.getConnectorParameter2();
- String srcConnectorType2 = "";
- if (connectorParameter2 != null) {
- srcConnectorType2 = connectorParameter2.getType();
- }
-
- envConfig.setConfig(configMap);
- return envConfig;
- }
-
- @Override
- protected List getSourceConfigs() throws DataVinesException {
- List sourceConfigs = new ArrayList<>();
- List metricJobParameterList = jobExecutionParameter.getMetricParameterList();
- boolean isAddValidateResultDataSource = false;
- if (CollectionUtils.isNotEmpty(metricJobParameterList)) {
- Set sourceConnectorSet = new HashSet<>();
- Set targetConnectorSet = new HashSet<>();
- for (BaseJobParameter parameter : metricJobParameterList) {
- String metricUniqueKey = getMetricUniqueKey(parameter);
- Map metricInputParameter = metric2InputParameter.get(metricUniqueKey);
- if (jobExecutionParameter.getConnectorParameter() != null) {
- ConnectorParameter connectorParameter = jobExecutionParameter.getConnectorParameter();
- SourceConfig sourceConfig = new SourceConfig();
-
- Map connectorParameterMap = new HashMap<>(connectorParameter.getParameters());
- connectorParameterMap.putAll(metricInputParameter);
-
- if (connectorParameter.getParameters().get(SCHEMA) != null) {
- metricInputParameter.put(SCHEMA, (String)connectorParameter.getParameters().get(SCHEMA));
- }
-
- metricInputParameter.put(DATABASE_NAME, metricInputParameter.get(DATABASE));
- metricInputParameter.put(TABLE_NAME, metricInputParameter.get(TABLE));
- metricInputParameter.put(COLUMN_NAME, metricInputParameter.get(COLUMN));
-
- ConnectorFactory connectorFactory = PluginLoader
- .getPluginLoader(ConnectorFactory.class)
- .getNewPlugin(connectorParameter.getType());
-
- connectorParameterMap.put(TABLE, metricInputParameter.get(TABLE));
- connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE));
- connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
- connectorParameterMap.put(PASSWORD, ParserUtils.encode((String)connectorParameterMap.get(PASSWORD)));
-
- String outputTable = getOutputTable(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE));
- String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
- connectorParameterMap.put(OUTPUT_TABLE, outputTable);
- connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
-
- metricInputParameter.put(TABLE, outputTable);
- metricInputParameter.put(TABLE_ALIAS, tableAlias);
- metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
- metricInputParameter.put(REGEX_KEY, "REGEXP(${column}, ${regex})");
- metricInputParameter.put(NOT_REGEX_KEY, "NOT REGEXP(${column}, ${regex})");
- metricInputParameter.put(STRING_TYPE, "STRING");
- metricInputParameter.put(IF_FUNCTION_KEY, "IF");
- metricInputParameter.put(LIMIT_TOP_50_KEY, " LIMIT 50");
- metricInputParameter.put(LENGTH_KEY, "CHARACTER_LENGTH(${column})");
- metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
- metricInputParameter.put(ENGINE_TYPE, jobExecutionInfo.getEngineType());
-
- String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
-
- if (sourceConnectorSet.contains(connectorUUID)) {
- continue;
- }
-
- sourceConfig.setPlugin(connectorFactory.getCategory());
- sourceConfig.setConfig(connectorParameterMap);
- sourceConfig.setType(SourceType.SOURCE.getDescription());
- sourceConfigs.add(sourceConfig);
- sourceConnectorSet.add(connectorUUID);
- }
-
- if (jobExecutionParameter.getConnectorParameter2() != null
- && jobExecutionParameter.getConnectorParameter2().getParameters() != null) {
- ConnectorParameter connectorParameter2 = jobExecutionParameter.getConnectorParameter2();
- SourceConfig sourceConfig = new SourceConfig();
-
- Map connectorParameterMap = new HashMap<>(connectorParameter2.getParameters());
- connectorParameterMap.putAll(metricInputParameter);
-
- if (connectorParameter2.getParameters().get(SCHEMA) != null) {
- metricInputParameter.put(SCHEMA2, (String)connectorParameter2.getParameters().get(SCHEMA));
- }
-
- ConnectorFactory connectorFactory = PluginLoader
- .getPluginLoader(ConnectorFactory.class)
- .getNewPlugin(connectorParameter2.getType());
-
- connectorParameterMap.put(TABLE, metricInputParameter.get(TABLE2));
- connectorParameterMap.put(DATABASE, metricInputParameter.get(DATABASE2));
- connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap);
- connectorParameterMap.put(PASSWORD, ParserUtils.encode((String)connectorParameterMap.get(PASSWORD)));
-
- String outputTable = getOutputTable(metricInputParameter.get(DATABASE2),
- metricInputParameter.get(SCHEMA2),
- metricInputParameter.get(TABLE2)) + "_2";
-
- String tableAlias = getTableAlias(metricInputParameter.get(DATABASE2),
- metricInputParameter.get(SCHEMA2),
- metricInputParameter.get(TABLE2), "2");
-
- connectorParameterMap.put(OUTPUT_TABLE, outputTable);
- connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
-
- metricInputParameter.put(TABLE2, outputTable);
- metricInputParameter.put(TABLE2_ALIAS, tableAlias);
-
- String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);
-
- if (targetConnectorSet.contains(connectorUUID)) {
- continue;
- }
-
- sourceConfig.setPlugin(connectorFactory.getCategory());
- sourceConfig.setConfig(connectorParameterMap);
- sourceConfig.setType(SourceType.SOURCE.getDescription());
- sourceConfigs.add(sourceConfig);
- targetConnectorSet.add(connectorUUID);
- }
-
- metric2InputParameter.put(metricUniqueKey, metricInputParameter);
- }
- }
-
- return sourceConfigs;
- }
-
- protected String getOutputTable(String database, String schema, String table) {
- if (StringUtils.isNotEmpty(schema)) {
- return String.format("%s_%s_%s", database, schema, table);
- }
- return String.format("%s_%s", database, table);
- }
-
- protected String getTableAlias(String database, String schema, String table, String order) {
- if (StringUtils.isNotEmpty(schema)) {
- return String.format("t%s_%s_%s_%s", order, database, schema, table);
- }
- return String.format("t%s_%s_%s", order, database, table);
- }
-
- protected SinkConfig getErrorSinkConfig(Map inputParameter) {
- if (FILE.equalsIgnoreCase(jobExecutionInfo.getErrorDataStorageType())) {
- SinkConfig sinkConfig = new SinkConfig();
- Map configMap = new HashMap<>();
- Map errorDataParameterMap = JSONUtils.toMap(jobExecutionInfo.getErrorDataStorageParameter(),String.class, String.class);
- configMap.put(DATA_DIR, errorDataParameterMap.get(DATA_DIR));
- configMap.put(FILE_NAME, inputParameter.get(ERROR_DATA_FILE_NAME));
- configMap.put(COLUMN_SEPARATOR, errorDataParameterMap.get(COLUMN_SEPARATOR));
- configMap.put(LINE_SEPARATOR, errorDataParameterMap.get(LINE_SEPARATOR));
- sinkConfig.setConfig(configMap);
- sinkConfig.setType(SinkType.ERROR_DATA.getDescription());
- sinkConfig.setPlugin(FILE);
- return sinkConfig;
- }
- return null;
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java
deleted file mode 100644
index d4e47eb33..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkEngineConfig.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.config;
-
-import io.datavines.common.config.Config;
-import io.datavines.common.config.CheckResult;
-import io.datavines.engine.api.plugin.Plugin;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-
-import java.io.Serializable;
-
-public class FlinkEngineConfig implements Plugin, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final String CHECKPOINT_INTERVAL = "flink.checkpoint.interval";
- private static final String PARALLELISM = "flink.parallelism";
- private static final String RESTART_ATTEMPTS = "flink.restart.attempts";
- private static final String RESTART_DELAY = "flink.restart.delay";
- private static final String STATE_BACKEND = "flink.state.backend";
- private static final String CHECKPOINT_PATH = "flink.checkpoint.path";
- private static final String EXECUTION_MODE = "flink.execution.mode";
-
- private Config config;
-
- public FlinkEngineConfig() {
- this.config = new Config();
- }
-
- @Override
- public void setConfig(Config config) {
- this.config = config != null ? config : new Config();
- }
-
- @Override
- public Config getConfig() {
- return config;
- }
-
- @Override
- public CheckResult checkConfig() {
- return new CheckResult(true, "");
- }
-
- public long getCheckpointInterval() {
- return config.getLong(CHECKPOINT_INTERVAL, 10000L);
- }
-
- public int getParallelism() {
- return config.getInt(PARALLELISM, 1);
- }
-
- public int getRestartAttempts() {
- return config.getInt(RESTART_ATTEMPTS, 3);
- }
-
- public long getRestartDelay() {
- return config.getLong(RESTART_DELAY, 10000L);
- }
-
- public String getStateBackend() {
- return config.getString(STATE_BACKEND, "memory");
- }
-
- public String getCheckpointPath() {
- return config.getString(CHECKPOINT_PATH, "");
- }
-
- public RuntimeExecutionMode getExecutionMode() {
- String mode = config.getString(EXECUTION_MODE, "STREAMING");
- return RuntimeExecutionMode.valueOf(mode.toUpperCase());
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java
deleted file mode 100644
index c839d1a9e..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSingleTableConfigurationBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.config;
-
-import io.datavines.common.config.EnvConfig;
-import io.datavines.common.config.SinkConfig;
-import io.datavines.common.config.SourceConfig;
-import io.datavines.common.config.enums.SinkType;
-import io.datavines.common.entity.job.BaseJobParameter;
-import io.datavines.common.exception.DataVinesException;
-import io.datavines.common.utils.StringUtils;
-import io.datavines.engine.config.MetricParserUtils;
-import io.datavines.metric.api.ExpectedValue;
-import io.datavines.spi.PluginLoader;
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static io.datavines.common.ConfigConstants.*;
-
-public class FlinkSingleTableConfigurationBuilder extends BaseFlinkConfigurationBuilder {
-
-
- @Override
- public void buildEnvConfig() {
- EnvConfig envConfig = new EnvConfig();
- envConfig.setEngine("flink");
- configuration.setEnvConfig(envConfig);
- }
-
- @Override
- public void buildSinkConfigs() throws DataVinesException {
- List sinkConfigs = new ArrayList<>();
-
- List metricJobParameterList = jobExecutionParameter.getMetricParameterList();
- if (CollectionUtils.isNotEmpty(metricJobParameterList)) {
- for (BaseJobParameter parameter : metricJobParameterList) {
- String metricUniqueKey = getMetricUniqueKey(parameter);
- Map metricInputParameter = metric2InputParameter.get(metricUniqueKey);
- if (metricInputParameter == null) {
- continue;
- }
-
- // 确保必要的参数存在
- if (!metricInputParameter.containsKey(METRIC_NAME) && parameter.getMetricType() != null) {
- metricInputParameter.put(METRIC_NAME, parameter.getMetricType());
- }
-
- metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
- String expectedType = "local_" + parameter.getExpectedType();
- ExpectedValue expectedValue = PluginLoader
- .getPluginLoader(ExpectedValue.class)
- .getNewPlugin(expectedType);
-
- // 只有在确保必要参数存在的情况下才生成 uniqueCode
- if (metricInputParameter.containsKey(METRIC_NAME)) {
- metricInputParameter.put(UNIQUE_CODE, StringUtils.wrapperSingleQuotes(MetricParserUtils.generateUniqueCode(metricInputParameter)));
- }
-
- // Get the actual value storage parameter
- String actualValueSinkSql = FlinkSinkSqlBuilder.getActualValueSql()
- .replace("${actual_value}", "${actual_value_" + metricUniqueKey + "}");
- SinkConfig actualValueSinkConfig = getValidateResultDataSinkConfig(
- expectedValue, actualValueSinkSql, "dv_actual_values", metricInputParameter);
-
- if (actualValueSinkConfig != null) {
- actualValueSinkConfig.setType(SinkType.ACTUAL_VALUE.getDescription());
- sinkConfigs.add(actualValueSinkConfig);
- }
-
- String taskSinkSql = FlinkSinkSqlBuilder.getDefaultSinkSql()
- .replace("${actual_value}", "${actual_value_" + metricUniqueKey + "}")
- .replace("${expected_value}", "${expected_value_" + metricUniqueKey + "}");
-
- // Get the task data storage parameter
- SinkConfig taskResultSinkConfig = getValidateResultDataSinkConfig(
- expectedValue, taskSinkSql, "dv_job_execution_result", metricInputParameter);
- if (taskResultSinkConfig != null) {
- taskResultSinkConfig.setType(SinkType.VALIDATE_RESULT.getDescription());
- // 设置默认状态为未知(NONE)
- taskResultSinkConfig.getConfig().put("default_state", "0");
- // 添加其他必要参数
- taskResultSinkConfig.getConfig().put("metric_type", "single_table");
- taskResultSinkConfig.getConfig().put("metric_name", metricInputParameter.get(METRIC_NAME));
- taskResultSinkConfig.getConfig().put("metric_dimension", metricInputParameter.get(METRIC_DIMENSION));
- taskResultSinkConfig.getConfig().put("database_name", metricInputParameter.get(DATABASE));
- taskResultSinkConfig.getConfig().put("table_name", metricInputParameter.get(TABLE));
- taskResultSinkConfig.getConfig().put("column_name", metricInputParameter.get(COLUMN));
- taskResultSinkConfig.getConfig().put("expected_type", metricInputParameter.get(EXPECTED_TYPE));
- taskResultSinkConfig.getConfig().put("result_formula", metricInputParameter.get(RESULT_FORMULA));
- sinkConfigs.add(taskResultSinkConfig);
- }
-
- // Get the error data storage parameter if needed
- if (StringUtils.isNotEmpty(jobExecutionInfo.getErrorDataStorageType())
- && StringUtils.isNotEmpty(jobExecutionInfo.getErrorDataStorageParameter())) {
- SinkConfig errorDataSinkConfig = getErrorSinkConfig(metricInputParameter);
- if (errorDataSinkConfig != null) {
- errorDataSinkConfig.setType(SinkType.ERROR_DATA.getDescription());
- sinkConfigs.add(errorDataSinkConfig);
- }
- }
- }
- }
-
- configuration.setSinkParameters(sinkConfigs);
- }
-
- @Override
- public void buildTransformConfigs() {
- // No transform configs needed for single table configuration
- }
-
- @Override
- public void buildSourceConfigs() throws DataVinesException {
- List sourceConfigs = getSourceConfigs();
- configuration.setSourceParameters(sourceConfigs);
- }
-
- @Override
- public void buildName() {
- // Use default name from base implementation
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java
deleted file mode 100644
index 568dad4b6..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/config/FlinkSinkSqlBuilder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.config;
-
-public class FlinkSinkSqlBuilder {
-
- private FlinkSinkSqlBuilder() {
- throw new IllegalStateException("Utility class");
- }
-
- public static String getActualValueSql() {
- return "select\n" +
- " '${job_execution_id}' as job_execution_id,\n" +
- " '${metric_unique_key}' as metric_unique_key,\n" +
- " '${unique_code}' as unique_code,\n" +
- " ${actual_value} as actual_value,\n" +
- " cast(null as string) as expected_value,\n" +
- " cast(null as string) as operator,\n" +
- " cast(null as string) as threshold,\n" +
- " cast(null as string) as check_type,\n" +
- " CURRENT_TIMESTAMP as create_time,\n" +
- " CURRENT_TIMESTAMP as update_time\n" +
- "from ${table_name}";
- }
-
- public static String getDefaultSinkSql() {
- return "select\n" +
- " '${job_execution_id}' as job_execution_id,\n" +
- " '${metric_unique_key}' as metric_unique_key,\n" +
- " '${unique_code}' as unique_code,\n" +
- " CASE WHEN ${actual_value} IS NULL THEN NULL ELSE ${actual_value} END as actual_value,\n" +
- " CASE WHEN ${expected_value} IS NULL THEN NULL ELSE ${expected_value} END as expected_value,\n" +
- " '${metric_type}' as metric_type,\n" +
- " '${metric_name}' as metric_name,\n" +
- " '${metric_dimension}' as metric_dimension,\n" +
- " '${database_name}' as database_name,\n" +
- " '${table_name}' as table_name,\n" +
- " '${column_name}' as column_name,\n" +
- " '${operator}' as operator,\n" +
- " '${threshold}' as threshold,\n" +
- " '${expected_type}' as expected_type,\n" +
- " '${result_formula}' as result_formula,\n" +
- " CASE WHEN ${actual_value} IS NULL THEN '${default_state}' ELSE NULL END as state,\n" +
- " CURRENT_TIMESTAMP as create_time,\n" +
- " CURRENT_TIMESTAMP as update_time\n" +
- "from ${table_name} full join ${expected_table}";
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/sink/FlinkJdbcSink.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/sink/FlinkJdbcSink.java
deleted file mode 100644
index 391e886aa..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/sink/FlinkJdbcSink.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.core.sink;
-
-import io.datavines.common.config.CheckResult;
-import io.datavines.common.config.Config;
-import io.datavines.common.utils.StringUtils;
-import io.datavines.engine.api.env.RuntimeEnvironment;
-import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;
-import io.datavines.engine.flink.api.stream.FlinkStreamSink;
-import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.JdbcSink;
-import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSetMetaData;
-import java.util.*;
-import java.util.stream.Collectors;
-import static io.datavines.common.ConfigConstants.*;
-
-import io.datavines.engine.common.utils.ParserUtils;
-
-public class FlinkJdbcSink implements FlinkStreamSink {
-
- private static final long serialVersionUID = 1L;
-
- private Config config = new Config();
- private transient String[] fieldNames;
- private transient int batchSize = 1000;
- private transient long batchIntervalMs = 200;
- private transient int maxRetries = 3;
-
- @Override
- public void setConfig(Config config) {
- if(config != null) {
- this.config = config;
- this.batchSize = config.getInt("jdbc.batch.size", 1000);
- this.batchIntervalMs = config.getLong("jdbc.batch.interval.ms", 200L);
- this.maxRetries = config.getInt("jdbc.max.retries", 3);
- }
- }
-
- @Override
- public Config getConfig() {
- return config;
- }
-
- @Override
- public CheckResult checkConfig() {
- List requiredOptions = Arrays.asList(URL, TABLE, USER, PASSWORD);
-
- List nonExistsOptions = new ArrayList<>();
- requiredOptions.forEach(x->{
- if(!config.has(x)){
- nonExistsOptions.add(x);
- }
- });
-
- if (!nonExistsOptions.isEmpty()) {
- return new CheckResult(
- false,
- "please specify " + nonExistsOptions.stream().map(option ->
- "[" + option + "]").collect(Collectors.joining(",")) + " as non-empty string");
- } else {
- return new CheckResult(true, "");
- }
- }
-
- @Override
- public void prepare(RuntimeEnvironment env) throws Exception {
- // Load JDBC driver class
- String driver = config.getString(DRIVER, "com.mysql.jdbc.Driver");
- Class.forName(driver);
-
- // Get table metadata to initialize field names
- String url = config.getString(URL);
- String user = config.getString(USER);
- String password = config.getString(PASSWORD);
- String table = config.getString(TABLE);
-
- if (!StringUtils.isEmptyOrNullStr(password)) {
- password = ParserUtils.decode(password);
- }
-
- try (Connection conn = DriverManager.getConnection(url, user, password)) {
- try (java.sql.PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + table + " WHERE 1=0")) {
- ResultSetMetaData metaData = ps.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- fieldNames = new String[columnCount];
- for (int i = 0; i < columnCount; i++) {
- fieldNames[i] = metaData.getColumnName(i + 1);
- }
- }
- }
- }
-
- @Override
- public void output(DataStream dataStream, FlinkRuntimeEnvironment environment) {
- String url = config.getString(URL);
- String table = config.getString(TABLE);
- String user = config.getString(USER);
- String password = config.getString(PASSWORD);
- String driver = config.getString(DRIVER, "com.mysql.jdbc.Driver");
-
- // Decode password if needed
- if (!StringUtils.isEmptyOrNullStr(password)) {
- password = ParserUtils.decode(password);
- }
-
- // Build JDBC execution options
- JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
- .withBatchSize(batchSize)
- .withBatchIntervalMs(batchIntervalMs)
- .withMaxRetries(maxRetries)
- .build();
-
- // Create insert SQL statement
- String insertSql = createInsertSql(table, fieldNames);
-
- // Build JDBC sink
- dataStream.addSink(JdbcSink.sink(
- insertSql,
- (statement, row) -> {
- for (int i = 0; i < fieldNames.length; i++) {
- statement.setObject(i + 1, row.getField(i));
- }
- },
- executionOptions,
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(url)
- .withDriverName(driver)
- .withUsername(user)
- .withPassword(password)
- .build()
- ));
- }
-
- private String createInsertSql(String table, String[] fieldNames) {
- String columns = String.join(", ", fieldNames);
- String placeholders = String.join(", ", Collections.nCopies(fieldNames.length, "?"));
- return String.format("INSERT INTO %s (%s) VALUES (%s)", table, columns, placeholders);
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/source/FlinkJdbcSource.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/source/FlinkJdbcSource.java
deleted file mode 100644
index 9aef8533f..000000000
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/source/FlinkJdbcSource.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.datavines.engine.flink.core.source;
-
-import io.datavines.common.config.CheckResult;
-import io.datavines.common.config.Config;
-import io.datavines.common.utils.CryptionUtils;
-import io.datavines.common.utils.StringUtils;
-import io.datavines.engine.api.env.RuntimeEnvironment;
-import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;
-import io.datavines.engine.flink.api.stream.FlinkStreamSource;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.connector.jdbc.JdbcInputFormat;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSetMetaData;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static io.datavines.common.ConfigConstants.*;
-
-public class FlinkJdbcSource implements FlinkStreamSource {
-
- private static final long serialVersionUID = 1L;
-
- private Config config = new Config();
- private transient String[] fieldNames;
- private transient Class>[] fieldTypes;
-
- @Override
- public void setConfig(Config config) {
- if(config != null) {
- this.config = config;
- }
- }
-
- @Override
- public Config getConfig() {
- return config;
- }
-
- @Override
- public CheckResult checkConfig() {
- List requiredOptions = Arrays.asList(URL, TABLE, USER);
-
- List nonExistsOptions = new ArrayList<>();
- requiredOptions.forEach(x->{
- if(!config.has(x)){
- nonExistsOptions.add(x);
- }
- });
-
- if (!nonExistsOptions.isEmpty()) {
- return new CheckResult(
- false,
- "please specify " + nonExistsOptions.stream().map(option ->
- "[" + option + "]").collect(Collectors.joining(",")) + " as non-empty string");
- } else {
- return new CheckResult(true, "");
- }
- }
-
- @Override
- public void prepare(RuntimeEnvironment env) throws Exception {
- String driver = config.getString(DRIVER, "com.mysql.jdbc.Driver");
- Class.forName(driver);
-
- String url = config.getString(URL);
- String user = config.getString(USER);
- String password = config.getString(PASSWORD);
- String table = config.getString(TABLE);
- String query = config.getString(SQL, "SELECT * FROM " + table);
-
- if (!StringUtils.isEmptyOrNullStr(password)) {
- try {
- password = CryptionUtils.decryptByAES(password, "datavines");
- } catch (Exception e) {
- throw new RuntimeException("Failed to decrypt password", e);
- }
- }
-
- try (Connection conn = DriverManager.getConnection(url, user, password)) {
- try (java.sql.PreparedStatement ps = conn.prepareStatement(query)) {
- ResultSetMetaData metaData = ps.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- fieldNames = new String[columnCount];
- fieldTypes = new Class>[columnCount];
-
- for (int i = 0; i < columnCount; i++) {
- fieldNames[i] = metaData.getColumnName(i + 1);
- fieldTypes[i] = Class.forName(metaData.getColumnClassName(i + 1));
- }
- }
- }
- }
-
- @Override
- public DataStream getData(FlinkRuntimeEnvironment environment) {
- String url = config.getString(URL);
- String user = config.getString(USER);
- String password = config.getString(PASSWORD);
- String driver = config.getString(DRIVER, "com.mysql.jdbc.Driver");
- String table = config.getString(TABLE);
- String query = config.getString(SQL, "SELECT * FROM " + table);
-
- if (!StringUtils.isEmptyOrNullStr(password)) {
- try {
- password = CryptionUtils.decryptByAES(password, "datavines");
- } catch (Exception e) {
- throw new RuntimeException("Failed to decrypt password", e);
- }
- }
-
- TypeInformation>[] typeInfos = new TypeInformation[fieldTypes.length];
- for (int i = 0; i < fieldTypes.length; i++) {
- typeInfos[i] = TypeInformation.of(fieldTypes[i]);
- }
- RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInfos, fieldNames);
-
- JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(driver)
- .setDBUrl(url)
- .setUsername(user)
- .setPassword(password)
- .setQuery(query)
- .setRowTypeInfo(rowTypeInfo)
- .finish();
-
- return environment.getEnv().createInput(jdbcInputFormat);
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public Class>[] getFieldTypes() {
- return fieldTypes;
- }
-}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/pom.xml
new file mode 100644
index 000000000..87507b3c3
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/pom.xml
@@ -0,0 +1,58 @@
+
+
+
+
+ datavines-engine-flink
+ io.datavines
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ datavines-engine-flink-jdbc
+
+
+
+ io.datavines
+ datavines-engine-flink-core
+ ${project.version}
+
+
+
+
+ org.apache.flink
+ flink-connector-jdbc_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.16
+
+
+
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/sink/JdbcSink.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/sink/JdbcSink.java
new file mode 100644
index 000000000..2e6355f00
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/sink/JdbcSink.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.jdbc.sink;
+
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import io.datavines.common.config.Config;
+import io.datavines.common.config.CheckResult;
+import io.datavines.engine.api.plugin.Plugin;
+import io.datavines.engine.api.env.RuntimeEnvironment;
+import io.datavines.engine.api.component.Component;
+
+public class JdbcSink implements Plugin, Component {
+
+ private String driverName;
+ private String jdbcUrl;
+ private String username;
+ private String password;
+ private String query;
+ private Config config;
+
+ public SinkFunction getSink() {
+ return org.apache.flink.connector.jdbc.JdbcSink.sink(
+ query,
+ (statement, row) -> {
+ // Need to be implemented based on actual schema
+ for (int i = 0; i < row.getArity(); i++) {
+ statement.setObject(i + 1, row.getField(i));
+ }
+ },
+ JdbcExecutionOptions.builder()
+ .withBatchSize(1000)
+ .withBatchIntervalMs(200)
+ .withMaxRetries(5)
+ .build(),
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(jdbcUrl)
+ .withDriverName(driverName)
+ .withUsername(username)
+ .withPassword(password)
+ .build()
+ );
+ }
+
+ public void setDriverName(String driverName) {
+ this.driverName = driverName;
+ }
+
+ public void setJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ this.driverName = config.getString("driverName");
+ this.jdbcUrl = config.getString("jdbcUrl");
+ this.username = config.getString("username");
+ this.password = config.getString("password");
+ this.query = config.getString("query");
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ if (driverName == null || driverName.isEmpty()) {
+ return new CheckResult(false, "driverName cannot be empty");
+ }
+ if (jdbcUrl == null || jdbcUrl.isEmpty()) {
+ return new CheckResult(false, "jdbcUrl cannot be empty");
+ }
+ if (username == null || username.isEmpty()) {
+ return new CheckResult(false, "username cannot be empty");
+ }
+ if (password == null || password.isEmpty()) {
+ return new CheckResult(false, "password cannot be empty");
+ }
+ if (query == null || query.isEmpty()) {
+ return new CheckResult(false, "query cannot be empty");
+ }
+ return new CheckResult(true, "");
+ }
+
+ @Override
+ public void prepare(RuntimeEnvironment env) throws Exception {
+ // Load JDBC driver
+ Class.forName(driverName);
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/source/JdbcSource.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/source/JdbcSource.java
new file mode 100644
index 000000000..89e40d153
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-jdbc/src/main/java/io/datavines/engine/flink/jdbc/source/JdbcSource.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.engine.flink.jdbc.source;
+
+import org.apache.flink.connector.jdbc.JdbcInputFormat;
+import org.apache.flink.types.Row;
+
+import io.datavines.common.config.Config;
+import io.datavines.common.config.CheckResult;
+import io.datavines.engine.api.plugin.Plugin;
+import io.datavines.engine.api.env.RuntimeEnvironment;
+import io.datavines.engine.api.component.Component;
+
+public class JdbcSource implements Plugin, Component {
+
+ private String driverName;
+ private String jdbcUrl;
+ private String username;
+ private String password;
+ private String query;
+ private Config config;
+
+ public JdbcInputFormat getSource() {
+ return JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(driverName)
+ .setDBUrl(jdbcUrl)
+ .setUsername(username)
+ .setPassword(password)
+ .setQuery(query)
+ .setRowTypeInfo(null) // Need to be implemented based on actual schema
+ .finish();
+ }
+
+ public void setDriverName(String driverName) {
+ this.driverName = driverName;
+ }
+
+ public void setJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ this.driverName = config.getString("driverName");
+ this.jdbcUrl = config.getString("jdbcUrl");
+ this.username = config.getString("username");
+ this.password = config.getString("password");
+ this.query = config.getString("query");
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ if (driverName == null || driverName.isEmpty()) {
+ return new CheckResult(false, "driverName cannot be empty");
+ }
+ if (jdbcUrl == null || jdbcUrl.isEmpty()) {
+ return new CheckResult(false, "jdbcUrl cannot be empty");
+ }
+ if (username == null || username.isEmpty()) {
+ return new CheckResult(false, "username cannot be empty");
+ }
+ if (password == null || password.isEmpty()) {
+ return new CheckResult(false, "password cannot be empty");
+ }
+ if (query == null || query.isEmpty()) {
+ return new CheckResult(false, "query cannot be empty");
+ }
+ return new CheckResult(true, "");
+ }
+
+ @Override
+ public void prepare(RuntimeEnvironment env) throws Exception {
+ // Load JDBC driver
+ Class.forName(driverName);
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/pom.xml
new file mode 100644
index 000000000..658f8deb6
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/pom.xml
@@ -0,0 +1,51 @@
+
+
+
+
+ datavines-engine-flink
+ io.datavines
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ datavines-engine-flink-transform
+
+
+
+ io.datavines
+ datavines-engine-flink-core
+ ${project.version}
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java
similarity index 53%
rename from datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java
rename to datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java
index 831aed1b0..a62abe761 100644
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-transform/src/main/java/io/datavines/engine/flink/transform/FlinkSqlTransform.java
@@ -14,89 +14,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.datavines.engine.flink.core.transform;
-
-import io.datavines.common.config.CheckResult;
-import io.datavines.common.config.Config;
-import io.datavines.engine.api.env.RuntimeEnvironment;
-import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;
-import io.datavines.engine.flink.api.stream.FlinkStreamTransform;
-import io.datavines.engine.api.plugin.Plugin;
-import io.datavines.common.utils.StringUtils;
+package io.datavines.engine.flink.transform;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import static io.datavines.common.ConfigConstants.SQL;
+import io.datavines.common.config.Config;
+import io.datavines.common.config.CheckResult;
+import io.datavines.engine.api.plugin.Plugin;
+import io.datavines.engine.api.env.RuntimeEnvironment;
+import io.datavines.engine.api.component.Component;
-public class FlinkSqlTransform implements FlinkStreamTransform, Plugin {
+public class FlinkSqlTransform implements Plugin, Component {
private String sql;
- private String[] outputFieldNames;
- private Class>[] outputFieldTypes;
+ private StreamTableEnvironment tableEnv;
private Config config;
+ public FlinkSqlTransform(StreamTableEnvironment tableEnv) {
+ this.tableEnv = tableEnv;
+ }
+
+ public DataStream> transform(DataStream> input) {
+ // Register input as table
+ tableEnv.createTemporaryView("input_table", input);
+
+ // Execute SQL transformation
+ Table resultTable = tableEnv.sqlQuery(sql);
+
+ // Convert back to DataStream
+ return tableEnv.toDataStream(resultTable);
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
@Override
public void setConfig(Config config) {
this.config = config;
- if (config != null) {
- this.sql = config.getString(SQL);
- }
+ this.sql = config.getString("sql");
}
-
+
@Override
public Config getConfig() {
- return this.config;
+ return config;
}
-
+
@Override
public CheckResult checkConfig() {
- if (StringUtils.isEmptyOrNullStr(sql)) {
- return new CheckResult(false, "please specify [sql] as non-empty string");
+ if (sql == null || sql.isEmpty()) {
+ return new CheckResult(false, "sql cannot be empty");
}
return new CheckResult(true, "");
}
@Override
public void prepare(RuntimeEnvironment env) throws Exception {
- // No special preparation needed for SQL transform
- }
-
- @Override
- public DataStream process(DataStream dataStream, FlinkRuntimeEnvironment environment) {
- StreamTableEnvironment tableEnv = environment.getTableEnv();
-
- // Register input table
- tableEnv.createTemporaryView("input_table", dataStream);
-
- // Execute SQL transformation
- Table resultTable = tableEnv.sqlQuery(sql);
-
- // Convert back to DataStream
- return tableEnv.toDataStream(resultTable, Row.class);
- }
-
- @Override
- public String[] getOutputFieldNames() {
- return outputFieldNames;
- }
-
- @Override
- public Class>[] getOutputFieldTypes() {
- return outputFieldTypes;
- }
-
- public void setSql(String sql) {
- this.sql = sql;
- }
-
- public void setOutputFieldNames(String[] outputFieldNames) {
- this.outputFieldNames = outputFieldNames;
- }
-
- public void setOutputFieldTypes(Class>[] outputFieldTypes) {
- this.outputFieldTypes = outputFieldTypes;
+ // No preparation needed
}
}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/delete_dirs.java b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/delete_dirs.java
new file mode 100644
index 000000000..acff573db
--- /dev/null
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/delete_dirs.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+
+public class DeleteDirs {
+ public static void deleteDirectory(File dir) {
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ deleteDirectory(file);
+ }
+ }
+ }
+ dir.delete();
+ }
+
+ public static void main(String[] args) {
+ String basePath = "e:/datavines-new/datavines-engine/datavines-engine-plugins/datavines-engine-flink/datavines-engine-flink-core/src/main/java/io/datavines/engine/flink/";
+ String[] dirs = {"config", "sink", "source", "transform"};
+
+ for (String dir : dirs) {
+ File file = new File(basePath + dir);
+ if (file.exists()) {
+ deleteDirectory(file);
+ System.out.println("Deleted directory: " + file.getAbsolutePath());
+ }
+ }
+ }
+}
diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/pom.xml b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/pom.xml
index b9e104bb8..0209d987a 100644
--- a/datavines-engine/datavines-engine-plugins/datavines-engine-flink/pom.xml
+++ b/datavines-engine/datavines-engine-plugins/datavines-engine-flink/pom.xml
@@ -33,6 +33,9 @@
datavines-engine-flink-api
datavines-engine-flink-core
+ datavines-engine-flink-config
+ datavines-engine-flink-transform
+ datavines-engine-flink-jdbc
datavines-engine-flink-executor
diff --git a/datavines-ui/src/locale/en_US.ts b/datavines-ui/src/locale/en_US.ts
index 364225577..23a11bcac 100644
--- a/datavines-ui/src/locale/en_US.ts
+++ b/datavines-ui/src/locale/en_US.ts
@@ -309,8 +309,9 @@ export default {
profile_schedule: 'Profile Schedule',
config_title: 'Config Management',
- config_var_key: 'Config Key',
- config_var_value: 'Config Value',
+ config_var_key: 'Variable Key',
+ config_var_value: 'Variable Value',
+ config_type: 'Configuration Type',
create_config: 'Create Config',
token_title: 'Token Management',
diff --git a/datavines-ui/src/locale/zh_CN.ts b/datavines-ui/src/locale/zh_CN.ts
index 0ace2b7de..ed9482c71 100644
--- a/datavines-ui/src/locale/zh_CN.ts
+++ b/datavines-ui/src/locale/zh_CN.ts
@@ -309,8 +309,9 @@ export default {
profile_schedule: '数据概览调度配置',
config_title: '参数管理',
- config_var_key: '参数名',
- config_var_value: '参数值',
+ config_var_key: '变量键',
+ config_var_value: '变量值',
+ config_type: '配置类型',
create_config: '创建参数',
token_title: '令牌管理',
diff --git a/datavines-ui/src/view/Main/Config/CreateConfig.tsx b/datavines-ui/src/view/Main/Config/CreateConfig.tsx
index e484a7584..f2c477fff 100644
--- a/datavines-ui/src/view/Main/Config/CreateConfig.tsx
+++ b/datavines-ui/src/view/Main/Config/CreateConfig.tsx
@@ -1,6 +1,6 @@
import React, { useRef, useState, useImperativeHandle } from 'react';
import {
- Input, ModalProps, Form, FormInstance, message,
+ Input, ModalProps, Form, FormInstance, message, Select, Option,
} from 'antd';
import { useIntl } from 'react-intl';
import {
@@ -9,6 +9,7 @@ import {
import { $http } from '@/http';
import { useSelector } from '@/store';
import { TConfigTableItem } from "@/type/config";
+import { FlinkConfiguration } from './FlinkConfiguration';
type InnerProps = {
form: FormInstance,
@@ -50,6 +51,32 @@ export const CreateConfigComponent = ({ form, detail, innerRef }: InnerProps) =>
],
widget: ,
},
+ {
+ label: intl.formatMessage({ id: 'config_type' }),
+ name: 'type',
+ initialValue: detail?.type || 'flink',
+ rules: [
+ {
+ required: true,
+ message: intl.formatMessage({ id: 'common_required_tip' }),
+ },
+ ],
+ widget: (
+
+ ),
+ },
+ {
+ name: 'flinkConfig',
+ shouldUpdate: true,
+ noStyle: true,
+ widget: ({ getFieldValue }) => {
+ const type = getFieldValue('type');
+ return type === 'flink' ? : null;
+ },
+ },
],
};
useImperativeHandle(innerRef, () => ({
diff --git a/datavines-ui/src/view/Main/Config/FlinkConfiguration.tsx b/datavines-ui/src/view/Main/Config/FlinkConfiguration.tsx
new file mode 100644
index 000000000..a23ca0640
--- /dev/null
+++ b/datavines-ui/src/view/Main/Config/FlinkConfiguration.tsx
@@ -0,0 +1,84 @@
+import React from 'react';
+import { Form, Input, Select, FormInstance } from 'antd';
+import { useIntl } from 'react-intl';
+import { FormRender, IFormRender } from '@/common';
+
+const { Option } = Select;
+
+type InnerProps = {
+ form: FormInstance,
+ detail?: any
+}
+
+export const FlinkConfiguration = ({ form, detail }: InnerProps) => {
+ const intl = useIntl();
+
+ const schema: IFormRender = {
+ name: 'flink-config-form',
+ layout: 'vertical',
+ formItemProps: {
+ style: { marginBottom: 10 },
+ },
+ meta: [
+ {
+ label: intl.formatMessage({ id: 'dv_deploy_mode' }),
+ name: 'deployMode',
+ initialValue: detail?.deployMode,
+ rules: [
+ {
+ required: true,
+ message: intl.formatMessage({ id: 'dv_deploy_mode_required' }),
+ },
+ ],
+ widget: (
+
+ ),
+ },
+ {
+ label: intl.formatMessage({ id: 'dv_flink_home' }),
+ name: 'flinkHome',
+ initialValue: detail?.flinkHome,
+ rules: [
+ {
+ required: true,
+ message: intl.formatMessage({ id: 'dv_flink_home_required' }),
+ },
+ ],
+ widget: ,
+ },
+ {
+ label: intl.formatMessage({ id: 'dv_jobmanager_memory' }),
+ name: 'jobmanagerMemory',
+ initialValue: detail?.jobmanagerMemory,
+ rules: [
+ {
+ required: true,
+ message: intl.formatMessage({ id: 'dv_jobmanager_memory_required' }),
+ },
+ ],
+ widget: ,
+ },
+ {
+ label: intl.formatMessage({ id: 'dv_taskmanager_memory' }),
+ name: 'taskmanagerMemory',
+ initialValue: detail?.taskmanagerMemory,
+ rules: [
+ {
+ required: true,
+ message: intl.formatMessage({ id: 'dv_taskmanager_memory_required' }),
+ },
+ ],
+ widget: ,
+ },
+ ],
+ };
+
+ return ;
+};
+
+export default FlinkConfiguration;