diff --git a/docs/compile.md b/docs/compile.md
index ee4b214f..f5189132 100644
--- a/docs/compile.md
+++ b/docs/compile.md
@@ -1,4 +1,4 @@
-目前web客户端支持的flink版本是1.14.3,如果需要调整flink版本可下载源码
+目前web客户端支持的flink版本是1.15.3,如果需要调整flink版本可下载源码
然后修改pom里面的版本号 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/pom.xml
~~~~
1.12.0
diff --git a/docs/deploy.md b/docs/deploy.md
index 546ba54e..fa4eca19 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -24,10 +24,10 @@ mysql版本 5.6+
#### 1、flink客户端安装
下载对应版本
-https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz 然后解压
+https://www.apache.org/dyn/closer.lua/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.11.tgz 然后解压
-a: /flink-1.14.3/conf
+a: /flink-1.15.3/conf
**1、YARN_PER模式**
diff --git a/flink-streaming-core/pom.xml b/flink-streaming-core/pom.xml
index 3cb5547a..5fac07d6 100644
--- a/flink-streaming-core/pom.xml
+++ b/flink-streaming-core/pom.xml
@@ -97,7 +97,7 @@
org.apache.flink
- flink-table-api-java-bridge_${scala.binary.version}
+ flink-table-api-java-bridge
${flink.version}
provided
@@ -123,7 +123,7 @@
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-streaming-java
${flink.version}
provided
@@ -152,7 +152,7 @@
org.apache.flink
- flink-runtime-web_${scala.binary.version}
+ flink-runtime-web
${flink.version}
provided
@@ -166,7 +166,7 @@
org.apache.flink
- flink-statebackend-rocksdb_${scala.binary.version}
+ flink-statebackend-rocksdb
${flink.version}
provided
@@ -174,7 +174,7 @@
org.apache.flink
- flink-connector-hive_2.11
+ flink-connector-hive_${scala.binary.version}
${flink.version}
provided
diff --git a/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java b/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java
index 7208c66b..2a882415 100644
--- a/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java
+++ b/flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java
@@ -16,7 +16,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -60,7 +59,6 @@ public static void main(String[] args) {
LOG.info("[SQL_BATCH]本次任务是批任务");
//批处理
settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inBatchMode()
.build();
tEnv = TableEnvironment.create(settings);
@@ -70,7 +68,6 @@ public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
@@ -97,7 +94,9 @@ public static void main(String[] args) {
private static JobRunParam buildParam(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String sqlPath = parameterTool.get("sql");
- Preconditions.checkNotNull(sqlPath, "-sql参数 不能为空");
+ if (StringUtils.isEmpty(sqlPath)) {
+ throw new NullPointerException("-sql参数 不能为空");
+ }
JobRunParam jobRunParam = new JobRunParam();
jobRunParam.setSqlPath(sqlPath);
jobRunParam.setCheckPointParam(CheckPointParams.buildCheckPointParam(parameterTool));
diff --git a/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java b/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java
index 775414da..3f187de5 100644
--- a/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java
+++ b/flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java
@@ -12,9 +12,9 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.command.SetOperation;
/**
@@ -61,6 +61,7 @@ public static JobID exeSql(List sqlList, TableEnvironment tEnv) {
break;
case "BeginStatementSetOperation":
+ case "EndStatementSetOperation":
System.out.println("####stmt= " + stmt);
log.info("####stmt={}", stmt);
break;
@@ -89,10 +90,11 @@ public static JobID exeSql(List sqlList, TableEnvironment tEnv) {
case "NopOperation":
((TableEnvironmentInternal) tEnv).executeInternal(parser.parse(stmt).get(0));
break;
- case "CatalogSinkModifyOperation":
- modifyOperationList.add((CatalogSinkModifyOperation) operation);
+ case "SinkModifyOperation":
+ modifyOperationList.add((SinkModifyOperation) operation);
break;
default:
+ log.error("不支持此Operation类型 {}", operation.getClass().getSimpleName());
throw new RuntimeException("不支持该语法 sql=" + stmt);
}
}
diff --git a/flink-streaming-core/src/test/java/Demo.java b/flink-streaming-core/src/test/java/Demo.java
index d6078057..dced9086 100644
--- a/flink-streaming-core/src/test/java/Demo.java
+++ b/flink-streaming-core/src/test/java/Demo.java
@@ -23,7 +23,6 @@ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
diff --git a/flink-streaming-core/src/test/java/Test.java b/flink-streaming-core/src/test/java/Test.java
index 2ce97b15..36afedd8 100644
--- a/flink-streaming-core/src/test/java/Test.java
+++ b/flink-streaming-core/src/test/java/Test.java
@@ -65,7 +65,6 @@ public static void main(String[] args) throws IOException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
diff --git a/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java b/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java
index a96a337a..7c232bbd 100644
--- a/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java
+++ b/flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java
@@ -46,7 +46,6 @@ public static void explainStmt(List stmtList) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
@@ -102,7 +101,6 @@ public static void preCheckSql(List sql) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
.inStreamingMode()
.build();
diff --git a/flink-streaming-web/src/main/resources/application.properties b/flink-streaming-web/src/main/resources/application.properties
index 92669d6b..407a402f 100644
--- a/flink-streaming-web/src/main/resources/application.properties
+++ b/flink-streaming-web/src/main/resources/application.properties
@@ -8,10 +8,10 @@ spring.http.encoding.force=true
mybatis.mapper-locations=classpath:mapper/*.xml
spring.servlet.multipart.max-file-size=1024MB
spring.servlet.multipart.max-request-size=1024MB
-####jdbc连接池
+####jdbc连接池A
spring.datasource.url=jdbc:mysql://localhost:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
-spring.datasource.password=root
+spring.datasource.password=root123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.initial-size=5
diff --git a/pom.xml b/pom.xml
index b9b3e3d1..9d03c132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,8 +38,8 @@
1.8
UTF-8
1.5.0.RELEASE
- 1.14.3
- 2.11
+ 1.15.3
+ 2.12
2.2.8.RELEASE