Skip to content

Commit

Permalink
[Feature][scaleph-ui-react] add flink-cdc web (#688)
Browse files Browse the repository at this point in the history
* feature: update dataservice config web

* feature: update sql script

* feature: add flink-cdc web

---------

Co-authored-by: wangqi <wangqi@xinxuan.net>
  • Loading branch information
kalencaya and wangqi authored Feb 11, 2024
1 parent a48db8d commit 59229ff
Show file tree
Hide file tree
Showing 43 changed files with 1,780 additions and 10 deletions.
17 changes: 17 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
<kubernetes.client.version>6.2.0</kubernetes.client.version>
<spring-cloud-openfeign.version>3.1.7</spring-cloud-openfeign.version>
<zjsonpatch.version>0.4.14</zjsonpatch.version>
<kogito.version>2.44.0.Alpha</kogito.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -299,6 +300,11 @@
<artifactId>scaleph-engine-seatunnel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scaleph-engine-flink-cdc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scaleph-engine-flink-kubernetes</artifactId>
Expand Down Expand Up @@ -712,6 +718,17 @@
<artifactId>zjsonpatch</artifactId>
<version>${zjsonpatch.version}</version>
</dependency>

<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-spring-boot-starter</artifactId>
<version>${kogito.version}</version>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-serverless-workflow-runtime</artifactId>
<version>${kogito.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.api.controller.ws;

import cn.sliew.scaleph.api.annotation.Logging;
import cn.sliew.scaleph.common.exception.ScalephException;
import cn.sliew.scaleph.dag.xflow.dnd.DndDTO;
import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCDagService;
import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCJobService;
import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO;
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCAddParam;
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCListParam;
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCSelectListParam;
import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCUpdateParam;
import cn.sliew.scaleph.plugin.framework.exception.PluginException;
import cn.sliew.scaleph.system.model.ResponseVO;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;
import java.util.List;

@Tag(name = "Flink CDC")
@RestController
@RequestMapping(path = "/api/flink-cdc")
public class WsFlinkCDCController {

@Autowired
private FlinkCDCDagService flinkCDCDagService;
@Autowired
private FlinkCDCJobService flinkCDCJobService;

@Logging
@GetMapping("/dag/dnd")
@Operation(summary = "查询DAG节点元信息", description = "后端统一返回节点信息")
public ResponseEntity<ResponseVO<List<DndDTO>>> loadNodeMeta() throws PluginException {
List<DndDTO> dnds = flinkCDCDagService.getDnds();
return new ResponseEntity<>(ResponseVO.success(dnds), HttpStatus.OK);
}

@Logging
@GetMapping
@Operation(summary = "查询 fink cdc 列表", description = "分页查询 fink cdc 列表")
public ResponseEntity<Page<WsFlinkArtifactCDCDTO>> listJob(@Valid WsFlinkArtifactCDCListParam param) {
Page<WsFlinkArtifactCDCDTO> page = flinkCDCJobService.listByPage(param);
return new ResponseEntity<>(page, HttpStatus.OK);
}

@Logging
@GetMapping("/all")
@Operation(summary = "查询 fink cdc 列表", description = "查询 fink cdc 列表")
public ResponseEntity<List<WsFlinkArtifactCDCDTO>> listAll(@Valid WsFlinkArtifactCDCSelectListParam param) {
List<WsFlinkArtifactCDCDTO> result = flinkCDCJobService.listAll(param);
return new ResponseEntity<>(result, HttpStatus.OK);
}

@Logging
@PutMapping
@Operation(summary = "新增 fink cdc", description = "新增 fink cdc,不涉及 DAG")
public ResponseEntity<ResponseVO<WsFlinkArtifactCDCDTO>> simpleAddJob(@Validated @RequestBody WsFlinkArtifactCDCAddParam param) {
WsFlinkArtifactCDCDTO wsFlinkArtifactCDCDTO = flinkCDCJobService.insert(param);
return new ResponseEntity<>(ResponseVO.success(wsFlinkArtifactCDCDTO), HttpStatus.CREATED);
}

@Logging
@PostMapping
@Operation(summary = "修改 fink cdc", description = "只修改 fink cdc 属性,不涉及 DAG")
public ResponseEntity<ResponseVO> simpleEditJob(@Validated @RequestBody WsFlinkArtifactCDCUpdateParam param) {
flinkCDCJobService.update(param);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}

@Logging
@DeleteMapping("{id}")
@Operation(summary = "删除 fink cdc", description = "删除 fink cdc")
public ResponseEntity<ResponseVO> deleteJob(@PathVariable("id") Long id) throws ScalephException {
flinkCDCJobService.delete(id);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}

@Logging
@DeleteMapping("batch")
@Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc")
public ResponseEntity<ResponseVO> deleteBatch(@RequestBody List<Long> ids) throws ScalephException {
flinkCDCJobService.deleteBatch(ids);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}

@Logging
@DeleteMapping("all")
@Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc")
public ResponseEntity<ResponseVO> deleteAll(@RequestParam("flinkArtifactId") Long flinkArtifactId) throws ScalephException {
flinkCDCJobService.deleteAll(flinkArtifactId);
return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cn.sliew.scaleph.common.dict.dataservice.QueryType;
import cn.sliew.scaleph.common.dict.ds.RedisMode;
import cn.sliew.scaleph.common.dict.flink.*;
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion;
import cn.sliew.scaleph.common.dict.flink.kubernetes.*;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.common.dict.job.*;
Expand Down Expand Up @@ -110,6 +111,8 @@ public enum DictType implements DictDefinition {

IMAGE_PULL_POLICY("image_pull_policy", "Image Pull Policy", ImagePullPolicy.class),

FLINK_CDC_VERSION("flink_cdc_version", "Flink CDC Version", FlinkCDCVersion.class),

WORKFLOW_TYPE("workflow_type", "Workflow Type", WorkflowType.class),
WORKFLOW_EXECUTE_TYPE("workflow_execute_type", "Workflow Execute Type", WorkflowExecuteType.class),
WORKFLOW_INSTANCE_STATE("workflow_instance_state", "Workflow Instance State", WorkflowInstanceState.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public String getValue() {
public String getLabel() {
return label;
}

public static FlinkCDCVersion current() {
return values()[values().length - 1];
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.common.dict.common.YesOrNo;
import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
* flink artifact cdc
*/
@Data
@TableName("ws_flink_artifact_cdc")
@Schema(name = "WsFlinkArtifactCDC", description = "flink artifact cdc")
public class WsFlinkArtifactCDC extends BaseDO {

private static final long serialVersionUID = 1L;

@Schema(description = "作业artifact id")
@TableField("flink_artifact_id")
private Long flinkArtifactId;

@TableField(exist = false)
private WsFlinkArtifact wsFlinkArtifact;

@Schema(description = "flink版本")
@TableField("flink_version")
private FlinkVersion flinkVersion;

@Schema(description = "作业引擎")
@TableField("flink_cdc_version")
private FlinkCDCVersion flinkCDCVersion;

@TableField("dag_id")
private Long dagId;

@Schema(description = "current artifact")
@TableField("`current`")
private YesOrNo current;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.dao.mapper.master.ws;

import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactCDC;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* flink artifact cdc Mapper 接口
*/
@Repository
public interface WsFlinkArtifactCDCMapper extends BaseMapper<WsFlinkArtifactCDC> {

Page<WsFlinkArtifactCDC> list(Page<WsFlinkArtifactCDC> page,
@Param("projectId") Long projectId,
@Param("name") String name,
@Param("flinkVersion") FlinkVersion flinkVersion);

List<WsFlinkArtifactCDC> listAll(@Param("projectId") Long projectId,
@Param("name") String name);

WsFlinkArtifactCDC selectOne(@Param("id") Long id);

WsFlinkArtifactCDC selectCurrent(@Param("artifactId") Long artifactId);
}
Loading

0 comments on commit 59229ff

Please sign in to comment.