Skip to content

Commit

Permalink
[Feature][scaleph-workflow] update scaleph workflow (#699)
Browse files Browse the repository at this point in the history
* feature: flink kuberentes job detail web

* feature: flink kuberentes job detail web

* feature: add queue module

* feature: update workflow

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: upgrade workflow implementions

* feature: update workflow

* feature: add redission dependency

* feature: add pekko dependency

* feature: add pekko dependency

* feature: add pekko dependency

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Mar 25, 2024
1 parent 335a54a commit cff0451
Show file tree
Hide file tree
Showing 71 changed files with 1,761 additions and 201 deletions.
50 changes: 35 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,16 @@
<postgresql.version>42.3.3</postgresql.version>
<druid.version>0.22.1</druid.version>
<calcite-druid.version>1.29.0</calcite-druid.version>
<config.version>1.3.3</config.version>
<config.version>1.4.3</config.version>
<okhttp.version>4.10.0</okhttp.version>
<guava.version>32.1.3-jre</guava.version>
<minio.version>8.3.8</minio.version>
<milky.version>1.0.12</milky.version>
<milky.version>1.0.13-SNAPSHOT</milky.version>
<sakura.version>1.0.2-SNAPSHOT</sakura.version>
<hadoop.version>3.3.4</hadoop.version>
<hive.version>3.1.3</hive.version>
<akka.version>2.6.21</akka.version>
<pekko.version>1.0.3-M1</pekko.version>
<protobuf.version>3.21.5</protobuf.version>
<netty.version>4.1.82.Final</netty.version>
<flink.version>1.18.1</flink.version>
Expand All @@ -171,6 +172,8 @@
<spring-cloud-openfeign.version>3.1.7</spring-cloud-openfeign.version>
<zjsonpatch.version>0.4.14</zjsonpatch.version>
<kogito.version>2.44.0.Alpha</kogito.version>
<redisson.version>3.27.2</redisson.version>
<cola.version>4.3.2</cola.version>
</properties>

<dependencyManagement>
Expand All @@ -196,13 +199,6 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<!--<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-bom_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>-->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-bom</artifactId>
Expand Down Expand Up @@ -623,16 +619,27 @@
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster-typed_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-serialization-jackson_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>


<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
Expand Down Expand Up @@ -731,6 +738,18 @@
<artifactId>kogito-serverless-workflow-runtime</artifactId>
<version>${kogito.version}</version>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba.cola</groupId>
<artifactId>cola-component-statemachine</artifactId>
<version>${cola.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand All @@ -744,6 +763,7 @@
<include>**/*.yml</include>
<include>**/*.xml</include>
<include>**/*.properties</include>
<include>**/*.conf</include>
<include>**/*.html</include>
<include>**/*.js</include>
<include>**/*.css</include>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.api.config;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.SpawnProtocol;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.cluster.ClusterEvent;
import org.apache.pekko.cluster.typed.Cluster;
import org.apache.pekko.cluster.typed.Join;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
public class PekkoConfig {

@Value("${spring.application.name}")
private String application;

@Bean(destroyMethod = "terminate")
public ActorSystem<SpawnProtocol.Command> actorSystem() {
ActorSystem<SpawnProtocol.Command> actorSystem = ActorSystem.create(Behaviors.setup(ctx -> SpawnProtocol.create()), application);
actorSystem.whenTerminated().onComplete(done -> {
if (done.isSuccess()) {
actorSystem.log().info("pekko ActorSystem terminate success!");
} else {
actorSystem.log().error("pekko ActorSystem terminate failure!", done.failed().get());
}
return done.get();
}, actorSystem.executionContext());
return actorSystem;
}

@Bean
public Cluster actorCluster(ActorSystem actorSystem) {
return Cluster.get(actorSystem);
}

@Component
public static class PekkoClusterBootstrap implements ApplicationRunner {

@Autowired
private Cluster cluster;

@Override
public void run(ApplicationArguments args) throws Exception {
cluster.manager().tell(Join.create(cluster.selfMember().address()));
ClusterEvent.CurrentClusterState state = cluster.state();
// cluster.subscriptions().tell(Subscribe.create(null, ClusterEvent.MemberEvent.class));
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import cn.sliew.scaleph.engine.flink.kubernetes.service.param.*;
import cn.sliew.scaleph.system.model.ResponseVO;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -71,7 +73,7 @@ public ResponseEntity<ResponseVO<WsFlinkKubernetesJobDTO>> selectOne(@PathVariab
@Logging
@GetMapping("/asYaml/{id}")
@Operation(summary = "查询 YAML 格式 Job", description = "查询 YAML 格式 Job")
public ResponseEntity<ResponseVO<String>> asYaml(@PathVariable("id") Long id) throws Exception {
public ResponseEntity<ResponseVO<String>> asYaml(@PathVariable("id") Long id) {
String yaml = wsFlinkKubernetesJobInstanceService.mockYaml(id);
return new ResponseEntity(ResponseVO.success(yaml), HttpStatus.OK);
}
Expand Down Expand Up @@ -111,7 +113,7 @@ public ResponseEntity<ResponseVO> deleteBatch(@RequestBody List<Long> ids) {
@Logging
@GetMapping("{jobInstanceId}/flinkui")
@Operation(summary = "获取 flink-ui 链接", description = "获取 flink-ui 链接")
public ResponseEntity<ResponseVO<URI>> getFlinkUI(@PathVariable("jobInstanceId") Long jobInstanceId) throws Exception {
public ResponseEntity<ResponseVO<URI>> getFlinkUI(@PathVariable("jobInstanceId") Long jobInstanceId) {
URI endpoint = flinkJobManagerEndpointService.getJobManagerEndpoint(jobInstanceId);
return new ResponseEntity<>(ResponseVO.success(endpoint), HttpStatus.OK);
}
Expand Down Expand Up @@ -167,15 +169,15 @@ public ResponseEntity<ResponseVO> resume(@PathVariable("id") Long id) throws Exc
@Logging
@GetMapping("instances")
@Operation(summary = "获取任务实例列表", description = "获取任务实例列表")
public ResponseEntity<Page<WsFlinkKubernetesJobInstanceDTO>> listInstances(@Valid WsFlinkKubernetesJobInstanceListParam param) throws Exception {
public ResponseEntity<Page<WsFlinkKubernetesJobInstanceDTO>> listInstances(@Valid WsFlinkKubernetesJobInstanceListParam param) {
Page<WsFlinkKubernetesJobInstanceDTO> result = wsFlinkKubernetesJobInstanceService.list(param);
return new ResponseEntity<>(result, HttpStatus.OK);
}

@Logging
@GetMapping("instances/current")
@Operation(summary = "获取任务当前实例", description = "获取任务当前实例")
public ResponseEntity<ResponseVO<WsFlinkKubernetesJobInstanceDTO>> currentInstance(@RequestParam("wsFlinkKubernetesJobId") Long wsFlinkKubernetesJobId) throws Exception {
public ResponseEntity<ResponseVO<WsFlinkKubernetesJobInstanceDTO>> currentInstance(@RequestParam("wsFlinkKubernetesJobId") Long wsFlinkKubernetesJobId) {
Optional<WsFlinkKubernetesJobInstanceDTO> optional = wsFlinkKubernetesJobInstanceService.selectCurrent(wsFlinkKubernetesJobId);
if (optional.isPresent()) {
return new ResponseEntity<>(ResponseVO.success(optional.get()), HttpStatus.OK);
Expand All @@ -191,10 +193,21 @@ public ResponseEntity<ResponseVO<Object>> instanceAsYaml(@PathVariable("id") Lon
return new ResponseEntity(ResponseVO.success(dto), HttpStatus.OK);
}

@Logging
@GetMapping("/instances/status/asYaml/{id}")
@Operation(summary = "查询 YAML 格式 Job 状态", description = "查询 YAML 格式 Job 状态")
public ResponseEntity<ResponseVO<String>> instanceStatusAsYaml(@PathVariable("id") Long id) {
Optional<GenericKubernetesResource> optional = wsFlinkKubernetesJobInstanceService.getStatusWithoutManagedFields(id);
if (optional.isPresent()) {
return new ResponseEntity(ResponseVO.success(Serialization.asYaml(optional.get())), HttpStatus.OK);
}
return new ResponseEntity(ResponseVO.success(), HttpStatus.OK);
}

@Logging
@GetMapping("/instances/savepoint")
@Operation(summary = "查询 Job 实例 savepoint", description = "查询 Job 实例 savepoint")
public ResponseEntity<Page<WsFlinkKubernetesJobInstanceSavepointDTO>> getSavepoint(@Valid WsFlinkKubernetesJobInstanceSavepointListParam param) throws Exception {
public ResponseEntity<Page<WsFlinkKubernetesJobInstanceSavepointDTO>> getSavepoint(@Valid WsFlinkKubernetesJobInstanceSavepointListParam param) {
Page<WsFlinkKubernetesJobInstanceSavepointDTO> result = wsFlinkKubernetesJobInstanceService.selectSavepoint(param);
return new ResponseEntity(result, HttpStatus.OK);
}
Expand Down
25 changes: 25 additions & 0 deletions scaleph-api/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
pekko {
log-config-on-start = on
actor {
provider = "cluster"
default-dispatcher {
fork-join-executor {
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 128
}
}
}

cluster {
log-info = on
log-info-verbose = on
min-nr-of-members = 1
}

serialization.jackson {
deserialization-features {
FAIL_ON_UNKNOWN_PROPERTIES = off
}
}
}
4 changes: 2 additions & 2 deletions scaleph-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public enum Constants {
/**
* 验证码 key
*/
public static final String AUTH_CODE_KEY = "authCode-key_";
public static final String AUTH_CODE_KEY = "authCode-key:";
/**
* 在线用户TOKEN标识
*/
public static final String ONLINE_TOKEN_KEY = "online-token_";
public static final String ONLINE_TOKEN_KEY = "online-token:";
/**
* 在线用户标识
*/
public static final String ONLINE_USER_KEY = "online-user_";
public static final String ONLINE_USER_KEY = "online-user:";
/**
* 用户token key
*/
Expand All @@ -71,30 +71,8 @@ public enum Constants {
/**
* schedule job and group
*/
public static final String JOB_PREFIX = "job-";
public static final String JOB_GROUP_PREFIX = "jobGrp-";
public static final String TRIGGER_PREFIX = "trigger-";
public static final String TRIGGER_GROUP_PREFIX = "triggerGrp-";
public static final String INTERNAL_GROUP = "sysInternal";
public static final String JOB_LOG_KEY = "traceLog";
public static final String ETL_JOB_PREFIX = "job-";
public static final String CRON_EVERY_THREE_SECONDS = "/3 * * * * ? ";
public static final String JOB_PARAM_JOB_INFO = "JOB_INFO";
public static final String JOB_PARAM_PROJECT_INFO = "PROJECT_INFO";
/**
* 作业流程步骤属性相关
*/
public static final String CLUSTER_DEPLOY_TARGET = "deploy_target";
public static final String JOB_ID = "jobId";
public static final String JOB_NAME = "job.name";
public static final String JOB_STEP_CODE = "stepCode";
public static final String JOB_STEP_TITLE = "stepTitle";
public static final String JOB_STEP_ATTRS = "stepAttrs";
public static final String JOB_GRAPH = "jobGraph";
public static final String JOB_STEP_ATTR_DATASOURCE = "dataSource";
public static final String JOB_STEP_ATTR_DRIVER = "driver";
public static final String JOB_STEP_ATTR_URL = "url";
public static final String JOB_STEP_ATTR_USERNAME = "username";
public static final String JOB_STEP_ATTR_PASSWORD = "password";

public static final String JOB_PREFIX = "job";
public static final String JOB_GROUP_PREFIX = "jobGrp";
public static final String TRIGGER_PREFIX = "trigger";
public static final String TRIGGER_GROUP_PREFIX = "triggerGrp";
}
Loading

0 comments on commit cff0451

Please sign in to comment.