From 1002b7cae2e07e0ea1322c6a4b315c3521f361d4 Mon Sep 17 00:00:00 2001 From: earthchen Date: Fri, 25 Jun 2021 17:40:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(rest):=20=E5=A2=9E=E5=8A=A0=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20rest=20api=20=E6=8F=90=E4=BA=A4sql=20=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 ++ Dockerfile | 24 +++++++ build-docker.sh | 7 ++ docker-compose.yaml | 39 +++++++++++ docs/manual-sql.md | 8 +++ docs/sql/flink_web.sql | 3 +- .../web/common/FlinkRestUriConstants.java | 25 +++++++ .../web/common/util/CommandUtil.java | 39 ++++++++++- .../streaming/web/enums/DeployModeEnum.java | 2 +- .../streaming/web/enums/SysConfigEnum.java | 3 + .../web/model/dto/JobRunParamDTO.java | 3 +- .../web/model/flink/JobRunRequestInfo.java | 69 +++++++++++++++++++ .../web/ao/impl/JobBaseServiceAOImpl.java | 46 +++++++++++-- .../api/JobConfigApiController.java | 2 + .../web/rpc/FlinkRestRpcAdapter.java | 11 +++ .../web/rpc/impl/FlinkRestRpcAdapterImpl.java | 52 ++++++++++++++ .../streaming/web/rpc/model/JobRunInfo.java | 25 +++++++ .../web/rpc/model/UploadJarInfo.java | 46 +++++++++++++ .../service/impl/SystemConfigServiceImpl.java | 1 + .../resources/application-prod.properties | 5 +- .../src/main/resources/application.properties | 12 ++-- .../templates/screen/job_config/addPage.ftl | 1 + .../templates/screen/job_config/editPage.ftl | 1 + http/flink-rest-client.http | 28 ++++++++ 24 files changed, 437 insertions(+), 19 deletions(-) create mode 100644 Dockerfile create mode 100644 build-docker.sh create mode 100644 docker-compose.yaml create mode 100644 flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/FlinkRestUriConstants.java create mode 100644 flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/flink/JobRunRequestInfo.java create mode 100644 flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/JobRunInfo.java create mode 100644 flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/UploadJarInfo.java create mode 100644 http/flink-rest-client.http diff --git a/.gitignore b/.gitignore index b8cd3c4a..dfa00c3a 100644 --- a/.gitignore +++ b/.gitignore @@ -140,3 +140,7 @@ Icon Network Trash Folder Temporary Items .apdisk + +lib/flink-streaming-core.jar +sql +data diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..f8abb2a1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +FROM openjdk:8 + +WORKDIR /opt/flink + +WORKDIR application + +ENV MYSQL_HOST="127.0.0.1" \ + MYSQL_PORT="3306" \ + MYSQL_USERNAME="root" \ + MYSQL_PASSWORD="root" \ + SERVER_PORT=8080 \ + LOG_HOME="/application/logs" \ + JVM_OPTS="" \ + SPRING_ENV="prod" + +COPY flink-streaming-core/target/flink-streaming-core.jar /application/lib/flink-streaming-core.jar + +COPY flink-streaming-web/target/flink-streaming-web.jar /application/flink-streaming-web.jar + +CMD java $JAVA_OPTS \ + -jar /application/flink-streaming-web.jar \ + --spring.profiles.active=${SPRING_ENV} --server.port=${SERVER_PORT} + + diff --git a/build-docker.sh b/build-docker.sh new file mode 100644 index 00000000..3034d3c6 --- /dev/null +++ b/build-docker.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +cd `dirname $0 ` + +mvn clean package + +docker build -t flink-streaming-web:0.0.1 . \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 00000000..92c4160e --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,39 @@ +version: '3' +services: + flink-stream-web: + image: flink-streaming-web:0.0.1 + ports: + - "8080:8080" + environment: + MYSQL_HOST: 192.168.200.159 + MYSQL_PORT: 3306 + MYSQL_USERNAME: root + MYSQL_PASSWORD: 123456 + JVM_OPTS: "" + SPRING_ENV: prod + SERVER_PORT: 8080 + volumes: + - ./data/sql:/application/sql + jobmanager: + image: flink:1.12.0-scala_2.11 + ports: + - "8081:8081" + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + volumes: +# - ./data/flink/:/opt/flink/ + - ./data/sql:/application/sql + taskmanager: + image: flink:1.12.0-scala_2.11 + depends_on: + - jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: 2 diff --git a/docs/manual-sql.md b/docs/manual-sql.md index f544dae3..28bf493c 100644 --- a/docs/manual-sql.md +++ b/docs/manual-sql.md @@ -16,6 +16,7 @@ b: 运行模式 LOCAL(本地集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html ) + REST(使用 rest api 进行任务提交) LOCAL 需要在本地单机启动flink 服务 ./bin/start-cluster.sh @@ -62,7 +63,14 @@ c: flink运行配置 其他运行参数可通过 flink -h查看 ~~~~ +4、REST模式 +- sql 目录和jobmanager挂载到一起,使 sql 文件可以直接写到 jobmanager 中 +- 使用flink 的 rest 接口提交任务(预先将 core.jar 提交到 flink 中记录下 jarId) +```bash +curl -X POST -H "Expect:" -F "jarfile=@/Users/earthchen/study/flink/flink-streaming-platform-web/lib/flink-streaming-core.jar" http://127.0.0.1:8081/jars/upload +``` +- 将 jarid 配置到系统配置中 d: Checkpoint信息 diff --git a/docs/sql/flink_web.sql b/docs/sql/flink_web.sql index c2a38411..7e5f5d38 100644 --- a/docs/sql/flink_web.sql +++ b/docs/sql/flink_web.sql @@ -1,4 +1,5 @@ - +create database if not exists `flink_web` ; +use `flink_web`; SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/FlinkRestUriConstants.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/FlinkRestUriConstants.java new file mode 100644 index 00000000..c961fc7e --- /dev/null +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/FlinkRestUriConstants.java @@ -0,0 +1,25 @@ +package com.flink.streaming.web.common; + +/** + * @author zhuhuipei + * @Description: + * @date 2020-09-18 + * @time 23:55 + */ +public class FlinkRestUriConstants { + + public final static String JARS = "jars"; + public static String getUriJobsUploadForStandalone() { + return JARS + "/upload"; + } + + public static String getUriJobsRunForStandalone() { + return JARS + "/upload"; + } + + public static String getUriJobsRunForStandalone(String jobId) { + return JARS + "/" + jobId + "/run"; + } + + +} diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/util/CommandUtil.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/util/CommandUtil.java index 0a0eb4b2..29e85602 100644 --- a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/util/CommandUtil.java +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/common/util/CommandUtil.java @@ -5,6 +5,7 @@ import com.flink.streaming.web.enums.DeployModeEnum; import com.flink.streaming.web.model.dto.JobConfigDTO; import com.flink.streaming.web.model.dto.JobRunParamDTO; +import com.flink.streaming.web.model.flink.JobRunRequestInfo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -17,7 +18,7 @@ @Slf4j public class CommandUtil { - private static final String APP_CLASS_NAME = "com.flink.streaming.core.JobApplication"; + public static final String APP_CLASS_NAME = "com.flink.streaming.core.JobApplication"; /** * 本地/Standalone Cluster模式 @@ -67,6 +68,42 @@ public static String buildRunCommandForCluster(JobRunParamDTO jobRunParamDTO, return command.toString(); } + + public static JobRunRequestInfo buildRunRestJobInfoForCluster(JobRunParamDTO jobRunParamDTO, + JobConfigDTO jobConfigDTO,String savepointPath) throws Exception { + JobRunRequestInfo requestInfo = new JobRunRequestInfo(); + // todo parallelism allowNonRestoredState 参数从哪传 + requestInfo.setEntryClass(jobConfigDTO.getCustomMainClass()); + if (StringUtils.isNotBlank(savepointPath)) { + requestInfo.setSavepointPath(savepointPath); + } + StringBuilder command = new StringBuilder(); + + if (StringUtils.isNotEmpty(jobConfigDTO.getExtJarPath())) { + String[] urls = jobConfigDTO.getExtJarPath().split(SystemConstant.LINE_FEED); + for (String url : urls) { + command.append(" -C ").append(url.trim()).append(" "); + } + } + switch (jobConfigDTO.getJobTypeEnum()) { + case SQL_BATCH: + case SQL_STREAMING: + requestInfo.setEntryClass(CommandUtil.APP_CLASS_NAME); + command.append(" -sql ").append(jobRunParamDTO.getSqlPath()).append(" "); + if (StringUtils.isNotEmpty(jobRunParamDTO.getFlinkCheckpointConfig())) { + command.append(" ").append(jobRunParamDTO.getFlinkCheckpointConfig()); + } + command.append(" -type ").append(jobConfigDTO.getJobTypeEnum().getCode()).append(" "); + break; + case JAR: + command.append(" ").append(jobConfigDTO.getCustomArgs()); + break; + } + requestInfo.setProgramArgs(command.toString()); + log.info("buildRestRunJobInfo={}", requestInfo); + return requestInfo; + } + /** * jar并且构建运行命令 * diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/DeployModeEnum.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/DeployModeEnum.java index 704a5cc5..9d05bb0d 100644 --- a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/DeployModeEnum.java +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/DeployModeEnum.java @@ -10,7 +10,7 @@ * @time 20:41 */ public enum DeployModeEnum { - YARN_PER, STANDALONE, LOCAL; + YARN_PER, STANDALONE, LOCAL, REST; public static DeployModeEnum getModel(String model) { if (StringUtils.isEmpty(model)) { diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/SysConfigEnum.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/SysConfigEnum.java index 2e0f2c2a..9d0126a5 100644 --- a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/SysConfigEnum.java +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/enums/SysConfigEnum.java @@ -34,6 +34,9 @@ public enum SysConfigEnum { "flink Rest & web frontend 地址(Local Cluster模式)", SysConfigEnumType.SYS.name()), + FLINK_SQL_CORE_JAR_ID("flink_rest_sql_core_jar_id", "sql core jar 提交到 flink 后返回的 jarId", + SysConfigEnumType.SYS.name()), + FLINK_REST_HA_HTTP_ADDRESS("flink_rest_ha_http_address", "flink Rest & web frontend HA 地址(Standalone Cluster模式 支持HA 可以填写多个地址 ;用分隔)", diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobRunParamDTO.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobRunParamDTO.java index f24392cf..9b2595dd 100644 --- a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobRunParamDTO.java +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/dto/JobRunParamDTO.java @@ -59,7 +59,8 @@ public JobRunParamDTO(String flinkBinPath, this.flinkCheckpointConfig = flinkCheckpointConfig; } - public static JobRunParamDTO buildJobRunParam(Map systemConfigMap, JobConfigDTO jobConfigDTO, String sqlPath) { + public static JobRunParamDTO buildJobRunParam(Map systemConfigMap, JobConfigDTO jobConfigDTO, + String sqlPath) { String flinkBinPath = SystemConstants.buildFlinkBin(systemConfigMap.get(SysConfigEnum.FLINK_HOME.getKey())); diff --git a/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/flink/JobRunRequestInfo.java b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/flink/JobRunRequestInfo.java new file mode 100644 index 00000000..1ca348c7 --- /dev/null +++ b/flink-streaming-web-common/src/main/java/com/flink/streaming/web/model/flink/JobRunRequestInfo.java @@ -0,0 +1,69 @@ +package com.flink.streaming.web.model.flink; + +/** + * @author earthchen + * @date 2021/6/24 + **/ +public class JobRunRequestInfo { + + private String entryClass; + + private String programArgs; + + private Integer parallelism; + + private String savepointPath; + + private String allowNonRestoredState; + + public String getEntryClass() { + return entryClass; + } + + public void setEntryClass(String entryClass) { + this.entryClass = entryClass; + } + + public String getProgramArgs() { + return programArgs; + } + + public void setProgramArgs(String programArgs) { + this.programArgs = programArgs; + } + + public Integer getParallelism() { + return parallelism; + } + + public void setParallelism(Integer parallelism) { + this.parallelism = parallelism; + } + + public String getSavepointPath() { + return savepointPath; + } + + public void setSavepointPath(String savepointPath) { + this.savepointPath = savepointPath; + } + + public String getAllowNonRestoredState() { + return allowNonRestoredState; + } + + public void setAllowNonRestoredState(String allowNonRestoredState) { + this.allowNonRestoredState = allowNonRestoredState; + } + + @Override + public String toString() { + return "JobRunInfo{" + + "entryClass='" + entryClass + '\'' + + ", programArgs='" + programArgs + '\'' + + ", parallelism=" + parallelism + + ", savepointPath='" + savepointPath + '\'' + + ", allowNonRestoredState='" + allowNonRestoredState + '\'' + + '}'; + } +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/ao/impl/JobBaseServiceAOImpl.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/ao/impl/JobBaseServiceAOImpl.java index 039ce496..9bf9c4fe 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/ao/impl/JobBaseServiceAOImpl.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/ao/impl/JobBaseServiceAOImpl.java @@ -19,6 +19,7 @@ import com.flink.streaming.web.rpc.CommandRpcClinetAdapter; import com.flink.streaming.web.rpc.FlinkRestRpcAdapter; import com.flink.streaming.web.rpc.YarnRestRpcAdapter; +import com.flink.streaming.web.model.flink.JobRunRequestInfo; import com.flink.streaming.web.rpc.model.JobStandaloneInfo; import com.flink.streaming.web.service.JobConfigService; import com.flink.streaming.web.service.JobRunLogService; @@ -87,7 +88,8 @@ public void checkStart(JobConfigDTO jobConfigDTO) { break; } - Map systemConfigMap = SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS)); + Map systemConfigMap = + SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS)); this.checkSysConfig(systemConfigMap, jobConfigDTO.getDeployModeEnum()); @@ -150,10 +152,12 @@ public Long insertJobRunLog(JobConfigDTO jobConfigDTO, String userName) { @Override public JobRunParamDTO writeSqlToFile(JobConfigDTO jobConfigDTO) { - Map systemConfigMap = SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS)); + Map systemConfigMap = + SystemConfigDTO.toMap(systemConfigService.getSystemConfig(SysConfigEnumType.SYS)); - String sqlPath = FileUtils.getSqlHome(systemConfigMap.get(SysConfigEnum.FLINK_STREAMING_PLATFORM_WEB_HOME.getKey())) - + FileUtils.createFileName(String.valueOf(jobConfigDTO.getId())); + String sqlPath = + FileUtils.getSqlHome(systemConfigMap.get(SysConfigEnum.FLINK_STREAMING_PLATFORM_WEB_HOME.getKey())) + + FileUtils.createFileName(String.valueOf(jobConfigDTO.getId())); FileUtils.writeText(sqlPath, jobConfigDTO.getFlinkSql(), Boolean.FALSE); return JobRunParamDTO.buildJobRunParam(systemConfigMap, jobConfigDTO, sqlPath); @@ -196,10 +200,17 @@ public void run() { case LOCAL: case STANDALONE: //1、构建执行命令 - command = CommandUtil.buildRunCommandForCluster(jobRunParamDTO, jobConfigDTO, savepointPath); + command = CommandUtil.buildRunCommandForCluster(jobRunParamDTO, jobConfigDTO, + savepointPath); //2、提交任务 appId = this.submitJobForStandalone(command, jobConfigDTO, localLog); + break; + case REST: + // todo 提交任务到 flink 中 + JobRunRequestInfo requestInfo = CommandUtil.buildRunRestJobInfoForCluster(jobRunParamDTO, + jobConfigDTO, savepointPath); + appId = this.submitJobForStandaloneRpc(jobConfigDTO, localLog, requestInfo); break; } @@ -321,7 +332,30 @@ private void updateStatusAndLog(JobConfigDTO jobConfig, Long jobRunLogId, private String submitJobForStandalone(String command, JobConfigDTO jobConfig, StringBuilder localLog) throws Exception { - String appId = commandRpcClinetAdapter.submitJob(command, localLog, jobRunLogId, jobConfig.getDeployModeEnum()); + String appId = commandRpcClinetAdapter.submitJob(command, localLog, jobRunLogId, + jobConfig.getDeployModeEnum()); + JobStandaloneInfo jobStandaloneInfo = flinkRestRpcAdapter.getJobInfoForStandaloneByAppId(appId, + jobConfig.getDeployModeEnum()); + + if (jobStandaloneInfo == null || StringUtils.isNotEmpty(jobStandaloneInfo.getErrors())) { + log.error("[submitJobForStandalone] is error jobStandaloneInfo={}", jobStandaloneInfo); + localLog.append("\n 任务失败 appId=" + appId); + throw new BizException("任务失败"); + } else { + if (!SystemConstants.STATUS_RUNNING.equals(jobStandaloneInfo.getState())) { + localLog.append("\n 任务失败 appId=" + appId).append("状态是:" + jobStandaloneInfo.getState()); + throw new BizException("[submitJobForStandalone]任务失败"); + } + } + return appId; + } + + private String submitJobForStandaloneRpc(JobConfigDTO jobConfig, + StringBuilder localLog, JobRunRequestInfo requestInfo) { + + String appId = + flinkRestRpcAdapter.runJarByJarId(systemConfigService.getSystemConfigByKey(SysConfigEnum.FLINK_SQL_CORE_JAR_ID.getKey()), + requestInfo, jobConfig.getDeployModeEnum()); JobStandaloneInfo jobStandaloneInfo = flinkRestRpcAdapter.getJobInfoForStandaloneByAppId(appId, jobConfig.getDeployModeEnum()); diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/api/JobConfigApiController.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/api/JobConfigApiController.java index b39809fc..1c075d21 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/api/JobConfigApiController.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/controller/api/JobConfigApiController.java @@ -315,6 +315,8 @@ private JobServerAO getJobServerAO(Long id) { case STANDALONE: log.info(" STANDALONE模式启动 {}", deployModeEnum); return jobStandaloneServerAO; + case REST: + return jobStandaloneServerAO; default: throw new RuntimeException("不支持该模式系统"); } diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/FlinkRestRpcAdapter.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/FlinkRestRpcAdapter.java index 0aea699b..66378341 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/FlinkRestRpcAdapter.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/FlinkRestRpcAdapter.java @@ -1,8 +1,11 @@ package com.flink.streaming.web.rpc; import com.flink.streaming.web.enums.DeployModeEnum; +import com.flink.streaming.web.model.flink.JobRunRequestInfo; import com.flink.streaming.web.rpc.model.JobStandaloneInfo; +import java.io.File; + /** * @author zhuhuipei * @Description: @@ -11,6 +14,14 @@ */ public interface FlinkRestRpcAdapter { + String uploadJarAndReturnJarId(File file, DeployModeEnum deployModeEnum); + + default String uploadJarAndReturnJarId(String filePath, DeployModeEnum deployModeEnum) { + return uploadJarAndReturnJarId(new File(filePath), deployModeEnum); + } + + String runJarByJarId(String jarId, JobRunRequestInfo info, DeployModeEnum deployModeEnum); + /** * Standalone 模式下获取状态 diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/impl/FlinkRestRpcAdapterImpl.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/impl/FlinkRestRpcAdapterImpl.java index 56233e34..7863eae6 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/impl/FlinkRestRpcAdapterImpl.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/impl/FlinkRestRpcAdapterImpl.java @@ -1,19 +1,32 @@ package com.flink.streaming.web.rpc.impl; import com.alibaba.fastjson.JSON; +import com.flink.streaming.web.common.FlinkRestUriConstants; import com.flink.streaming.web.common.FlinkYarnRestUriConstants; import com.flink.streaming.web.common.util.HttpUtil; import com.flink.streaming.web.enums.DeployModeEnum; import com.flink.streaming.web.enums.SysErrorEnum; import com.flink.streaming.web.exceptions.BizException; import com.flink.streaming.web.rpc.FlinkRestRpcAdapter; +import com.flink.streaming.web.rpc.model.JobRunInfo; +import com.flink.streaming.web.model.flink.JobRunRequestInfo; import com.flink.streaming.web.rpc.model.JobStandaloneInfo; +import com.flink.streaming.web.rpc.model.UploadJarInfo; import com.flink.streaming.web.service.SystemConfigService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; import org.springframework.stereotype.Service; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; + +import java.io.File; /** * @author zhuhuipei @@ -29,6 +42,45 @@ public class FlinkRestRpcAdapterImpl implements FlinkRestRpcAdapter { private SystemConfigService systemConfigService; + @Override + public String uploadJarAndReturnJarId(File file, DeployModeEnum deployModeEnum) { + String url = HttpUtil.buildUrl(systemConfigService.getFlinkHttpAddress(deployModeEnum), + FlinkRestUriConstants.getUriJobsUploadForStandalone()); + RestTemplate restTemplate = HttpUtil.buildRestTemplate(HttpUtil.TIME_OUT_1_M); + //设置请求头 + HttpHeaders headers = new HttpHeaders(); + MediaType type = MediaType.parseMediaType("multipart/form-data"); + headers.setContentType(type); + + //设置请求体,注意是LinkedMultiValueMap + FileSystemResource fileSystemResource = new FileSystemResource(file); + MultiValueMap form = new LinkedMultiValueMap<>(); + form.add("jarfile", fileSystemResource); + // 用HttpEntity封装整个请求报文 + HttpEntity> files = new HttpEntity<>(form, headers); + String res = restTemplate.postForObject(url, files, String.class); + UploadJarInfo info = JSON.parseObject(res, UploadJarInfo.class); + if (info == null) { + throw new BizException("上传 jar 失败"); + } + return info.getJarId(); + } + + @Override + public String runJarByJarId(String jarId, JobRunRequestInfo jobRunRequestInfo, DeployModeEnum deployModeEnum) { + String url = HttpUtil.buildUrl(systemConfigService.getFlinkHttpAddress(deployModeEnum), + FlinkRestUriConstants.getUriJobsRunForStandalone(jarId)); + log.info("runJarByJarId url={}", url); + HttpEntity request = new HttpEntity<>(JSON.toJSONString(jobRunRequestInfo)); + String res = + HttpUtil.buildRestTemplate(HttpUtil.TIME_OUT_1_M).postForEntity(url, request, String.class).getBody(); + JobRunInfo info = JSON.parseObject(res, JobRunInfo.class); + if (info == null) { + throw new BizException("run jar 失败"); + } + return info.getJobId(); + } + @Override public JobStandaloneInfo getJobInfoForStandaloneByAppId(String appId, DeployModeEnum deployModeEnum) { if (StringUtils.isEmpty(appId)) { diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/JobRunInfo.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/JobRunInfo.java new file mode 100644 index 00000000..db3de860 --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/JobRunInfo.java @@ -0,0 +1,25 @@ +package com.flink.streaming.web.rpc.model; + +/** + * @author earthchen + * @date 2021/6/24 + **/ +public class JobRunInfo { + + private String jobId; + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + @Override + public String toString() { + return "JobRunInfo{" + + "jobId='" + jobId + '\'' + + '}'; + } +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/UploadJarInfo.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/UploadJarInfo.java new file mode 100644 index 00000000..bcb22e61 --- /dev/null +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/rpc/model/UploadJarInfo.java @@ -0,0 +1,46 @@ +package com.flink.streaming.web.rpc.model; + +/** + * @author earthchen + * @date 2021/6/24 + **/ +public class UploadJarInfo { + + private String filename; + + private String status; + + public String getFilename() { + return filename; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + @Override + public String toString() { + return "UploadJarInfo{" + + "filename='" + filename + '\'' + + ", status='" + status + '\'' + + '}'; + } + + public String getJarId() { + String[] ans = filename.split("/"); + if (ans.length == 0) { + return null; + } + return ans[ans.length - 1]; + } + + +} diff --git a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/SystemConfigServiceImpl.java b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/SystemConfigServiceImpl.java index 21be4123..186f575f 100644 --- a/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/SystemConfigServiceImpl.java +++ b/flink-streaming-web/src/main/java/com/flink/streaming/web/service/impl/SystemConfigServiceImpl.java @@ -98,6 +98,7 @@ public String getFlinkHttpAddress(DeployModeEnum deployModeEnum) { return urlLocal.trim(); } throw new BizException("网络异常 url=" + urlLocal); + case REST: case STANDALONE: String urlHA = this.getSystemConfigByKey(SysConfigEnum.FLINK_REST_HA_HTTP_ADDRESS.getKey()); if (StringUtils.isEmpty(urlHA)) { diff --git a/flink-streaming-web/src/main/resources/application-prod.properties b/flink-streaming-web/src/main/resources/application-prod.properties index 90cb5280..bcc69627 100644 --- a/flink-streaming-web/src/main/resources/application-prod.properties +++ b/flink-streaming-web/src/main/resources/application-prod.properties @@ -1,6 +1,5 @@ -spring.profiles.active=prod -logging.config= classpath:logging/logback-${spring.profiles.active}.xml +#logging.config=classpath:logging/logback-prod.xml -spring.devtools.livereload.enabled= false +spring.devtools.livereload.enabled=false diff --git a/flink-streaming-web/src/main/resources/application.properties b/flink-streaming-web/src/main/resources/application.properties index 8f7bf1fd..8de02995 100644 --- a/flink-streaming-web/src/main/resources/application.properties +++ b/flink-streaming-web/src/main/resources/application.properties @@ -1,6 +1,6 @@ -server.port=8080 +server.port=8082 spring.profiles.active=dev -logging.config= classpath:logging/logback-${spring.profiles.active}.xml +#logging.config=classpath:logging/logback-${spring.profiles.active}.xml spring.devtools.livereload.enabled= true @@ -19,9 +19,9 @@ mybatis.mapper-locations=classpath:mapper/*.xml ####jdbc连接池 -spring.datasource.url=jdbc:mysql://localhost:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false -spring.datasource.username=root -spring.datasource.password=root +spring.datasource.url=jdbc:mysql://${MYSQL_HOST:localhost}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:flink_web}?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=${MYSQL_SSL:false} +spring.datasource.username=${MYSQL_USERNAME:root} +spring.datasource.password=${MYSQL_PASSWORD:123456} spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.druid.initial-size=5 @@ -48,7 +48,7 @@ spring.datasource.druid.stat-view-servlet.login-password=123456 #日志 -log.file.root= ../logs/ +log.file.root= ${LOG_HOME:../logs/} log.file.maxHistory= 20 log.file.maxSize= 200MB diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config/addPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config/addPage.ftl index 7b55f222..3f97116a 100644 --- a/flink-streaming-web/src/main/resources/templates/screen/job_config/addPage.ftl +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config/addPage.ftl @@ -89,6 +89,7 @@ +
diff --git a/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl b/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl index 3759f096..ad757f0f 100644 --- a/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl +++ b/flink-streaming-web/src/main/resources/templates/screen/job_config/editPage.ftl @@ -108,6 +108,7 @@ +
diff --git a/http/flink-rest-client.http b/http/flink-rest-client.http new file mode 100644 index 00000000..608b27dc --- /dev/null +++ b/http/flink-rest-client.http @@ -0,0 +1,28 @@ +### 上传 jar +#curl -X POST -H "Expect:" -F "jarfile=@/Users/earthchen/study/flink/flink-streaming-platform-web/lib/flink-streaming-core.jar" http://127.0.0.1:8081/jars/upload +POST http://localhost:8081/jars/upload +Content-Type: multipart/form-data; boundary=WebAppBoundary + +--WebAppBoundary +Content-Disposition: name="jarfile"; + +< /Users/earthchen/study/flink/flink-streaming-platform-web/lib/flink-streaming-core.jar +--WebAppBoundary-- + + + +### 运行 job +POST http://localhost:8081/jars/f3b73878-7764-401e-8992-e6507b6569d2_flink-streaming-core.jar/run +Content-Type: application/json + +{ + "entryClass": "com.flink.streaming.core.JobApplication", + "programArgs": "--sql /Users/earthchen/code-tool/flink/upload/sql/test.sql --type 0" +} + +### 查询 jars +GET http://localhost:8081/jars + + +# command =/Users/earthchen/code-tool/flink/flink-1.12.4/bin/flink run -d -c com.flink.streaming.core.JobApplication /Users/earthchen/study/flink/flink-streaming-platform-web/lib/flink-streaming-core.jar -sql /Users/earthchen/study/flink/flink-streaming-platform-web/sql/job_sql_1.sql -type 0 +