Skip to content

Commit

Permalink
[Feature][scaleph-application-flink] support session job by flink kub…
Browse files Browse the repository at this point in the history
…ernetes operator (#734)

* feature: add doris dependency for seatunnel doris connector

* feature: update flink session job handler

* feature: update flink session job handler

* feature: update flink session job handler

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Jun 4, 2024
1 parent ee27e78 commit cd0a54d
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Builds and publishes multi-arch (amd64/arm64) flink-kubernetes-operator images
# to GHCR. Triggered manually; the operator version is chosen from a fixed list.
# NOTE: name fixed — this workflow builds flink-kubernetes-operator, not Flink CDC.
name: Release-Docker-Flink-Kubernetes-Operator

on:
  workflow_dispatch:
    inputs:
      flinkKubernetesOperatorVersion:
        description: 'flink-kubernetes operator version'
        required: true
        default: '1.8.0'
        type: choice
        options:
          - 1.8.0

env:
  HUB: ghcr.io/flowerfine/flink-kubernetes-operator
  FLINK_KUBERNETES_OPERATOR_VERSION: ${{ inputs.flinkKubernetesOperatorVersion }}

jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    timeout-minutes: 360
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          fetch-depth: 1
      - name: Log in to the Container registry
        # FIX: '@3' is not a published ref for this action; releases are tagged 'v3'.
        uses: docker/login-action@v3
        with:
          registry: ${{ env.HUB }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
        with:
          platforms: amd64,arm64
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Build image
        uses: docker/build-push-action@v5
        with:
          no-cache: false
          cache-from: type=gha
          cache-to: type=gha
          push: true
          context: .
          build-args: |
            FLINK_KUBERNETES_OPERATOR_VERSION=${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }}
          platforms: linux/amd64,linux/arm64
          file: tools/docker/build/flink-kubernetes-operator/Dockerfile
          tags: ${{ env.HUB }}:${{ env.FLINK_KUBERNETES_OPERATOR_VERSION }}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.application.flink.operator.status.FlinkDeploymentStatus;
import cn.sliew.scaleph.application.flink.operator.status.FlinkSessionJobStatus;
import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesJobInstanceService;
import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesJobService;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
Expand Down Expand Up @@ -65,18 +67,41 @@ private void doProcess(Long jobId) {
if (jobInstanceDTOOptional.isEmpty()) {
return;
}
WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = jobInstanceDTOOptional.get();
WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = wsFlinkKubernetesJobInstanceService.selectOne(jobInstanceDTOOptional.get().getId());
Optional<GenericKubernetesResource> optional = wsFlinkKubernetesJobInstanceService.getStatus(jobInstanceDTO.getId());
if (optional.isPresent()) {
String json = JacksonUtil.toJsonString(optional.get().get("status"));
FlinkDeploymentStatus status = JacksonUtil.parseJsonString(json, FlinkDeploymentStatus.class);
wsFlinkKubernetesJobInstanceService.updateStatus(jobInstanceDTO.getId(), status);
} else {
wsFlinkKubernetesJobInstanceService.clearStatus(jobInstanceDTO.getId());
DeploymentKind deploymentKind = jobInstanceDTO.getWsFlinkKubernetesJob().getDeploymentKind();
switch (deploymentKind) {
case FLINK_DEPLOYMENT:
doProcessFlinkDeployment(jobInstanceDTO, optional);
break;
case FLINK_SESSION_JOB:
doProcessFlinkSessionJob(jobInstanceDTO, optional);
break;
default:
}
} catch (Exception e) {
log.error("update flink kubernetes job status error! id: {}", jobId, e);
}
}

/**
 * Syncs a FlinkDeployment resource's status into the job instance record.
 * When the resource is absent from kubernetes, the persisted status is cleared.
 */
private void doProcessFlinkDeployment(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Optional<GenericKubernetesResource> optional) {
    Long instanceId = jobInstanceDTO.getId();
    if (optional.isEmpty()) {
        // resource no longer exists — drop the stale status
        wsFlinkKubernetesJobInstanceService.clearStatus(instanceId);
        return;
    }
    // round-trip through JSON to map the raw "status" field onto the typed status
    String statusJson = JacksonUtil.toJsonString(optional.get().get("status"));
    FlinkDeploymentStatus deploymentStatus = JacksonUtil.parseJsonString(statusJson, FlinkDeploymentStatus.class);
    wsFlinkKubernetesJobInstanceService.updateStatus(instanceId, deploymentStatus);
}

/**
 * Syncs a FlinkSessionJob resource's status into the job instance record.
 * When the resource is absent from kubernetes, the persisted status is cleared.
 */
private void doProcessFlinkSessionJob(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Optional<GenericKubernetesResource> optional) {
    Long instanceId = jobInstanceDTO.getId();
    if (optional.isEmpty()) {
        // resource no longer exists — drop the stale status
        wsFlinkKubernetesJobInstanceService.clearStatus(instanceId);
        return;
    }
    // round-trip through JSON to map the raw "status" field onto the typed status
    String statusJson = JacksonUtil.toJsonString(optional.get().get("status"));
    FlinkSessionJobStatus sessionJobStatus = JacksonUtil.parseJsonString(statusJson, FlinkSessionJobStatus.class);
    wsFlinkKubernetesJobInstanceService.updateStatus(instanceId, sessionJobStatus);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.application.flink.resource.definition.job.instance;

import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionJobSpec;
import cn.sliew.scaleph.application.flink.operator.spec.JobSpec;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.common.dict.flink.FlinkJobType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactFlinkJar;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Populates the job section of a {@link FlinkSessionJobSpec} from the artifact
 * configured on a Flink kubernetes job. Session jobs currently support only
 * JAR artifacts.
 */
@Component
public class FlinkSessionJobArtifactHandler implements ArtifactHandler {

    @Override
    public boolean support(DeploymentKind deploymentKind) {
        return deploymentKind == DeploymentKind.FLINK_SESSION_JOB;
    }

    @Override
    public boolean support(FlinkJobType flinkJobType) {
        switch (flinkJobType) {
            case JAR:
                return true;
            default:
                return false;
        }
    }

    /**
     * Dispatches on the job's artifact type; unsupported types are ignored
     * (callers are expected to have filtered via {@link #support(FlinkJobType)}).
     */
    public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Object spec) {
        FlinkSessionJobSpec flinkSessionJobSpec = (FlinkSessionJobSpec) spec;
        switch (jobInstanceDTO.getWsFlinkKubernetesJob().getType()) {
            case JAR:
                addJarArtifact(jobInstanceDTO, flinkSessionJobSpec);
                break;
            default:
        }
    }

    // Builds the JobSpec (jar URI, entry class, program args) from the JAR artifact.
    private void addJarArtifact(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkSessionJobSpec spec) {
        WsArtifactFlinkJar artifactFlinkJar = jobInstanceDTO.getWsFlinkKubernetesJob().getArtifactFlinkJar();
        JobSpec jobSpec = new JobSpec();
        jobSpec.setJarURI(artifactFlinkJar.getPath());
        jobSpec.setEntryClass(artifactFlinkJar.getEntryClass());
        // FIX: Spring's StringUtils.split(str, delim) splits only at the FIRST
        // delimiter occurrence (yielding a 2-element array, or null when the
        // delimiter is absent), which mangles parameter strings with more than
        // two tokens. tokenizeToStringArray splits on every space instead.
        String jarParams = artifactFlinkJar.getJarParams();
        if (StringUtils.hasText(jarParams)) {
            jobSpec.setArgs(StringUtils.tokenizeToStringArray(jarParams, " "));
        }
        spec.setJob(jobSpec);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cn.sliew.scaleph.application.flink.resource.definition.sessioncluster.FlinkSessionCluster;
import cn.sliew.scaleph.application.flink.operator.spec.AbstractFlinkSpec;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionJobSpec;
import cn.sliew.scaleph.application.flink.resource.handler.FileSystemParamHandler;
import cn.sliew.scaleph.application.flink.resource.handler.FlinkRuntimeModeHandler;
import cn.sliew.scaleph.application.flink.resource.handler.FlinkStateStorageHandler;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
Expand All @@ -38,13 +39,16 @@ public class FlinkSessionJobSpecHandler {
private ArtifactConverterFactory artifactConverterFactory;
@Autowired
private FlinkStateStorageHandler flinkStateStorageHandler;
@Autowired
private FileSystemParamHandler fileSystemParamHandler;

/**
 * Assembles the complete spec for a session job: binds it to the target session
 * cluster, sets the runtime mode, attaches the artifact, and wires state storage
 * and file system options into the flink configuration.
 */
public FlinkSessionJobSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkSessionCluster flinkSessionCluster, FlinkSessionJobSpec flinkSessionJobSpec) throws Exception {
    // reuse the caller-provided spec when present, otherwise start from a blank one
    FlinkSessionJobSpec spec = (flinkSessionJobSpec != null) ? flinkSessionJobSpec : new FlinkSessionJobSpec();
    addDeploymentName(spec, flinkSessionCluster);
    setRuntimeMode(jobInstanceDTO, spec);
    addArtifact(jobInstanceDTO, spec);
    enableFlinkStateStore(jobInstanceDTO, spec);
    setFileSystemParam(spec);
    return spec;
}

Expand All @@ -64,4 +68,8 @@ private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDT
flinkStateStorageHandler.handle(jobInstanceDTO.getInstanceId(), spec);
}

// Injects the configured file system options (e.g. S3 credentials/endpoint)
// into the spec's flink configuration via the shared handler.
private void setFileSystemParam(FlinkSessionJobSpec spec) {
    fileSystemParamHandler.handle(spec);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.application.flink.resource.handler;

import cn.sliew.scaleph.application.flink.operator.spec.AbstractFlinkSpec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Merges file system configuration options (delegated to
 * {@link FileSystemPluginHandler}) into a Flink spec's configuration map.
 */
@Component
public class FileSystemParamHandler {

    @Autowired
    private FileSystemPluginHandler fileSystemPluginHandler;

    /**
     * Adds file system config options to the spec, creating the configuration
     * map when the spec has none yet.
     */
    public void handle(AbstractFlinkSpec spec) {
        // orElseGet: only allocate a fresh map when the spec has no configuration
        Map<String, String> configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElseGet(HashMap::new);
        fileSystemPluginHandler.addFileSystemConfigOption(configuration);
        spec.setFlinkConfiguration(configuration);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class FileSystemPluginHandler {
private static final String S3_ACCESS_KEY = "s3.access-key";
private static final String S3_SECRET_KEY = "s3.secret-key";
private static final String S3_PATH_STYLE_ACCESS = "s3.path.style.access";
private static final String FS_ALLOWED_FALLBACK_FILESYSTEM = "fs.allowed-fallback-filesystems";

private static final String FILE_SYSTEM_ENV_NAME = "ENABLE_BUILT_IN_PLUGINS";
private static final String S3_FILE_SYSTEM_TEMPLATE = "flink-s3-fs-hadoop-%s.jar";
Expand Down Expand Up @@ -69,12 +70,13 @@ private void handlePodTemplate(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builde
spec.endSpec();
}

private void addFileSystemConfigOption(Map<String, String> flinkConfiguration) {
// Adds S3 connection options to the flink configuration. No-op when no S3
// file system is configured for this deployment.
void addFileSystemConfigOption(Map<String, String> flinkConfiguration) {
    if (s3FileSystemProperties == null) {
        return; // no S3 file system configured — nothing to add
    }
    flinkConfiguration.put(S3_ENDPOINT, NetUtils.replaceLocalhost(s3FileSystemProperties.getEndpoint()));
    flinkConfiguration.put(S3_ACCESS_KEY, s3FileSystemProperties.getAccessKey());
    flinkConfiguration.put(S3_SECRET_KEY, s3FileSystemProperties.getSecretKey());
    flinkConfiguration.put(S3_PATH_STYLE_ACCESS, "true"); // container
    flinkConfiguration.put(FS_ALLOWED_FALLBACK_FILESYSTEM, "s3"); // container
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public interface FlinkKubernetesOperatorService {

Optional<GenericKubernetesResource> getJob(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception;

Optional<GenericKubernetesResource> getFlinkDeployment(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception;

Optional<GenericKubernetesResource> getFlinkSessionJob(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception;

void shutdownJob(Long clusterCredentialId, String job) throws Exception;

void applyJob(Long clusterCredentialId, String job) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.application.flink.service;

import cn.sliew.scaleph.application.flink.operator.status.FlinkSessionJobStatus;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO;
import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesJobInstanceDeployParam;
Expand Down Expand Up @@ -64,5 +65,7 @@ public interface WsFlinkKubernetesJobInstanceService {

int updateStatus(Long id, FlinkDeploymentStatus status);

int updateStatus(Long id, FlinkSessionJobStatus status);

int clearStatus(Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.sliew.scaleph.application.flink.resource.definition.deployment.FlinkDeployment;
import cn.sliew.scaleph.application.flink.resource.definition.sessioncluster.FlinkSessionCluster;
import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesDeploymentDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
Expand Down Expand Up @@ -77,27 +78,37 @@ public void shutdownSessionCluster(Long clusterCredentialId, FlinkSessionCluster

@Override
public Optional<GenericKubernetesResource> getJob(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception {
VersionGroupKind versionAndGroup = new VersionGroupKind();
versionAndGroup.setApiVersion(Constant.API_VERSION);
versionAndGroup.setKind(Constant.FLINK_DEPLOYMENT);
versionAndGroup.setName(jobInstanceDTO.getInstanceId());

final WsFlinkKubernetesJobDTO jobDto = jobInstanceDTO.getWsFlinkKubernetesJob();
Long clusterCredentialId = null;
String namespace = null;
WsFlinkKubernetesJobDTO jobDto = jobInstanceDTO.getWsFlinkKubernetesJob();
switch (jobDto.getDeploymentKind()) {
case FLINK_DEPLOYMENT:
clusterCredentialId = jobDto.getFlinkDeployment().getClusterCredentialId();
namespace = jobDto.getFlinkDeployment().getNamespace();
break;
return getFlinkDeployment(jobInstanceDTO);
case FLINK_SESSION_JOB:
clusterCredentialId = jobDto.getFlinkSessionCluster().getClusterCredentialId();
namespace = jobDto.getFlinkSessionCluster().getNamespace();
break;
return getFlinkSessionJob(jobInstanceDTO);
default:
return Optional.empty();
}
versionAndGroup.setNamespace(namespace);
return objectService.getResource(clusterCredentialId, versionAndGroup);
}

/**
 * Looks up the FlinkDeployment resource backing this job instance, addressed by
 * the deployment's namespace/cluster credential and the instance id as name.
 */
@Override
public Optional<GenericKubernetesResource> getFlinkDeployment(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception {
    WsFlinkKubernetesDeploymentDTO deployment = jobInstanceDTO.getWsFlinkKubernetesJob().getFlinkDeployment();
    VersionGroupKind selector = new VersionGroupKind();
    selector.setApiVersion(Constant.API_VERSION);
    selector.setKind(Constant.FLINK_DEPLOYMENT);
    selector.setNamespace(deployment.getNamespace());
    selector.setName(jobInstanceDTO.getInstanceId());
    return objectService.getResource(deployment.getClusterCredentialId(), selector);
}

/**
 * Looks up the FlinkSessionJob resource backing this job instance, addressed by
 * the session cluster's namespace/cluster credential and the instance id as name.
 */
@Override
public Optional<GenericKubernetesResource> getFlinkSessionJob(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO) throws Exception {
    WsFlinkKubernetesSessionClusterDTO sessionCluster = jobInstanceDTO.getWsFlinkKubernetesJob().getFlinkSessionCluster();
    VersionGroupKind selector = new VersionGroupKind();
    selector.setApiVersion(Constant.API_VERSION);
    selector.setKind(Constant.FLINK_SESSION_JOB);
    selector.setNamespace(sessionCluster.getNamespace());
    selector.setName(jobInstanceDTO.getInstanceId());
    return objectService.getResource(sessionCluster.getClusterCredentialId(), selector);
}

@Override
Expand Down
Loading

0 comments on commit cd0a54d

Please sign in to comment.