Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rest): 增加使用 rest api 提交sql 任务 #54

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ Icon
Network Trash Folder
Temporary Items
.apdisk

lib/flink-streaming-core.jar
sql
data
24 changes: 24 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
FROM openjdk:8

# All artifacts are copied under /application (core jar, web jar, logs, sql),
# so run from there as well. NOTE(review): the original used
# "WORKDIR /opt/flink" + "WORKDIR application" (= /opt/flink/application),
# which matched nothing else in the image.
WORKDIR /application

# Runtime defaults; every value can be overridden via docker-compose/environment.
ENV MYSQL_HOST="127.0.0.1" \
    MYSQL_PORT="3306" \
    MYSQL_USERNAME="root" \
    MYSQL_PASSWORD="root" \
    SERVER_PORT=8080 \
    LOG_HOME="/application/logs" \
    JVM_OPTS="" \
    SPRING_ENV="prod"

COPY flink-streaming-core/target/flink-streaming-core.jar /application/lib/flink-streaming-core.jar

COPY flink-streaming-web/target/flink-streaming-web.jar /application/flink-streaming-web.jar

# Fix: the original expanded $JAVA_OPTS, which is never defined anywhere;
# the variable declared above (and set by docker-compose) is JVM_OPTS.
CMD java ${JVM_OPTS} \
    -jar /application/flink-streaming-web.jar \
    --spring.profiles.active=${SPRING_ENV} --server.port=${SERVER_PORT}


7 changes: 7 additions & 0 deletions build-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh
# Build the web application jar and package it into the Docker image.

# Abort on the first failing command so a broken Maven build can never
# produce (or overwrite) an image.
set -e

# Always run from the repository root, i.e. the directory holding this script.
# Quoted command substitution handles paths containing spaces; the original
# used unquoted backticks with a stray trailing space.
cd "$(dirname "$0")"

mvn clean package

docker build -t flink-streaming-web:0.0.1 .
39 changes: 39 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
version: '3'
services:
flink-stream-web:
image: flink-streaming-web:0.0.1
ports:
- "8080:8080"
environment:
MYSQL_HOST: 192.168.200.159
MYSQL_PORT: 3306
MYSQL_USERNAME: root
MYSQL_PASSWORD: 123456
JVM_OPTS: ""
SPRING_ENV: prod
SERVER_PORT: 8080
volumes:
- ./data/sql:/application/sql
jobmanager:
image: flink:1.12.0-scala_2.11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
volumes:
# - ./data/flink/:/opt/flink/
- ./data/sql:/application/sql
taskmanager:
image: flink:1.12.0-scala_2.11
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
8 changes: 8 additions & 0 deletions docs/manual-sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ b: 运行模式

LOCAL(本地集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html )

REST(使用 rest api 进行任务提交)

<font color=red size=5>LOCAL 需要在本地单机启动flink 服务 ./bin/start-cluster.sh </font>

Expand Down Expand Up @@ -62,7 +63,14 @@ c: flink运行配置
其他运行参数可通过 flink -h查看
~~~~

<font color=red size=5>4、REST模式 </font>

- sql 目录和jobmanager挂载到一起,使 sql 文件可以直接写到 jobmanager 中
- 使用flink 的 rest 接口提交任务(预先将 core.jar 提交到 flink 中记录下 jarId)
```bash
curl -X POST -H "Expect:" -F "jarfile=@/path/to/flink-streaming-platform-web/lib/flink-streaming-core.jar" http://127.0.0.1:8081/jars/upload
```
- 将 jarid 配置到系统配置中


d: Checkpoint信息
Expand Down
3 changes: 2 additions & 1 deletion docs/sql/flink_web.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

create database if not exists `flink_web` ;
use `flink_web`;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.flink.streaming.web.common;

/**
 * URI fragments for the Flink standalone-cluster REST API:
 * jar upload ({@code jars/upload}) and jar run ({@code jars/<id>/run}).
 *
 * @author zhuhuipei
 * @Description:
 * @date 2020-09-18
 * @time 23:55
 */
public class FlinkRestUriConstants {

    /** Base path of the jar-management REST endpoints. */
    public final static String JARS = "jars";

    /** Utility class; not meant to be instantiated. */
    private FlinkRestUriConstants() {
    }

    /**
     * @return relative URI for uploading a jar: {@code jars/upload}
     */
    public static String getUriJobsUploadForStandalone() {
        return JARS + "/upload";
    }

    /**
     * Copy-paste duplicate of {@link #getUriJobsUploadForStandalone()}: despite
     * its name it returns the <em>upload</em> URI, not a run URI. Kept (with
     * identical behavior) for source compatibility.
     *
     * @deprecated use {@link #getUriJobsUploadForStandalone()} to upload a jar,
     *             or {@link #getUriJobsRunForStandalone(String)} to run one.
     */
    @Deprecated
    public static String getUriJobsRunForStandalone() {
        return getUriJobsUploadForStandalone();
    }

    /**
     * @param jobId jar id returned by the upload endpoint
     * @return relative URI for running an uploaded jar: {@code jars/<jobId>/run}
     */
    public static String getUriJobsRunForStandalone(String jobId) {
        return JARS + "/" + jobId + "/run";
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.flink.streaming.web.enums.DeployModeEnum;
import com.flink.streaming.web.model.dto.JobConfigDTO;
import com.flink.streaming.web.model.dto.JobRunParamDTO;
import com.flink.streaming.web.model.flink.JobRunRequestInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -17,7 +18,7 @@
@Slf4j
public class CommandUtil {

private static final String APP_CLASS_NAME = "com.flink.streaming.core.JobApplication";
public static final String APP_CLASS_NAME = "com.flink.streaming.core.JobApplication";

/**
* 本地/Standalone Cluster模式
Expand Down Expand Up @@ -67,6 +68,42 @@ public static String buildRunCommandForCluster(JobRunParamDTO jobRunParamDTO,
return command.toString();
}


/**
 * Builds the request body for submitting a job through the Flink REST API
 * ("run jar" endpoint), mirroring the CLI command produced by
 * {@code buildRunCommandForCluster}: entry class, program arguments and an
 * optional savepoint to restore from.
 *
 * @param jobRunParamDTO run parameters (sql file path, checkpoint config)
 * @param jobConfigDTO   job configuration (job type, custom main class,
 *                       custom args, external jar paths)
 * @param savepointPath  savepoint to restore from; skipped when blank
 * @return request info to POST to the Flink REST API
 */
public static JobRunRequestInfo buildRunRestJobInfoForCluster(JobRunParamDTO jobRunParamDTO,
                                                              JobConfigDTO jobConfigDTO, String savepointPath) {
    JobRunRequestInfo requestInfo = new JobRunRequestInfo();
    // TODO: where should parallelism / allowNonRestoredState come from?
    requestInfo.setEntryClass(jobConfigDTO.getCustomMainClass());
    if (StringUtils.isNotBlank(savepointPath)) {
        requestInfo.setSavepointPath(savepointPath);
    }
    StringBuilder command = new StringBuilder();

    // NOTE(review): "-C" is a flink CLI classpath option; passed via REST
    // programArgs it reaches the user main() instead of the client — confirm
    // the REST endpoint actually honors it.
    if (StringUtils.isNotEmpty(jobConfigDTO.getExtJarPath())) {
        String[] urls = jobConfigDTO.getExtJarPath().split(SystemConstant.LINE_FEED);
        for (String url : urls) {
            command.append(" -C ").append(url.trim()).append(" ");
        }
    }
    switch (jobConfigDTO.getJobTypeEnum()) {
        case SQL_BATCH:
        case SQL_STREAMING:
            // SQL jobs always run through the platform's core jar entry point.
            requestInfo.setEntryClass(CommandUtil.APP_CLASS_NAME);
            command.append(" -sql ").append(jobRunParamDTO.getSqlPath()).append(" ");
            if (StringUtils.isNotEmpty(jobRunParamDTO.getFlinkCheckpointConfig())) {
                command.append(" ").append(jobRunParamDTO.getFlinkCheckpointConfig());
            }
            command.append(" -type ").append(jobConfigDTO.getJobTypeEnum().getCode()).append(" ");
            break;
        case JAR:
            command.append(" ").append(jobConfigDTO.getCustomArgs());
            break;
        default:
            // Other job types are not submitted over REST; leave args as built.
            break;
    }
    requestInfo.setProgramArgs(command.toString());
    // Log tag kept in sync with the method name (was "buildRestRunJobInfo").
    log.info("buildRunRestJobInfoForCluster={}", requestInfo);
    return requestInfo;
}

/**
* jar并且构建运行命令
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* @time 20:41
*/
public enum DeployModeEnum {
YARN_PER, STANDALONE, LOCAL;
YARN_PER, STANDALONE, LOCAL, REST;

public static DeployModeEnum getModel(String model) {
if (StringUtils.isEmpty(model)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public enum SysConfigEnum {
"flink Rest & web frontend 地址(Local Cluster模式)",
SysConfigEnumType.SYS.name()),

FLINK_SQL_CORE_JAR_ID("flink_rest_sql_core_jar_id", "sql core jar 提交到 flink 后返回的 jarId",
SysConfigEnumType.SYS.name()),


FLINK_REST_HA_HTTP_ADDRESS("flink_rest_ha_http_address",
"flink Rest & web frontend HA 地址(Standalone Cluster模式 支持HA 可以填写多个地址 ;用分隔)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public JobRunParamDTO(String flinkBinPath,
this.flinkCheckpointConfig = flinkCheckpointConfig;
}

public static JobRunParamDTO buildJobRunParam(Map<String, String> systemConfigMap, JobConfigDTO jobConfigDTO, String sqlPath) {
public static JobRunParamDTO buildJobRunParam(Map<String, String> systemConfigMap, JobConfigDTO jobConfigDTO,
String sqlPath) {

String flinkBinPath = SystemConstants.buildFlinkBin(systemConfigMap.get(SysConfigEnum.FLINK_HOME.getKey()));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.flink.streaming.web.model.flink;

/**
* @author earthchen
* @date 2021/6/24
**/
public class JobRunRequestInfo {

private String entryClass;

private String programArgs;

private Integer parallelism;

private String savepointPath;

private String allowNonRestoredState;

public String getEntryClass() {
return entryClass;
}

public void setEntryClass(String entryClass) {
this.entryClass = entryClass;
}

public String getProgramArgs() {
return programArgs;
}

public void setProgramArgs(String programArgs) {
this.programArgs = programArgs;
}

public Integer getParallelism() {
return parallelism;
}

public void setParallelism(Integer parallelism) {
this.parallelism = parallelism;
}

public String getSavepointPath() {
return savepointPath;
}

public void setSavepointPath(String savepointPath) {
this.savepointPath = savepointPath;
}

public String getAllowNonRestoredState() {
return allowNonRestoredState;
}

public void setAllowNonRestoredState(String allowNonRestoredState) {
this.allowNonRestoredState = allowNonRestoredState;
}

@Override
public String toString() {
return "JobRunInfo{" +
"entryClass='" + entryClass + '\'' +
", programArgs='" + programArgs + '\'' +
", parallelism=" + parallelism +
", savepointPath='" + savepointPath + '\'' +
", allowNonRestoredState='" + allowNonRestoredState + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.flink.streaming.web.rpc.CommandRpcClinetAdapter;
import com.flink.streaming.web.rpc.FlinkRestRpcAdapter;
import com.flink.streaming.web.rpc.YarnRestRpcAdapter;
import com.flink.streaming.web.model.flink.JobRunRequestInfo;
import com.flink.streaming.web.rpc.model.JobStandaloneInfo;
import com.flink.streaming.web.service.JobConfigService;
import com.flink.streaming.web.service.JobRunLogService;
Expand Down Expand Up @@ -87,7 +88,8 @@ public void checkStart(JobConfigDTO jobConfigDTO) {
break;
}

Map<String, String> systemConfigMap = SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS));
Map<String, String> systemConfigMap =
SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS));
this.checkSysConfig(systemConfigMap, jobConfigDTO.getDeployModeEnum());


Expand Down Expand Up @@ -150,10 +152,12 @@ public Long insertJobRunLog(JobConfigDTO jobConfigDTO, String userName) {
@Override
public JobRunParamDTO writeSqlToFile(JobConfigDTO jobConfigDTO) {

Map<String, String> systemConfigMap = SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS));
Map<String, String> systemConfigMap =
SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS));

String sqlPath = FileUtils.getSqlHome(systemConfigMap.get(SysConfigEnum.FLINK_STREAMING_PLATFORM_WEB_HOME.getKey()))
+ FileUtils.createFileName(String.valueOf(jobConfigDTO.getId()));
String sqlPath =
FileUtils.getSqlHome(systemConfigMap.get(SysConfigEnum.FLINK_STREAMING_PLATFORM_WEB_HOME.getKey()))
+ FileUtils.createFileName(String.valueOf(jobConfigDTO.getId()));
FileUtils.writeText(sqlPath, jobConfigDTO.getFlinkSql(), Boolean.FALSE);

return JobRunParamDTO.buildJobRunParam(systemConfigMap, jobConfigDTO, sqlPath);
Expand Down Expand Up @@ -196,10 +200,17 @@ public void run() {
case LOCAL:
case STANDALONE:
//1、构建执行命令
command = CommandUtil.buildRunCommandForCluster(jobRunParamDTO, jobConfigDTO, savepointPath);
command = CommandUtil.buildRunCommandForCluster(jobRunParamDTO, jobConfigDTO,
savepointPath);
//2、提交任务
appId = this.submitJobForStandalone(command, jobConfigDTO, localLog);

break;
case REST:
// todo 提交任务到 flink 中
JobRunRequestInfo requestInfo = CommandUtil.buildRunRestJobInfoForCluster(jobRunParamDTO,
jobConfigDTO, savepointPath);
appId = this.submitJobForStandaloneRpc(jobConfigDTO, localLog, requestInfo);
break;
}

Expand Down Expand Up @@ -321,7 +332,30 @@ private void updateStatusAndLog(JobConfigDTO jobConfig, Long jobRunLogId,
/**
 * Submits a job via the command-line client and then verifies through the
 * Flink REST API that it actually reached RUNNING state.
 *
 * @param command   fully built flink-run command line
 * @param jobConfig job configuration (supplies the deploy mode)
 * @param localLog  accumulating execution log shown to the user
 * @return the application id of the started job
 * @throws BizException when the job cannot be found, reports errors,
 *                      or is not in RUNNING state
 */
private String submitJobForStandalone(String command, JobConfigDTO jobConfig, StringBuilder localLog)
        throws Exception {

    String appId = commandRpcClinetAdapter.submitJob(command, localLog, jobRunLogId,
            jobConfig.getDeployModeEnum());
    JobStandaloneInfo jobStandaloneInfo = flinkRestRpcAdapter.getJobInfoForStandaloneByAppId(appId,
            jobConfig.getDeployModeEnum());

    // Guard 1: the job must be visible to the REST API and report no errors.
    if (jobStandaloneInfo == null || StringUtils.isNotEmpty(jobStandaloneInfo.getErrors())) {
        log.error("[submitJobForStandalone] is error jobStandaloneInfo={}", jobStandaloneInfo);
        localLog.append("\n 任务失败 appId=" + appId);
        throw new BizException("任务失败");
    }

    // Guard 2: the job must already be in RUNNING state.
    if (!SystemConstants.STATUS_RUNNING.equals(jobStandaloneInfo.getState())) {
        localLog.append("\n 任务失败 appId=" + appId).append("状态是:" + jobStandaloneInfo.getState());
        throw new BizException("[submitJobForStandalone]任务失败");
    }

    return appId;
}

private String submitJobForStandaloneRpc(JobConfigDTO jobConfig,
StringBuilder localLog, JobRunRequestInfo requestInfo) {

String appId =
flinkRestRpcAdapter.runJarByJarId(systemConfigService.getSystemConfigByKey(SysConfigEnum.FLINK_SQL_CORE_JAR_ID.getKey()),
requestInfo, jobConfig.getDeployModeEnum());
JobStandaloneInfo jobStandaloneInfo = flinkRestRpcAdapter.getJobInfoForStandaloneByAppId(appId,
jobConfig.getDeployModeEnum());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ private JobServerAO getJobServerAO(Long id) {
case STANDALONE:
log.info(" STANDALONE模式启动 {}", deployModeEnum);
return jobStandaloneServerAO;
case REST:
return jobStandaloneServerAO;
default:
throw new RuntimeException("不支持该模式系统");
}
Expand Down
Loading