Skip to content

Commit

Permalink
fix:stop任务
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuhp committed Oct 29, 2020
1 parent e76777d commit c5e470b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,24 @@ udf 开发demo 详见 [https://github.com/zhp8341/flink-streaming-udf](https://

## 三、配置demo

详见 [https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo)


[demo1 单流kafka写入mysqld 参考 ](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo/demo_1.md)

[demo2 双流kafka写入mysql 参考](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo/demo_2.md)

[demo3 kafka和mysql维表实时关联写入mysql 参考](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo/demo_3.md)

[demo4 滚动窗口 ](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo/demo_4.md)

[demo5 滑动窗口](https://github.com/zhp8341/flink-streaming-platform-web/tree/master/docs/sql_demo/demo_5.md)




```sql

CREATE FUNCTION jsonHasKey as com.yt.udf.JsonHasKeyUDF;
CREATE FUNCTION jsonHasKey as com.xx.udf.JsonHasKeyUDF;
-- 如果使用udf 函数必须配置 udf地址

create table flink_test_6 (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public static void main(String[] args) throws Exception {
try {
JobRunParam jobRunParam = buildParam(args);


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Expand Down Expand Up @@ -95,9 +94,6 @@ public static void main(String[] args) throws Exception {

}




/**
* 设置Configuration
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public String getAppIdByYarn(String jobName,String queueName) {

@Override
public void stopJobByJobId(String appId) {
log.info("执行stopJobByJobId appId={}",appId);
if (StringUtils.isEmpty(appId)) {
throw new BizException(SysErrorEnum.PARAM_IS_NULL_YARN_APPID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void checkYarnJobByStop() {
continue;
}
if (!StringUtils.isEmpty(appId)) {
jobServerAO.stop(jobConfigDTO.getId(), "sys");
httpRequestAdapter.stopJobByJobId(appId);
alart(SystemConstants.buildDingdingMessage("kill掉yarn上任务保持数据一致性 任务名称:" +
jobConfigDTO.getJobName()), jobConfigDTO.getId());
}
Expand Down

0 comments on commit c5e470b

Please sign in to comment.