Skip to content

Commit

Permalink
yarn per模式新增 -yD参数 已经停止接口替换
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuhp committed Nov 25, 2020
1 parent 7f04d85 commit 482168a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 15 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,14 @@ c: flink运行配置

~~~~
参数(和官方保持一致)但是只支持 -p -yjm -yn -ytm -ys -yqu(必选)
参数(和官方保持一致)但是只支持 -yD -p -yjm -yn -ytm -ys -yqu(必选)
-ys slot个数。
-yn task manager 数量。
-yjm job manager 的堆内存大小。
-ytm task manager 的堆内存大小。
-yqu yarn队列明
-p 并行度
-yD 如-yD taskmanager.heap.mb=518
详见官方文档
如: -yqu flink -yjm 1024m -ytm 2048m -p 1 -ys 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void start(Long id, Long savepointId, String userName) {
}
//TODO
// if (StringUtils.isNotEmpty(jobConfigDTO.getJobId())) {
// httpRequestAdapter.stopJobByJobId(jobConfigDTO.getJobId());
//
// }

Map<String, String> systemConfigMap = SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
@Slf4j
public class JobYarnServerAOImpl implements JobServerAO {

//最大重试次数
private static final Integer tryTimes = 3;

@Autowired
private JobConfigService jobConfigService;
Expand Down Expand Up @@ -93,7 +95,7 @@ public void start(Long id, Long savepointId, String userName) {
throw new BizException("请先开启任务");
}
if (StringUtils.isNotEmpty(jobConfigDTO.getJobId())) {
httpRequestAdapter.stopJobByJobId(jobConfigDTO.getJobId());
this.stop(jobConfigDTO);
}
try {
String queueName = YarnUtil.getQueueName(jobConfigDTO.getFlinkRunConfig());
Expand Down Expand Up @@ -171,7 +173,7 @@ public void stop(Long id, String userName) {
}

//2、停止任务
httpRequestAdapter.stopJobByJobId(jobConfigDTO.getJobId());
this.stop(jobConfigDTO);

JobConfigDTO jobConfig = new JobConfigDTO();
jobConfig.setStauts(JobConfigStatus.STOP);
Expand All @@ -190,7 +192,7 @@ public void savepoint(Long id) {
throw new BizException(SysErrorEnum.JOB_CONFIG_JOB_IS_NOT_EXIST);
}
if (StringUtils.isEmpty(jobConfigDTO.getFlinkCheckpointConfig())) {
log.warn(" FlinkCheckpointConfig is error jobConfigDTO={}", jobConfigDTO);
log.warn(" FlinkCheckpointConfig is null jobConfigDTO={}", jobConfigDTO);
return;
}
if (StringUtils.isEmpty(jobConfigDTO.getJobId())) {
Expand Down Expand Up @@ -354,4 +356,20 @@ private void checkSysConfig(Map<String, String> systemConfigMap) {
throw new BizException(SysErrorEnum.SYSTEM_CONFIG_IS_NULL_FLINK_STREAMING_PLATFORM_WEB_HOME);
}
}


private void stop(JobConfigDTO jobConfigDTO){
Integer retryNum = 1;
while (retryNum <= tryTimes) {
JobYarnInfo jobYarnInfo= flinkHttpRequestAdapter.getJobInfoForPerYarnByAppId(jobConfigDTO.getJobId());
if (jobYarnInfo != null && "RUNNING".equals(jobYarnInfo.getStatus())) {
log.info("执行停止操作 jobYarnInfo={} retryNum={} id={}",jobYarnInfo,retryNum,jobConfigDTO.getJobId());
flinkHttpRequestAdapter.cancelJobForYarnByAppId(jobConfigDTO.getJobId(), jobYarnInfo.getId());
}else {
log.info("任务已经停止 jobYarnInfo={} id={}",jobYarnInfo,jobConfigDTO.getJobId());
break;
}
retryNum++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,17 @@ public void checkYarnJobByStop() {
if (SysErrorEnum.YARN_CODE.getCode().equals(be.getCode())) {
continue;
}
log.error("getAppIdByYarn is error ", be);
log.error("[BizException]getAppIdByYarn is error ", be);
} catch (Exception e) {
log.error("getAppIdByYarn is error ", e);
log.error("[Exception]getAppIdByYarn is error ", e);
continue;
}
if (!StringUtils.isEmpty(appId)) {
httpRequestAdapter.stopJobByJobId(appId);
JobYarnInfo jobYarnInfo= flinkHttpRequestAdapter.getJobInfoForPerYarnByAppId(appId);
if (jobYarnInfo != null && "RUNNING".equals(jobYarnInfo.getStatus())) {
log.info("执行停止操作 jobYarnInfo={} id={}",jobYarnInfo,appId);
flinkHttpRequestAdapter.cancelJobForYarnByAppId(appId, jobYarnInfo.getId());
}
alart(SystemConstants.buildDingdingMessage("kill掉yarn上任务保持数据一致性 任务名称:" +
jobConfigDTO.getJobName()), jobConfigDTO.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static RestResult checkFlinkRunConfig(String flinkRunConfig) {
}
} catch (UnrecognizedOptionException e) {
log.error("checkFlinkRunConfig is error", e);
return RestResult.error("flink运行配置参数校验通不过,不允许使用参数:" + e.getOption() + " 参数只支持 -p -yjm -yn -ytm -ys -yqu");
return RestResult.error("flink运行配置参数校验通不过,不允许使用参数:" + e.getOption() + " 参数只支持 -p -yjm -yn -ytm -ys -yqu -yD");
} catch (Exception e) {
log.error("checkFlinkRunConfig is error", e);
return RestResult.error("flink运行配置参数校验通不过");
Expand Down Expand Up @@ -124,6 +124,7 @@ public static CommandLine getFlinkRunByCli(String flinkRunConfig) throws ParseEx
options.addOption("yn", false, "");
options.addOption("ytm", false, "");
options.addOption("ys", false, "");
options.addOption("yD", false, "");
options.addOption(SystemConstants.YQU, true, "");
CommandLineParser parser = new DefaultParser();
return parser.parse(options, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ private static RestResult checkFlinkRunConfig(String flinkRunConfig) {
options.addOption("yn", false, "");
options.addOption("ytm", false, "");
options.addOption("ys", false, "");
options.addOption("yD", false, "");
new DefaultParser().parse(options, config);
} catch (UnrecognizedOptionException e) {
e.printStackTrace();
return RestResult.error("flink运行配置参数校验通不过,不允许使用参数:" + e.getOption() + " 参数只支持 -p -yjm -yn -ytm -ys");
return RestResult.error("flink运行配置参数校验通不过,不允许使用参数:" + e.getOption() + " 参数只支持 -p -yjm -yn -ytm -ys -yD");
} catch (Exception e) {
e.printStackTrace();
return RestResult.error("flink运行配置参数校验通不过");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ public void getAppId() {
System.out.println(appId);
}

@Test
public void stopJobByJobId() {
httpRequestAdapter.stopJobByJobId("application_1592398631005_10004");
}

@Test
public void getJobStateByJobId() {
YarnStateEnum yarnStateEnum = httpRequestAdapter.getJobStateByJobId("application_1592398631005_10004");
Expand Down

0 comments on commit 482168a

Please sign in to comment.