Skip to content

Commit

Permalink
Merge pull request #100 from zhp8341/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
zhp8341 authored Apr 8, 2023
2 parents c4af50a + 892f2ec commit 88b130b
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/compile.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
目前web客户端支持的flink版本是1.14.3,如果需要调整flink版本可下载源码
目前web客户端支持的flink版本是1.15.3,如果需要调整flink版本可下载源码
然后修改pom里面的版本号 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/pom.xml
~~~~
<flink.version>1.12.0</flink.version> <!--flink版本-->
Expand Down
4 changes: 2 additions & 2 deletions docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ mysql版本 5.6+
#### 1、flink客户端安装

下载对应版本
https://www.apache.org/dyn/closer.lua/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.11.tgz 然后解压
https://www.apache.org/dyn/closer.lua/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz 然后解压


a: /flink-1.14.3/conf
a: /flink-1.15.3/conf

**1、YARN_PER模式**

Expand Down
10 changes: 5 additions & 5 deletions flink-streaming-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand All @@ -123,7 +123,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
Expand Down Expand Up @@ -152,7 +152,7 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
Expand All @@ -166,15 +166,15 @@
<!-- rocksdb-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Hive Connector的支持,仅在编译时生效-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
Expand Down Expand Up @@ -60,7 +59,6 @@ public static void main(String[] args) {
LOG.info("[SQL_BATCH]本次任务是批任务");
//批处理
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
tEnv = TableEnvironment.create(settings);
Expand All @@ -70,7 +68,6 @@ public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Expand All @@ -97,7 +94,9 @@ public static void main(String[] args) {
private static JobRunParam buildParam(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String sqlPath = parameterTool.get("sql");
Preconditions.checkNotNull(sqlPath, "-sql参数 不能为空");
if (StringUtils.isEmpty(sqlPath)) {
throw new NullPointerException("-sql参数 不能为空");
}
JobRunParam jobRunParam = new JobRunParam();
jobRunParam.setSqlPath(sqlPath);
jobRunParam.setCheckPointParam(CheckPointParams.buildCheckPointParam(parameterTool));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.command.SetOperation;

/**
Expand Down Expand Up @@ -61,6 +61,7 @@ public static JobID exeSql(List<String> sqlList, TableEnvironment tEnv) {
break;

case "BeginStatementSetOperation":
case "EndStatementSetOperation":
System.out.println("####stmt= " + stmt);
log.info("####stmt={}", stmt);
break;
Expand Down Expand Up @@ -89,10 +90,11 @@ public static JobID exeSql(List<String> sqlList, TableEnvironment tEnv) {
case "NopOperation":
((TableEnvironmentInternal) tEnv).executeInternal(parser.parse(stmt).get(0));
break;
case "CatalogSinkModifyOperation":
modifyOperationList.add((CatalogSinkModifyOperation) operation);
case "SinkModifyOperation":
modifyOperationList.add((SinkModifyOperation) operation);
break;
default:
log.error("不支持此Operation类型 {}", operation.getClass().getSimpleName());
throw new RuntimeException("不支持该语法 sql=" + stmt);
}
}
Expand Down
1 change: 0 additions & 1 deletion flink-streaming-core/src/test/java/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Expand Down
1 change: 0 additions & 1 deletion flink-streaming-core/src/test/java/Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public static void main(String[] args) throws IOException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public static void explainStmt(List<String> stmtList) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

Expand Down Expand Up @@ -102,7 +101,6 @@ public static void preCheckSql(List<String> sql) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

Expand Down
4 changes: 2 additions & 2 deletions flink-streaming-web/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ spring.http.encoding.force=true
mybatis.mapper-locations=classpath:mapper/*.xml
spring.servlet.multipart.max-file-size=1024MB
spring.servlet.multipart.max-request-size=1024MB
####jdbc连接池
####jdbc连接池
spring.datasource.url=jdbc:mysql://localhost:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.password=root123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.initial-size=5
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<java_target_version>1.8</java_target_version>
<file_encoding>UTF-8</file_encoding>
<flink_streaming_version>1.5.0.RELEASE</flink_streaming_version>
<flink.version>1.14.3</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.15.3</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
</properties>

Expand Down

0 comments on commit 88b130b

Please sign in to comment.