From d46ae1d700b4a3295f6831c76518563154d23c4d Mon Sep 17 00:00:00 2001 From: zhuhuipei <> Date: Mon, 20 Mar 2023 16:56:49 +0800 Subject: [PATCH 1/2] fix-test --- flink-streaming-web/src/main/resources/application.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-web/src/main/resources/application.properties b/flink-streaming-web/src/main/resources/application.properties index 92669d6b..7882f465 100644 --- a/flink-streaming-web/src/main/resources/application.properties +++ b/flink-streaming-web/src/main/resources/application.properties @@ -8,10 +8,10 @@ spring.http.encoding.force=true mybatis.mapper-locations=classpath:mapper/*.xml spring.servlet.multipart.max-file-size=1024MB spring.servlet.multipart.max-request-size=1024MB -####jdbc连接池 +#### spring.datasource.url=jdbc:mysql://localhost:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false spring.datasource.username=root -spring.datasource.password=root +spring.datasource.password=root123456 spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.druid.initial-size=5 From 892f2ec3263a2e941e1ef5a766bb245815c09f4c Mon Sep 17 00:00:00 2001 From: zhuhuipei <> Date: Sat, 8 Apr 2023 09:32:55 +0800 Subject: [PATCH 2/2] =?UTF-8?q?flink=E7=89=88=E6=9C=AC=E5=8D=87=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/compile.md | 2 +- docs/deploy.md | 4 ++-- flink-streaming-core/pom.xml | 10 +++++----- .../java/com/flink/streaming/core/JobApplication.java | 7 +++---- .../com/flink/streaming/core/execute/ExecuteSql.java | 8 +++++--- flink-streaming-core/src/test/java/Demo.java | 1 - flink-streaming-core/src/test/java/Test.java | 1 - .../flink/streaming/sql/validation/SqlValidation.java | 2 -- .../src/main/resources/application.properties | 2 +- pom.xml | 4 ++-- 10 files changed, 19 insertions(+), 22 deletions(-) 
diff --git a/docs/compile.md b/docs/compile.md index ee4b214f..f5189132 100644 --- a/docs/compile.md +++ b/docs/compile.md @@ -1,4 +1,4 @@ -目前web客户端支持的flink版本是1.14.3,如果需要调整flink版本可下载源码 +目前web客户端支持的flink版本是1.15.3,如果需要调整flink版本可下载源码 然后修改pom里面的版本号 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/pom.xml ~~~~ 1.12.0 diff --git a/docs/deploy.md b/docs/deploy.md index 546ba54e..fa4eca19 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -24,10 +24,10 @@ mysql版本 5.6+ #### 1、flink客户端安装 下载对应版本 -https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz 然后解压 +https://www.apache.org/dyn/closer.lua/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz 然后解压 -a: /flink-1.14.3/conf +a: /flink-1.15.3/conf **1、YARN_PER模式** diff --git a/flink-streaming-core/pom.xml b/flink-streaming-core/pom.xml index 3cb5547a..5fac07d6 100644 --- a/flink-streaming-core/pom.xml +++ b/flink-streaming-core/pom.xml @@ -97,7 +97,7 @@ org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge ${flink.version} provided @@ -123,7 +123,7 @@ org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided @@ -152,7 +152,7 @@ org.apache.flink - flink-runtime-web_${scala.binary.version} + flink-runtime-web ${flink.version} provided @@ -166,7 +166,7 @@ org.apache.flink - flink-statebackend-rocksdb_${scala.binary.version} + flink-statebackend-rocksdb ${flink.version} provided @@ -174,7 +174,7 @@ org.apache.flink - flink-connector-hive_2.11 + flink-connector-hive_${scala.binary.version} ${flink.version} provided diff --git a/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java b/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java index 7208c66b..2a882415 100644 --- a/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java +++ 
b/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java @@ -16,7 +16,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -60,7 +59,6 @@ public static void main(String[] args) { LOG.info("[SQL_BATCH]本次任务是批任务"); //批处理 settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inBatchMode() .build(); tEnv = TableEnvironment.create(settings); @@ -70,7 +68,6 @@ public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); tEnv = StreamTableEnvironment.create(env, settings); @@ -97,7 +94,9 @@ public static void main(String[] args) { private static JobRunParam buildParam(String[] args) throws Exception { ParameterTool parameterTool = ParameterTool.fromArgs(args); String sqlPath = parameterTool.get("sql"); - Preconditions.checkNotNull(sqlPath, "-sql参数 不能为空"); + if (StringUtils.isEmpty(sqlPath)) { + throw new NullPointerException("-sql参数 不能为空"); + } JobRunParam jobRunParam = new JobRunParam(); jobRunParam.setSqlPath(sqlPath); jobRunParam.setCheckPointParam(CheckPointParams.buildCheckPointParam(parameterTool)); diff --git a/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java b/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java index 775414da..3f187de5 100644 --- a/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java +++ b/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java @@ -12,9 +12,9 @@ import 
org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.SinkModifyOperation; import org.apache.flink.table.operations.command.SetOperation; /** @@ -61,6 +61,7 @@ public static JobID exeSql(List sqlList, TableEnvironment tEnv) { break; case "BeginStatementSetOperation": + case "EndStatementSetOperation": System.out.println("####stmt= " + stmt); log.info("####stmt={}", stmt); break; @@ -89,10 +90,11 @@ public static JobID exeSql(List sqlList, TableEnvironment tEnv) { case "NopOperation": ((TableEnvironmentInternal) tEnv).executeInternal(parser.parse(stmt).get(0)); break; - case "CatalogSinkModifyOperation": - modifyOperationList.add((CatalogSinkModifyOperation) operation); + case "SinkModifyOperation": + modifyOperationList.add((SinkModifyOperation) operation); break; default: + log.error("不支持此Operation类型 {}", operation.getClass().getSimpleName()); throw new RuntimeException("不支持该语法 sql=" + stmt); } } diff --git a/flink-streaming-core/src/test/java/Demo.java b/flink-streaming-core/src/test/java/Demo.java index d6078057..dced9086 100644 --- a/flink-streaming-core/src/test/java/Demo.java +++ b/flink-streaming-core/src/test/java/Demo.java @@ -23,7 +23,6 @@ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); tEnv = StreamTableEnvironment.create(env, settings); diff --git a/flink-streaming-core/src/test/java/Test.java b/flink-streaming-core/src/test/java/Test.java index 2ce97b15..36afedd8 100644 --- a/flink-streaming-core/src/test/java/Test.java +++ 
b/flink-streaming-core/src/test/java/Test.java @@ -65,7 +65,6 @@ public static void main(String[] args) throws IOException { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); tEnv = StreamTableEnvironment.create(env, settings); diff --git a/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java b/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java index a96a337a..7c232bbd 100644 --- a/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java +++ b/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java @@ -46,7 +46,6 @@ public static void explainStmt(List stmtList) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); @@ -102,7 +101,6 @@ public static void preCheckSql(List sql) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); diff --git a/flink-streaming-web/src/main/resources/application.properties b/flink-streaming-web/src/main/resources/application.properties index 7882f465..407a402f 100644 --- a/flink-streaming-web/src/main/resources/application.properties +++ b/flink-streaming-web/src/main/resources/application.properties @@ -8,7 +8,7 @@ spring.http.encoding.force=true mybatis.mapper-locations=classpath:mapper/*.xml spring.servlet.multipart.max-file-size=1024MB spring.servlet.multipart.max-request-size=1024MB -#### +####jdbc连接池 spring.datasource.url=jdbc:mysql://localhost:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false 
spring.datasource.username=root spring.datasource.password=root123456 diff --git a/pom.xml b/pom.xml index b9b3e3d1..9d03c132 100644 --- a/pom.xml +++ b/pom.xml @@ -38,8 +38,8 @@ 1.8 UTF-8 1.5.0.RELEASE - 1.14.3 - 2.11 + 1.15.3 + 2.12 2.2.8.RELEASE