Merge pull request #50 from zhp8341/dev
1. Support offline batch jobs; 2. JAR jobs support savepoints
zhp8341 authored Jun 2, 2021
2 parents 4cec9e0 + b79aa5a commit a3e1836
Showing 25 changed files with 1,141 additions and 86 deletions.
7 changes: 6 additions & 1 deletion README.md
@@ -17,7 +17,9 @@ https://xie.infoq.cn/article/1af0cb75be056fea788e6c86b

**Main features**: job configuration, starting/stopping jobs, alerting, logging, and more, with SQL syntax hints, formatting, and SQL statement validation.

**Goal**: reduce development effort and cost by making stream-computing jobs entirely SQL-based.
**Goal**: reduce development effort and cost by making stream-computing jobs entirely SQL-based. 😂

Since I wrote the front-end UI myself and am not a front-end professional, the styling is a bit ugly; please bear with me!

### 1. Main features

@@ -30,6 +32,7 @@ https://xie.infoq.cn/article/1af0cb75be056fea788e6c86b
* **[7] Support submitting jobs with custom JARs.**
* **[8] Support multiple Flink versions (users need to build against the corresponding Flink version).**
* **[9] Support automatic and manual savepoint backups, and restoring jobs from savepoints.**
* **[10] Support batch jobs, e.g. Hive.**

**The supported Flink version has been upgraded to 1.12**

@@ -80,6 +83,8 @@ https://xie.infoq.cn/article/1af0cb75be056fea788e6c86b

8. [Catalog usage example](/docs/catalog.md)

9. [Hive batch job usage example](/docs/sql_demo/demo_batch.md)


### 3.2 hello-world demo

146 changes: 146 additions & 0 deletions docs/manual-batch.md
@@ -0,0 +1,146 @@

### 1. New job configuration notes

a: Job name (*required)
~~~~
The job name must not exceed 50 characters and may only contain digits, letters, and underscores.
~~~~

b: Run mode

YARN_PER (YARN per-job mode: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn)


STANDALONE (standalone cluster: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/cluster_setup.html)


LOCAL (local cluster: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html)


<font color=red size=5>LOCAL requires a single-node Flink service started locally with ./bin/start-cluster.sh</font>


c: Flink run configuration

<font color=red size=5>1. YARN_PER mode</font>

~~~~
Parameters (same as the official Flink client), but only -yD -p -yjm -yn -ytm -ys -yqu are supported (-yqu is required)
-ys   number of slots.
-yn   number of task managers.
-yjm  job manager heap memory size.
-ytm  task manager heap memory size.
-yqu  YARN queue name.
-p    parallelism.
-yD   e.g. -yD taskmanager.heap.mb=518
See the official documentation for details.
Example: -yqu flink -yjm 1024m -ytm 2048m -p 1 -ys 1
~~~~

<font color=red size=5>2. LOCAL mode</font>
~~~~
No configuration needed
~~~~


<font color=red size=5>3. STANDALONE mode</font>
~~~~
-d,--detached                         If present, runs the job in detached mode
-p,--parallelism <parallelism>        The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the configuration.
-s,--fromSavepoint <savepointPath>    Path to a savepoint to restore the job
                                      from (for example hdfs:///flink/savepoint-1537).
Other run parameters can be viewed with flink -h
~~~~





d: Third-party JAR URLs
~~~~
Enter URLs of connector or UDF JARs, e.g.:
http://ccblog.cn/jars/flink-connector-jdbc_2.11-1.12.0.jar
http://ccblog.cn/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
http://ccblog.cn/jars/flink-streaming-udf.jar
http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar
Once the URLs are filled in, the UDF can be referenced directly in SQL statements:
CREATE FUNCTION jsonHasKey as 'com.xx.udf.JsonHasKeyUDF';
~~~~
![图片](http://img.ccblog.cn/flink/9.png)

Separate multiple URLs with line breaks.


For a UDF development demo, see [https://github.com/zhp8341/flink-streaming-udf](https://github.com/zhp8341/flink-streaming-udf)
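
As a reference only, here is a minimal, hypothetical sketch of what a scalar UDF such as the jsonHasKey function above might look like. It assumes Flink's ScalarFunction API and the fastjson library; the package and class name simply mirror the CREATE FUNCTION example and are not taken from the linked demo repository.

~~~~java
package com.xx.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Returns true when the given JSON string contains the given top-level key.
 * Registered in SQL as: CREATE FUNCTION jsonHasKey as 'com.xx.udf.JsonHasKeyUDF';
 */
public class JsonHasKeyUDF extends ScalarFunction {

    public Boolean eval(String json, String key) {
        if (json == null || key == null) {
            return false;
        }
        try {
            JSONObject obj = JSON.parseObject(json);
            return obj != null && obj.containsKey(key);
        } catch (Exception e) {
            // Malformed JSON is treated as "key not present".
            return false;
        }
    }
}
~~~~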


e: SQL statements
#### First create the test table in the Hive test database in advance
~~~~
create table test(
id int,
name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
~~~~


~~~~sql
CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'test',
'hive-conf-dir' = '/alidata/server/zhp/catalog/config'
);
USE CATALOG testmyhive;

insert into test.test values(4,'n2');

~~~~


### 2. System settings

~~~~
The system settings have three required items:
1. Installation directory of the flink-streaming-platform-web application (required)
   This is the directory where the application is installed,
   e.g. /root/flink-streaming-platform-web/
2. Flink installation directory (required)
   -- the Flink client directory, e.g. /usr/local/flink-1.12.0/
3. YARN ResourceManager HTTP address
   -- the Hadoop YARN ResourceManager HTTP address, e.g. http://hadoop003:8088/
4. flink_rest_http_address
   Flink HTTP address, used in LOCAL mode
5. flink_rest_ha_http_address
   For STANDALONE mode with HA: multiple addresses may be entered, separated by ;
~~~~


![图片](http://img.ccblog.cn/flink/5.png)









21 changes: 21 additions & 0 deletions docs/sql_demo/demo_batch.md
@@ -0,0 +1,21 @@
#### First create the test table in the Hive test database in advance
~~~~
create table test(
id int,
name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
~~~~


~~~~sql
CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'test',
'hive-conf-dir' = '/alidata/server/zhp/catalog/config'
);
USE CATALOG testmyhive;

insert into test.test values(4,'n2');

~~~~
@@ -1,4 +1,4 @@
package com.flink.streaming.web.enums;
package com.flink.streaming.common.enums;

import lombok.Getter;

@@ -11,7 +11,7 @@
@Getter
public enum JobTypeEnum {

SQL(0), JAR(1);
SQL_STREAMING(0), JAR(1), SQL_BATCH(2);

private int code;

@@ -31,4 +31,5 @@ public static JobTypeEnum getJobTypeEnum(Integer code) {

return null;
}

}
@@ -2,12 +2,14 @@


import com.flink.streaming.common.constant.SystemConstant;
import com.flink.streaming.common.enums.JobTypeEnum;
import com.flink.streaming.common.model.SqlCommandCall;
import com.flink.streaming.common.sql.SqlFileParser;
import com.flink.streaming.core.checkpoint.CheckPointParams;
import com.flink.streaming.core.checkpoint.FsCheckPoint;
import com.flink.streaming.core.execute.ExecuteSql;
import com.flink.streaming.core.model.JobRunParam;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
@@ -31,7 +33,6 @@
* @date 2020-06-23
* @time 00:33
*/

public class JobApplication {

private static final Logger log = LoggerFactory.getLogger(JobApplication.class);
@@ -43,38 +44,52 @@ public static void main(String[] args) {

JobRunParam jobRunParam = buildParam(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

TableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// set checkpoint
FsCheckPoint.setCheckpoint(env, jobRunParam.getCheckPointParam());

List<String> sql = Files.readAllLines(Paths.get(jobRunParam.getSqlPath()));

List<SqlCommandCall> sqlCommandCallList = SqlFileParser.fileToSql(sql);

StatementSet statementSet = tEnv.createStatementSet();
EnvironmentSettings settings = null;

TableEnvironment tEnv = null;

if (jobRunParam.getJobTypeEnum() != null && JobTypeEnum.SQL_BATCH.equals(jobRunParam.getJobTypeEnum())) {
log.info("[SQL_BATCH]本次任务是批任务");
// batch processing
settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
tEnv = TableEnvironment.create(settings);
} else {
log.info("[SQL_STREAMING]本次任务是流任务");
// streaming is the default, to stay compatible with earlier versions
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
// set checkpoint
FsCheckPoint.setCheckpoint(env, jobRunParam.getCheckPointParam());

ExecuteSql.exeSql(sqlCommandCallList, tEnv, statementSet);
}

StatementSet statementSet = tEnv.createStatementSet();

ExecuteSql.exeSql(sqlCommandCallList, tEnv, statementSet);

TableResult tableResult = statementSet.execute();
if (tableResult == null || tableResult.getJobClient().get() == null ||
tableResult.getJobClient().get().getJobID() == null) {

if (tableResult == null || tableResult.getJobClient().get() == null
|| tableResult.getJobClient().get().getJobID() == null) {
throw new RuntimeException("任务运行失败 没有获取到JobID");
}
JobID jobID=tableResult.getJobClient().get().getJobID();
JobID jobID = tableResult.getJobClient().get().getJobID();

System.out.println(SystemConstant.QUERY_JOBID_KEY_WORD + jobID);
System.out.println(SystemConstant.QUERY_JOBID_KEY_WORD + jobID);

log.info(SystemConstant.QUERY_JOBID_KEY_WORD + "{}",jobID);
log.info(SystemConstant.QUERY_JOBID_KEY_WORD + "{}", jobID);

} catch (Exception e) {
System.err.println("任务执行失败:" + e.getMessage());
@@ -92,6 +107,10 @@ private static JobRunParam buildParam(String[] args) throws Exception {
JobRunParam jobRunParam = new JobRunParam();
jobRunParam.setSqlPath(sqlPath);
jobRunParam.setCheckPointParam(CheckPointParams.buildCheckPointParam(parameterTool));
String type = parameterTool.get("type");
if (StringUtils.isNotEmpty(type)) {
jobRunParam.setJobTypeEnum(JobTypeEnum.getJobTypeEnum(Integer.valueOf(type)));
}
return jobRunParam;
}

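To make the added flow concrete, here is a minimal, hypothetical sketch (not the platform's actual launch code) of how the new type argument appears to select batch versus streaming execution: buildParam() reads type via ParameterTool and maps it through JobTypeEnum, and main() then builds a batch or streaming environment accordingly. The argument values and SQL path below are made up, and the real streaming branch additionally creates a StreamExecutionEnvironment so checkpointing can be configured.

~~~~java
import com.flink.streaming.common.enums.JobTypeEnum;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JobTypeSelectionSketch {
    public static void main(String[] args) {
        // Hypothetical arguments; "--type 2" selects SQL_BATCH, the sql path is made up.
        ParameterTool tool = ParameterTool.fromArgs(
                new String[]{"--type", "2", "--sql", "/tmp/job_sql/job_batch.sql"});
        JobTypeEnum jobType = JobTypeEnum.getJobTypeEnum(Integer.valueOf(tool.get("type")));

        // SQL_BATCH -> batch planner; any other (or missing) type keeps the streaming default.
        EnvironmentSettings settings = JobTypeEnum.SQL_BATCH.equals(jobType)
                ? EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
                : EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        TableEnvironment tEnv = TableEnvironment.create(settings);
        System.out.println("Resolved job type: " + jobType);
    }
}
~~~~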

This file was deleted.

@@ -1,5 +1,6 @@
package com.flink.streaming.core.model;

import com.flink.streaming.common.enums.JobTypeEnum;
import com.flink.streaming.common.model.CheckPointParam;
import lombok.Data;

@@ -16,6 +17,10 @@ public class JobRunParam {
*/
private String sqlPath;

/**
* Job type
*/
private JobTypeEnum jobTypeEnum;

/**
* CheckPoint 参数
@@ -28,6 +28,10 @@ public class MessageConstants {

public static final String MESSAGE_009 = "没有获取到savepointPath路径目录 任务:{}";

public static final String MESSAGE_010 = "无法获取 user.name";

public static final String MESSAGE_010 = "批任务不支持savePoint 任务:{}";

public static final String MESSAGE_011 = "无法获取 user.name";


}

