diff --git a/pom.xml b/pom.xml index 778067f94..4b11b086e 100644 --- a/pom.xml +++ b/pom.xml @@ -169,6 +169,7 @@ 6.2.0 3.1.7 0.4.14 + 2.44.0.Alpha @@ -299,6 +300,11 @@ scaleph-engine-seatunnel ${project.version} + + ${project.groupId} + scaleph-engine-flink-cdc + ${project.version} + ${project.groupId} scaleph-engine-flink-kubernetes @@ -712,6 +718,17 @@ zjsonpatch ${zjsonpatch.version} + + + org.kie.kogito + kogito-spring-boot-starter + ${kogito.version} + + + org.kie.kogito + kogito-serverless-workflow-runtime + ${kogito.version} + diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkCDCController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkCDCController.java new file mode 100644 index 000000000..bb8b8baca --- /dev/null +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkCDCController.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.api.controller.ws; + +import cn.sliew.scaleph.api.annotation.Logging; +import cn.sliew.scaleph.common.exception.ScalephException; +import cn.sliew.scaleph.dag.xflow.dnd.DndDTO; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCDagService; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCJobService; +import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCAddParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCSelectListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCUpdateParam; +import cn.sliew.scaleph.plugin.framework.exception.PluginException; +import cn.sliew.scaleph.system.model.ResponseVO; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import javax.validation.Valid; +import java.util.List; + +@Tag(name = "Flink CDC") +@RestController +@RequestMapping(path = "/api/flink-cdc") +public class WsFlinkCDCController { + + @Autowired + private FlinkCDCDagService flinkCDCDagService; + @Autowired + private FlinkCDCJobService flinkCDCJobService; + + @Logging + @GetMapping("/dag/dnd") + @Operation(summary = "查询DAG节点元信息", description = "后端统一返回节点信息") + public ResponseEntity>> loadNodeMeta() throws PluginException { + List dnds = flinkCDCDagService.getDnds(); + return new ResponseEntity<>(ResponseVO.success(dnds), HttpStatus.OK); + } + + @Logging + @GetMapping + @Operation(summary = "查询 fink cdc 列表", description = "分页查询 fink cdc 列表") + public ResponseEntity> listJob(@Valid WsFlinkArtifactCDCListParam param) { + Page page = flinkCDCJobService.listByPage(param); + return new ResponseEntity<>(page, HttpStatus.OK); + } + + @Logging + @GetMapping("/all") + @Operation(summary = "查询 fink cdc 列表", description = "查询 fink cdc 列表") + public ResponseEntity> listAll(@Valid WsFlinkArtifactCDCSelectListParam param) { + List result = flinkCDCJobService.listAll(param); + return new ResponseEntity<>(result, HttpStatus.OK); + } + + @Logging + @PutMapping + @Operation(summary = "新增 fink cdc", description = "新增 fink cdc,不涉及 DAG") + public ResponseEntity> simpleAddJob(@Validated @RequestBody WsFlinkArtifactCDCAddParam param) { + WsFlinkArtifactCDCDTO wsFlinkArtifactCDCDTO = flinkCDCJobService.insert(param); + return new ResponseEntity<>(ResponseVO.success(wsFlinkArtifactCDCDTO), HttpStatus.CREATED); + } + + @Logging + @PostMapping + @Operation(summary = "修改 fink cdc", description = "只修改 fink cdc 属性,不涉及 DAG") + public ResponseEntity simpleEditJob(@Validated @RequestBody WsFlinkArtifactCDCUpdateParam param) { + flinkCDCJobService.update(param); + return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); + } + + @Logging + @DeleteMapping("{id}") + @Operation(summary = "删除 fink cdc", description = "删除 fink cdc") + public ResponseEntity deleteJob(@PathVariable("id") Long id) throws ScalephException { + flinkCDCJobService.delete(id); + return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); + } + + @Logging + @DeleteMapping("batch") + @Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc") + public ResponseEntity deleteBatch(@RequestBody List ids) throws ScalephException { + flinkCDCJobService.deleteBatch(ids); + return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); + } + + @Logging + @DeleteMapping("all") + @Operation(summary = "批量删除 fink cdc", description = "批量删除 fink cdc") + public ResponseEntity deleteAll(@RequestParam("flinkArtifactId") Long flinkArtifactId) throws ScalephException { + flinkCDCJobService.deleteAll(flinkArtifactId); + return new ResponseEntity<>(ResponseVO.success(), HttpStatus.OK); + } +} \ No newline at end of file diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java index 89d48a574..cf84b57e1 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java @@ -27,6 +27,7 @@ import cn.sliew.scaleph.common.dict.dataservice.QueryType; import cn.sliew.scaleph.common.dict.ds.RedisMode; import cn.sliew.scaleph.common.dict.flink.*; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion; import cn.sliew.scaleph.common.dict.flink.kubernetes.*; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.common.dict.job.*; @@ -110,6 +111,8 @@ public enum DictType implements DictDefinition { IMAGE_PULL_POLICY("image_pull_policy", "Image Pull Policy", ImagePullPolicy.class), + FLINK_CDC_VERSION("flink_cdc_version", "Flink CDC Version", FlinkCDCVersion.class), + WORKFLOW_TYPE("workflow_type", "Workflow Type", WorkflowType.class), WORKFLOW_EXECUTE_TYPE("workflow_execute_type", "Workflow Execute Type", WorkflowExecuteType.class), WORKFLOW_INSTANCE_STATE("workflow_instance_state", "Workflow Instance State", WorkflowInstanceState.class), diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/cdc/FlinkCDCVersion.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/cdc/FlinkCDCVersion.java index 0415093cb..cd4be89d6 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/cdc/FlinkCDCVersion.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/cdc/FlinkCDCVersion.java @@ -56,4 +56,9 @@ public String getValue() { public String getLabel() { return label; } + + public static FlinkCDCVersion current() { + return values()[values().length - 1]; + } + } diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkArtifactCDC.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkArtifactCDC.java new file mode 100644 index 000000000..38bf81614 --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkArtifactCDC.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.entity.master.ws; + +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion; +import cn.sliew.scaleph.dao.entity.BaseDO; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * flink artifact cdc + */ +@Data +@TableName("ws_flink_artifact_cdc") +@Schema(name = "WsFlinkArtifactCDC", description = "flink artifact cdc") +public class WsFlinkArtifactCDC extends BaseDO { + + private static final long serialVersionUID = 1L; + + @Schema(description = "作业artifact id") + @TableField("flink_artifact_id") + private Long flinkArtifactId; + + @TableField(exist = false) + private WsFlinkArtifact wsFlinkArtifact; + + @Schema(description = "flink版本") + @TableField("flink_version") + private FlinkVersion flinkVersion; + + @Schema(description = "作业引擎") + @TableField("flink_cdc_version") + private FlinkCDCVersion flinkCDCVersion; + + @TableField("dag_id") + private Long dagId; + + @Schema(description = "current artifact") + @TableField("`current`") + private YesOrNo current; +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.java new file mode 100644 index 000000000..c15fd2ccf --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.mapper.master.ws; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactCDC; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; + +import java.util.List; + +/** + * flink artifact cdc Mapper 接口 + */ +@Repository +public interface WsFlinkArtifactCDCMapper extends BaseMapper { + + Page list(Page page, + @Param("projectId") Long projectId, + @Param("name") String name, + @Param("flinkVersion") FlinkVersion flinkVersion); + + List listAll(@Param("projectId") Long projectId, + @Param("name") String name); + + WsFlinkArtifactCDC selectOne(@Param("id") Long id); + + WsFlinkArtifactCDC selectCurrent(@Param("artifactId") Long artifactId); +} diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.xml new file mode 100644 index 000000000..bc4a74e68 --- /dev/null +++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkArtifactCDCMapper.xml @@ -0,0 +1,105 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + id, + creator, + create_time, + editor, + update_time, + flink_artifact_id, flink_version, flink_cdc_version, dag_id, `current` + + + + + + + + + + + diff --git a/scaleph-engine/pom.xml b/scaleph-engine/pom.xml index 9e6915302..f2b1e0652 100644 --- a/scaleph-engine/pom.xml +++ b/scaleph-engine/pom.xml @@ -39,6 +39,7 @@ scaleph-sql-template scaleph-engine-flink-kubernetes scaleph-engine-sql-gateway + scaleph-engine-flink-cdc \ No newline at end of file diff --git a/scaleph-engine/scaleph-engine-flink-cdc/pom.xml b/scaleph-engine/scaleph-engine-flink-cdc/pom.xml new file mode 100644 index 000000000..962708066 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/pom.xml @@ -0,0 +1,49 @@ + + + + + 4.0.0 + + cn.sliew + scaleph-engine + 2.0.3-SNAPSHOT + ../pom.xml + + scaleph-engine-flink-cdc + + + + ${project.parent.groupId} + scaleph-privilege + + + ${project.parent.groupId} + scaleph-dag + + + ${project.parent.groupId} + scaleph-project + + + ${project.parent.groupId} + scaleph-plugin-flinkcdc + + + \ No newline at end of file diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndDTO.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndDTO.java new file mode 100644 index 000000000..251e62914 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndDTO.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.dag.dnd; + +import cn.sliew.scaleph.dag.xflow.dnd.DndDTO; +import lombok.Data; + +@Data +public class FlinkCDCDagDndDTO extends DndDTO { + +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndMeta.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndMeta.java new file mode 100644 index 000000000..f5567ffb8 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/dag/dnd/FlinkCDCDagDndMeta.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.dag.dnd; + +import cn.sliew.scaleph.dag.xflow.dnd.DndMeta; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +@Schema(name = "Flink CDC 节点元数据", description = "Flink CDC 节点元数据") +public class FlinkCDCDagDndMeta extends DndMeta { + + /** + * @see cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginName + */ + @Schema(description = "connector name") + private String name; + + /** + * @see cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginType + */ + @Schema(description = "connector type") + private String type; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCConnectorService.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCConnectorService.java new file mode 100644 index 000000000..214b4d066 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCConnectorService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service; + +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginName; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginType; +import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; +import cn.sliew.scaleph.plugin.framework.core.PluginInfo; +import cn.sliew.scaleph.plugin.framework.exception.PluginException; + +import java.util.Properties; +import java.util.Set; + +public interface FlinkCDCConnectorService { + + Set getAvailableConnectors(FlinkCDCPluginType type); + + FlinkCDCPipilineConnectorPlugin getConnector(PluginInfo pluginInfo) throws PluginException; + + FlinkCDCPipilineConnectorPlugin getConnector(FlinkCDCPluginType type, FlinkCDCPluginName name) throws PluginException; + + FlinkCDCPipilineConnectorPlugin newConnector(String name, Properties properties); + +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCDagService.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCDagService.java new file mode 100644 index 000000000..355593a5c --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCDagService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service; + +import cn.sliew.scaleph.dag.service.DagDndService; +import cn.sliew.scaleph.dag.xflow.dnd.DndDTO; + +import java.util.List; + +public interface FlinkCDCDagService extends DagDndService { + + Long initialize(); + + void destroy(Long dagId); + + Object getDag(Long dagId); + + void update(Object param); + + List getDnds(); +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCJobService.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCJobService.java new file mode 100644 index 000000000..61d75bf97 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/FlinkCDCJobService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service; + +import cn.sliew.scaleph.common.exception.ScalephException; +import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCAddParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCSelectListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCUpdateParam; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +import java.util.List; + +public interface FlinkCDCJobService { + + Page listByPage(WsFlinkArtifactCDCListParam param); + + List listAll(WsFlinkArtifactCDCSelectListParam param); + + List listAllByArtifact(Long artifactId); + + WsFlinkArtifactCDCDTO selectOne(Long id); + + WsFlinkArtifactCDCDTO selectCurrent(Long artifactId); + + String preview(Long id) throws Exception; + + WsFlinkArtifactCDCDTO insert(WsFlinkArtifactCDCAddParam param); + + int update(WsFlinkArtifactCDCUpdateParam param); + + int delete(Long id) throws ScalephException; + + int deleteBatch(List ids) throws ScalephException; + + int deleteAll(Long flinkArtifactId) throws ScalephException; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/convert/WsFlinkArtifactCDCConvert.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/convert/WsFlinkArtifactCDCConvert.java new file mode 100644 index 000000000..6846fa2ac --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/convert/WsFlinkArtifactCDCConvert.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.convert; + +import cn.sliew.scaleph.common.convert.BaseConvert; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactCDC; +import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO; +import cn.sliew.scaleph.project.service.convert.WsFlinkArtifactConvert; +import org.mapstruct.Mapper; +import org.mapstruct.ReportingPolicy; +import org.mapstruct.factory.Mappers; +import org.springframework.beans.BeanUtils; + +@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE) +public interface WsFlinkArtifactCDCConvert extends BaseConvert { + WsFlinkArtifactCDCConvert INSTANCE = Mappers.getMapper(WsFlinkArtifactCDCConvert.class); + + @Override + default WsFlinkArtifactCDC toDo(WsFlinkArtifactCDCDTO dto) { + WsFlinkArtifactCDC entity = new WsFlinkArtifactCDC(); + BeanUtils.copyProperties(dto, entity); + entity.setWsFlinkArtifact(WsFlinkArtifactConvert.INSTANCE.toDo(dto.getWsFlinkArtifact())); + entity.setFlinkArtifactId(dto.getWsFlinkArtifact().getId()); + return entity; + } + + @Override + default WsFlinkArtifactCDCDTO toDto(WsFlinkArtifactCDC entity) { + WsFlinkArtifactCDCDTO dto = new WsFlinkArtifactCDCDTO(); + BeanUtils.copyProperties(entity, dto); + dto.setWsFlinkArtifact(WsFlinkArtifactConvert.INSTANCE.toDto(entity.getWsFlinkArtifact())); + return dto; + } +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/dto/WsFlinkArtifactCDCDTO.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/dto/WsFlinkArtifactCDCDTO.java new file mode 100644 index 000000000..fa67d75fd --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/dto/WsFlinkArtifactCDCDTO.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.dto; + +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion; +import cn.sliew.scaleph.project.service.dto.WsFlinkArtifactDTO; +import cn.sliew.scaleph.system.model.BaseDTO; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +@Schema(name = "WsFlinkArtifactCDC对象", description = "flink artifact cdc") +public class WsFlinkArtifactCDCDTO extends BaseDTO { + + @Schema(description = "作业artifact") + private WsFlinkArtifactDTO wsFlinkArtifact; + + @Schema(description = "flink版本") + private FlinkVersion flinkVersion; + + @Schema(description = "flink cdc 版本") + private FlinkCDCVersion flinkCDCVersion; + + @Schema(description = "dag id") + private Long dagId; + + @Schema(description = "`current`") + private YesOrNo current; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCConnectorServiceImpl.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCConnectorServiceImpl.java new file mode 100644 index 000000000..39f19875f --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCConnectorServiceImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.impl; + +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginName; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginType; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCConnectorService; +import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipelineConnectorManager; +import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; +import cn.sliew.scaleph.plugin.framework.core.PluginInfo; +import cn.sliew.scaleph.plugin.framework.exception.PluginException; +import org.springframework.stereotype.Service; + +import java.util.Properties; +import java.util.Set; + +@Service +public class FlinkCDCConnectorServiceImpl implements FlinkCDCConnectorService { + + private final FlinkCDCPipelineConnectorManager connectorManager = new FlinkCDCPipelineConnectorManager(); + + @Override + public Set getAvailableConnectors(FlinkCDCPluginType type) { + return connectorManager.getAvailableConnectors(type); + } + + @Override + public FlinkCDCPipilineConnectorPlugin getConnector(PluginInfo pluginInfo) throws PluginException { + return connectorManager.getConnector(pluginInfo); + } + + @Override + public FlinkCDCPipilineConnectorPlugin getConnector(FlinkCDCPluginType type, FlinkCDCPluginName name) throws PluginException { + return connectorManager.getConnector(type, name); + } + + @Override + public FlinkCDCPipilineConnectorPlugin newConnector(String name, Properties properties) { + return connectorManager.newConnector(name, properties); + } +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCDagServiceImpl.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCDagServiceImpl.java new file mode 100644 index 000000000..bc5c5793b --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCDagServiceImpl.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.impl; + +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginType; +import cn.sliew.scaleph.dag.service.DagService; +import cn.sliew.scaleph.dag.service.param.DagSimpleAddParam; +import cn.sliew.scaleph.dag.xflow.dnd.DndDTO; +import cn.sliew.scaleph.dag.xflow.dnd.DndPortDTO; +import cn.sliew.scaleph.dag.xflow.dnd.DndPortGroupEnum; +import cn.sliew.scaleph.engine.flink.cdc.dag.dnd.FlinkCDCDagDndDTO; +import cn.sliew.scaleph.engine.flink.cdc.dag.dnd.FlinkCDCDagDndMeta; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCConnectorService; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCDagService; +import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin; +import cn.sliew.scaleph.plugin.framework.core.PluginInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +@Service +public class FlinkCDCDagServiceImpl implements FlinkCDCDagService { + + @Autowired + private DagService dagService; + @Autowired + private FlinkCDCConnectorService flinkCDCConnectorService; + + @Override + public Long initialize() { + return dagService.insert(new DagSimpleAddParam()); + } + + @Override + public void destroy(Long dagId) { + dagService.delete(dagId); + } + + @Override + public Object getDag(Long dagId) { + return null; + } + + @Override + public void update(Object param) { + + } + + @Override + public List getDnds() { + List dnds = new ArrayList(); + for (FlinkCDCPluginType pluginType : FlinkCDCPluginType.values()) { + dnds.add(buildFlinkCDCPluginType(pluginType)); + } + return dnds; + } + + private FlinkCDCDagDndDTO buildFlinkCDCPluginType(FlinkCDCPluginType pluginType) { + FlinkCDCDagDndDTO category = new FlinkCDCDagDndDTO(); + category.setKey(pluginType.getValue()); + category.setTitle(pluginType.getLabel()); + category.setDocString(pluginType.getRemark()); + category.setIsLeaf(false); + List children = new ArrayList(); + Set plugins = flinkCDCConnectorService.getAvailableConnectors(pluginType); + plugins.stream().sorted(Comparator.comparing(plugin -> plugin.getPluginName().ordinal())).forEachOrdered(plugin -> { + FlinkCDCDagDndDTO child = buildFlinkCDCConnector(category, plugin); + FlinkCDCDagDndMeta meta = buildPluginInfo(plugin); + child.setMeta(meta); + List ports; + switch (pluginType) { + case SOURCE: + ports = getSourcePorts(child.getKey()); + break; + case SINK: + ports = getSinkPorts(child.getKey()); + break; + case ROUTE: + ports = getRoutePorts(child.getKey()); + break; + default: + ports = Collections.emptyList(); + } + child.setPorts(ports); + children.add(child); + }); + category.setChildren(children); + return category; + } + + private FlinkCDCDagDndDTO buildFlinkCDCConnector(FlinkCDCDagDndDTO category, FlinkCDCPipilineConnectorPlugin plugin) { + PluginInfo pluginInfo = plugin.getPluginInfo(); + FlinkCDCDagDndDTO node = new FlinkCDCDagDndDTO(); + node.setCategory(category.getKey()); + node.setKey(pluginInfo.getName()); + node.setTitle(plugin.getPluginName().getLabel()); + node.setDocString(pluginInfo.getDescription()); + node.setIsLeaf(true); + return node; + } + + private FlinkCDCDagDndMeta buildPluginInfo(FlinkCDCPipilineConnectorPlugin connector) { + FlinkCDCDagDndMeta meta = new FlinkCDCDagDndMeta(); + meta.setName(connector.getPluginName().getValue()); + meta.setType(connector.getPluginType().getValue()); + return meta; + } + + + private List getSourcePorts(String key) { + List ports = new ArrayList<>(); + DndPortDTO portDTO = new DndPortDTO(); + portDTO.setId(key + "-" + DndPortGroupEnum.bottom.name()); + portDTO.setGroup(DndPortGroupEnum.bottom.name()); + ports.add(portDTO); + return ports; + } + + private List getSinkPorts(String key) { + List ports = new ArrayList<>(); + DndPortDTO portDTO = new DndPortDTO(); + portDTO.setId(key + "-" + DndPortGroupEnum.top.name()); + portDTO.setGroup(DndPortGroupEnum.top.name()); + ports.add(portDTO); + return ports; + } + + private List getRoutePorts(String key) { + List ports = new ArrayList<>(); + DndPortDTO sourcePort = new DndPortDTO(); + sourcePort.setId(key + "-" + DndPortGroupEnum.top.name()); + sourcePort.setGroup(DndPortGroupEnum.top.name()); + ports.add(sourcePort); + + DndPortDTO sinkPort = new DndPortDTO(); + sinkPort.setId(key + "-" + DndPortGroupEnum.bottom.name()); + sinkPort.setGroup(DndPortGroupEnum.bottom.name()); + ports.add(sinkPort); + return ports; + } +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCJobServiceImpl.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCJobServiceImpl.java new file mode 100644 index 000000000..6536ba15a --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/impl/FlinkCDCJobServiceImpl.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.impl; + +import cn.sliew.scaleph.common.dict.common.YesOrNo; +import cn.sliew.scaleph.common.dict.flink.FlinkJobType; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion; +import cn.sliew.scaleph.common.exception.ScalephException; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactCDC; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkArtifactCDCMapper; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCDagService; +import cn.sliew.scaleph.engine.flink.cdc.service.FlinkCDCJobService; +import cn.sliew.scaleph.engine.flink.cdc.service.convert.WsFlinkArtifactCDCConvert; +import cn.sliew.scaleph.engine.flink.cdc.service.dto.WsFlinkArtifactCDCDTO; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCAddParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCSelectListParam; +import cn.sliew.scaleph.engine.flink.cdc.service.param.WsFlinkArtifactCDCUpdateParam; +import cn.sliew.scaleph.project.service.WsFlinkArtifactService; +import cn.sliew.scaleph.project.service.dto.WsFlinkArtifactDTO; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +import static cn.sliew.milky.common.check.Ensures.checkState; + +@Service +public class FlinkCDCJobServiceImpl implements FlinkCDCJobService { + + @Autowired + private WsFlinkArtifactCDCMapper wsFlinkArtifactCDCMapper; + @Autowired + private WsFlinkArtifactService wsFlinkArtifactService; + @Autowired + private FlinkCDCDagService flinkCDCDagService; + + @Override + public Page listByPage(WsFlinkArtifactCDCListParam param) { + Page page = new Page<>(param.getCurrent(), param.getPageSize()); + Page cdcPage = wsFlinkArtifactCDCMapper.list(page, param.getProjectId(), param.getName(), param.getFlinkVersion()); + Page result = + new Page<>(cdcPage.getCurrent(), cdcPage.getSize(), cdcPage.getTotal()); + result.setRecords(WsFlinkArtifactCDCConvert.INSTANCE.toDto(cdcPage.getRecords())); + return result; + } + + @Override + public List listAll(WsFlinkArtifactCDCSelectListParam param) { + List cdcs = wsFlinkArtifactCDCMapper.listAll(param.getProjectId(), param.getName()); + return WsFlinkArtifactCDCConvert.INSTANCE.toDto(cdcs); + } + + @Override + public List listAllByArtifact(Long artifactId) { + List list = wsFlinkArtifactCDCMapper.selectList( + Wrappers.lambdaQuery(WsFlinkArtifactCDC.class) + .eq(WsFlinkArtifactCDC::getFlinkArtifactId, artifactId) + .orderByDesc(WsFlinkArtifactCDC::getId) + ); + return WsFlinkArtifactCDCConvert.INSTANCE.toDto(list); + } + + @Override + public WsFlinkArtifactCDCDTO selectOne(Long id) { + WsFlinkArtifactCDC record = wsFlinkArtifactCDCMapper.selectOne(id); + checkState(record != null, () -> "flink artifact cdc not exists for id: " + id); + return WsFlinkArtifactCDCConvert.INSTANCE.toDto(record); + } + + @Override + public WsFlinkArtifactCDCDTO selectCurrent(Long artifactId) { + WsFlinkArtifactCDC record = wsFlinkArtifactCDCMapper.selectCurrent(artifactId); + return WsFlinkArtifactCDCConvert.INSTANCE.toDto(record); + } + + @Override + public String preview(Long id) throws Exception { + return null; + } + + @Override + public WsFlinkArtifactCDCDTO insert(WsFlinkArtifactCDCAddParam param) { + WsFlinkArtifactDTO flinkArtifact = new WsFlinkArtifactDTO(); + flinkArtifact.setProjectId(param.getProjectId()); + flinkArtifact.setType(FlinkJobType.FLINK_CDC); + flinkArtifact.setName(param.getName()); + flinkArtifact.setRemark(param.getRemark()); + flinkArtifact = wsFlinkArtifactService.insert(flinkArtifact); + WsFlinkArtifactCDC record = new WsFlinkArtifactCDC(); + record.setFlinkArtifactId(flinkArtifact.getId()); + record.setFlinkVersion(param.getFlinkVersion()); + record.setFlinkCDCVersion(FlinkCDCVersion.current()); + record.setDagId(flinkCDCDagService.initialize()); + record.setCurrent(YesOrNo.YES); + wsFlinkArtifactCDCMapper.insert(record); + return selectOne(record.getId()); + } + + @Override + public int update(WsFlinkArtifactCDCUpdateParam param) { + WsFlinkArtifactCDCDTO wsFlinkArtifactCDCDTO = selectOne(param.getId()); + WsFlinkArtifactDTO flinkArtifact = new WsFlinkArtifactDTO(); + flinkArtifact.setId(wsFlinkArtifactCDCDTO.getWsFlinkArtifact().getId()); + flinkArtifact.setName(param.getName()); + flinkArtifact.setRemark(param.getRemark()); + wsFlinkArtifactService.update(flinkArtifact); + + WsFlinkArtifactCDC record = new WsFlinkArtifactCDC(); + record.setId(param.getId()); + record.setFlinkVersion(param.getFlinkVersion()); + record.setCurrent(YesOrNo.YES); + return wsFlinkArtifactCDCMapper.updateById(record); + } + + @Override + public int delete(Long id) throws ScalephException { + WsFlinkArtifactCDCDTO wsFlinkArtifactCDCDTO = selectOne(id); + if (wsFlinkArtifactCDCDTO.getCurrent() == YesOrNo.YES) { + throw new ScalephException("Unsupport delete current flink cdc job"); + } + flinkCDCDagService.destroy(wsFlinkArtifactCDCDTO.getDagId()); + return wsFlinkArtifactCDCMapper.deleteById(id); + } + + @Override + public int deleteBatch(List ids) throws ScalephException { + for (Long id : ids) { + delete(id); + } + return ids.size(); + } + + @Override + public int deleteAll(Long flinkArtifactId) throws ScalephException { + List dtos = listAllByArtifact(flinkArtifactId); + for (WsFlinkArtifactCDCDTO cdc : dtos) { + wsFlinkArtifactCDCMapper.deleteById(cdc.getId()); + } + return wsFlinkArtifactService.deleteById(flinkArtifactId); + } +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCAddParam.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCAddParam.java new file mode 100644 index 000000000..fda3353fa --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCAddParam.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.param; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.project.service.param.AbstractWsFlinkArtifactAddParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +public class WsFlinkArtifactCDCAddParam extends AbstractWsFlinkArtifactAddParam { + + @Schema(description = "flink version") + private FlinkVersion flinkVersion; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCGraphParam.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCGraphParam.java new file mode 100644 index 000000000..0bd8115b1 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCGraphParam.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.param; + +import cn.sliew.scaleph.dag.service.vo.DagGraphVO; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import javax.validation.constraints.NotNull; + +@Data +@EqualsAndHashCode +public class WsFlinkArtifactCDCGraphParam { + + @NotNull + @Schema(description = "id") + private Long id; + + @NotNull + @Schema(description = "job graph") + private DagGraphVO jobGraph; + +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCListParam.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCListParam.java new file mode 100644 index 000000000..d905e77a8 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCListParam.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.param; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCVersion; +import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelEngineType; +import cn.sliew.scaleph.project.service.param.WsFlinkArtifactListParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class WsFlinkArtifactCDCListParam extends WsFlinkArtifactListParam { + + @Schema(description = "flink version") + private FlinkVersion flinkVersion; + + @Schema(description = "flink cdc version") + private FlinkCDCVersion flinkCDCVersion; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCSelectListParam.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCSelectListParam.java new file mode 100644 index 000000000..efaab7ebf --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCSelectListParam.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.param; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotNull; + +@Data +public class WsFlinkArtifactCDCSelectListParam { + + @NotNull + @Schema(description = "项目id") + private Long projectId; + + @Schema(description = "名称。支持模糊搜索") + private String name; +} diff --git a/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCUpdateParam.java b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCUpdateParam.java new file mode 100644 index 000000000..cee97e1f8 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-cdc/src/main/java/cn/sliew/scaleph/engine/flink/cdc/service/param/WsFlinkArtifactCDCUpdateParam.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.cdc.service.param; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.project.service.param.AbstractWsFlinkArtifactUpdateParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +public class WsFlinkArtifactCDCUpdateParam extends AbstractWsFlinkArtifactUpdateParam { + + @Schema(description = "flink version") + private FlinkVersion flinkVersion; +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/pom.xml b/scaleph-engine/scaleph-engine-flink-kubernetes/pom.xml index 30dbc8a52..51eecb6eb 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/pom.xml +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/pom.xml @@ -41,6 +41,10 @@ ${project.parent.groupId} scaleph-engine-seatunnel + + ${project.parent.groupId} + scaleph-engine-flink-cdc + ${project.parent.groupId} scaleph-engine-sql diff --git a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java index ede785e02..7ce0844fa 100644 --- a/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java +++ b/scaleph-engine/scaleph-engine-seatunnel/src/main/java/cn/sliew/scaleph/engine/seatunnel/service/impl/WsDiJobServiceImpl.java @@ -21,6 +21,7 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.common.YesOrNo; import cn.sliew.scaleph.common.dict.flink.FlinkJobType; +import cn.sliew.scaleph.common.util.UUIDUtil; import cn.sliew.scaleph.dag.service.DagService; import cn.sliew.scaleph.dag.service.dto.DagDTO; import cn.sliew.scaleph.dag.service.dto.DagInstanceDTO; @@ -100,7 +101,7 @@ public WsDiJobDTO insert(WsDiJobAddParam param) { Long dagId = dagService.insert(new DagSimpleAddParam()); WsDiJob record = new WsDiJob(); record.setFlinkArtifactId(flinkArtifact.getId()); - record.setJobId(UUID.randomUUID().toString()); + record.setJobId(UUIDUtil.randomUUId()); record.setDagId(dagId); record.setJobEngine(param.getJobEngine()); record.setCurrent(YesOrNo.YES); diff --git a/scaleph-engine/scaleph-engine-sql/src/main/java/cn/sliew/scaleph/engine/sql/service/impl/WsFlinkArtifactSqlServiceImpl.java b/scaleph-engine/scaleph-engine-sql/src/main/java/cn/sliew/scaleph/engine/sql/service/impl/WsFlinkArtifactSqlServiceImpl.java index 80c9d18db..2bcf976a6 100644 --- a/scaleph-engine/scaleph-engine-sql/src/main/java/cn/sliew/scaleph/engine/sql/service/impl/WsFlinkArtifactSqlServiceImpl.java +++ b/scaleph-engine/scaleph-engine-sql/src/main/java/cn/sliew/scaleph/engine/sql/service/impl/WsFlinkArtifactSqlServiceImpl.java @@ -37,6 +37,8 @@ import java.util.List; +import static cn.sliew.milky.common.check.Ensures.checkState; + @Service public class WsFlinkArtifactSqlServiceImpl implements WsFlinkArtifactSqlService { @@ -86,8 +88,9 @@ public List listAllByArtifact(Long artifactId) { @Override public WsFlinkArtifactSqlDTO selectOne(Long id) { - WsFlinkArtifactSql wsFlinkArtifactSql = wsFlinkArtifactSqlMapper.selectOne(id); - return WsFlinkArtifactSqlConvert.INSTANCE.toDto(wsFlinkArtifactSql); + WsFlinkArtifactSql record = wsFlinkArtifactSqlMapper.selectOne(id); + checkState(record != null, () -> "flink artifact sql not exists for id: " + id); + return WsFlinkArtifactSqlConvert.INSTANCE.toDto(record); } @Override diff --git a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/FlinkCDCPipelineConnectorManager.java b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/FlinkCDCPipelineConnectorManager.java index b37e706f6..0cb4996be 100644 --- a/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/FlinkCDCPipelineConnectorManager.java +++ b/scaleph-plugins/scaleph-plugin-flinkcdc/src/main/java/cn/sliew/scaleph/plugin/flink/cdc/FlinkCDCPipelineConnectorManager.java @@ -18,22 +18,39 @@ package cn.sliew.scaleph.plugin.flink.cdc; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginName; +import cn.sliew.scaleph.common.dict.flink.cdc.FlinkCDCPluginType; import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.core.PluginSPILoader; import cn.sliew.scaleph.plugin.framework.exception.PluginException; import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; public class FlinkCDCPipelineConnectorManager { private PluginSPILoader pluginPluginSPILoader = new PluginSPILoader<>(FlinkCDCPipilineConnectorPlugin.class, FlinkCDCPipilineConnectorPlugin.class.getClassLoader()); + public Set getAvailableConnectors(FlinkCDCPluginType pluginType) { + return pluginPluginSPILoader.getServices().values().stream() + .filter(connector -> connector.getPluginType() == pluginType) + .collect(Collectors.toSet()); + } + public FlinkCDCPipilineConnectorPlugin getConnector(PluginInfo pluginInfo) throws PluginException { final Optional optional = pluginPluginSPILoader.getPlugin(pluginInfo); return optional.orElseThrow(() -> new PluginException("FlinkCDCPipilineConnectorPlugin", "unknown plugin info for " + pluginInfo)); } + public FlinkCDCPipilineConnectorPlugin getConnector(FlinkCDCPluginType pluginType, FlinkCDCPluginName pluginName) throws PluginException { + return pluginPluginSPILoader.getServices().values().stream() + .filter(connector -> connector.getPluginType() == pluginType) + .filter(connector -> connector.getPluginName() == pluginName) + .findAny().orElseThrow(() -> new PluginException("FlinkCDCPipilineConnectorPlugin")); + } + public FlinkCDCPipilineConnectorPlugin newConnector(String name, Properties props) { return pluginPluginSPILoader.newInstance(name, props); } diff --git a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java index 3a36f1ea6..a890e742f 100644 --- a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java +++ b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java @@ -54,7 +54,7 @@ public class MybatisPlusGenerator { /** * just add table names here and run the {@link #main(String[])} method. */ - private static final String[] TABLES = {"ws_doris_instance"}; + private static final String[] TABLES = {"ws_flink_artifact_cdc"}; public static void main(String[] args) { //自动生成配置 diff --git a/scaleph-ui-react2/config/routes.ts b/scaleph-ui-react2/config/routes.ts index aa20b1b20..a47716420 100644 --- a/scaleph-ui-react2/config/routes.ts +++ b/scaleph-ui-react2/config/routes.ts @@ -106,6 +106,15 @@ export default [ path: '/workspace/artifact/seatunnel/dag', component: './Project/Workspace/Artifact/DI/DiJobFlow', }, + { + name: 'cdc', + path: '/workspace/artifact/cdc', + component: './Project/Workspace/Artifact/CDC', + }, + { + path: '/workspace/artifact/cdc/dag', + component: './Project/Workspace/Artifact/CDC/Dag', + }, ] }, { diff --git a/scaleph-ui-react2/src/constants/dictType.ts b/scaleph-ui-react2/src/constants/dictType.ts index f9ce0024c..015eaaeb6 100644 --- a/scaleph-ui-react2/src/constants/dictType.ts +++ b/scaleph-ui-react2/src/constants/dictType.ts @@ -34,6 +34,7 @@ export const DICT_TYPE = { flinkJobType: 'flink_job_type', flinkJobStatus: 'flink_job_status', flinkServiceExposedType: 'flink_service_exposed_type', + seatunnelVersion: 'seatunnel_version', seatunnelEngineType: 'seatunnel_engine_type', seatunnelPluginName: 'seatunnel_plugin_name', @@ -42,6 +43,8 @@ export const DICT_TYPE = { seatunnelCDCFormat: 'seatunnel_cdc_format', imagePullPolicy: 'image_pull_policy', + flinkCDCVersion: 'flink_cdc_version', + deploymentKind: 'deployment_kind', resourceLifecycleState: 'resource_lifecycle_state', upgradeMode: 'upgrade_mode', diff --git a/scaleph-ui-react2/src/locales/zh-CN/menu.ts b/scaleph-ui-react2/src/locales/zh-CN/menu.ts index d6a5dff17..dbc132e0e 100644 --- a/scaleph-ui-react2/src/locales/zh-CN/menu.ts +++ b/scaleph-ui-react2/src/locales/zh-CN/menu.ts @@ -7,6 +7,7 @@ export default { 'menu.project.artifact.jar': 'Jar', 'menu.project.artifact.sql': 'SQL', 'menu.project.artifact.seatunnel': 'SeaTunnel', + 'menu.project.artifact.cdc': 'Flink CDC', 'menu.project.flink.kubernetes': 'Flink Kuberentes', 'menu.project.flink.kubernetes.template': 'Template', 'menu.project.flink.kubernetes.session-cluster': 'SessionCluster', diff --git a/scaleph-ui-react2/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react2/src/locales/zh-CN/pages/project.ts index acce365e6..6a866bece 100644 --- a/scaleph-ui-react2/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react2/src/locales/zh-CN/pages/project.ts @@ -24,6 +24,8 @@ export default { 'pages.project.artifact.seatunnel': 'SeaTunnel', 'pages.project.artifact.seatunnel.jobEngine': 'Engine', + 'pages.project.artifact.cdc': 'CDC', + 'pages.project.artifact.cdc.flinkCDCVersion': 'Flink CDC Version', 'pages.project.job.jar': 'Jar', 'pages.project.job.jar.args': 'Main Args', diff --git a/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/Dag/index.tsx b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/Dag/index.tsx new file mode 100644 index 000000000..372f45ff2 --- /dev/null +++ b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/Dag/index.tsx @@ -0,0 +1,20 @@ +import React from 'react'; +import {PageContainer} from "@ant-design/pro-components"; +import {useAccess, useIntl, useLocation} from '@umijs/max'; +import {WORKSPACE_CONF} from '@/constants/constant'; +import {WsFlinkArtifactCDC} from "@/services/project/typings"; + +const FlinkArtifactCDCDagWeb: React.FC = () => { + const intl = useIntl(); + const access = useAccess(); + const data = useLocation().state as WsFlinkArtifactCDC; + const projectId = localStorage.getItem(WORKSPACE_CONF.projectId); + + return ( + + Flink CDC Dag Web: {data.wsFlinkArtifact?.name} + + ); +}; + +export default FlinkArtifactCDCDagWeb; diff --git a/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/FlinkArtifactCDCForm.tsx b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/FlinkArtifactCDCForm.tsx new file mode 100644 index 000000000..17efbc781 --- /dev/null +++ b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/FlinkArtifactCDCForm.tsx @@ -0,0 +1,98 @@ +import {Form, message, Modal} from 'antd'; +import {ProForm, ProFormDigit, ProFormSelect, ProFormText, ProFormTextArea} from "@ant-design/pro-components"; +import {useIntl} from '@umijs/max'; +import {ModalFormProps} from '@/typings'; +import {WORKSPACE_CONF} from '@/constants/constant'; +import {DICT_TYPE} from '@/constants/dictType'; +import {WsFlinkArtifactCDC} from '@/services/project/typings'; +import {DictDataService} from "@/services/admin/dictData.service"; +import {WsFlinkCDCService} from "@/services/project/WsFlinkCDCService"; + +const FlinkArtifactCDCForm: React.FC> = ({ + data, + visible, + onVisibleChange, + onCancel + }) => { + const intl = useIntl(); + const [form] = Form.useForm(); + const projectId = localStorage.getItem(WORKSPACE_CONF.projectId); + + return ( + { + form.validateFields().then((values) => { + const param = { + id: values.id, + projectId: projectId, + name: values.name, + remark: values.remark, + flinkVersion: values.flinkVersion, + }; + data?.id + ? WsFlinkCDCService.update(param).then((response) => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.edit.success'})); + if (onVisibleChange) { + onVisibleChange(false); + } + } + }) + : WsFlinkCDCService.add(param).then((response) => { + if (response.success) { + message.success(intl.formatMessage({id: 'app.common.operate.new.success'})); + if (onVisibleChange) { + onVisibleChange(false); + } + } + }) + }); + }} + > + + + + ); +}; + +export default FlinkArtifactCDCForm; diff --git a/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/index.tsx b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/index.tsx new file mode 100644 index 000000000..74b3472e8 --- /dev/null +++ b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/CDC/index.tsx @@ -0,0 +1,197 @@ +import React, {useRef, useState} from 'react'; +import {Button, message, Modal, Space, Tooltip} from "antd"; +import {DeleteOutlined, EditOutlined, FolderOpenOutlined} from "@ant-design/icons"; +import {ActionType, ProColumns, ProFormInstance, ProTable} from '@ant-design/pro-components'; +import {history, useAccess, useIntl} from '@umijs/max'; +import {WORKSPACE_CONF} from '@/constants/constant'; +import {DICT_TYPE} from '@/constants/dictType'; +import {PRIVILEGE_CODE} from "@/constants/privilegeCode"; +import {DictDataService} from '@/services/admin/dictData.service'; +import {WsFlinkArtifactCDC} from '@/services/project/typings'; +import {WsFlinkCDCService} from "@/services/project/WsFlinkCDCService"; +import FlinkArtifactCDCForm from "@/pages/Project/Workspace/Artifact/CDC/FlinkArtifactCDCForm"; + +const FlinkArtifactCDCWeb: React.FC = () => { + const intl = useIntl(); + const access = useAccess(); + const actionRef = useRef(); + const formRef = useRef(); + const [flinkArtifactCDCFormData, setFlinkArtifactCDCFormData] = useState<{ + visible: boolean; + data: WsFlinkArtifactCDC + }>({ + visible: false, + data: {}, + }); + const projectId = localStorage.getItem(WORKSPACE_CONF.projectId); + + const tableColumns: ProColumns[] = [ + { + title: intl.formatMessage({id: 'pages.project.artifact.name'}), + dataIndex: 'name', + width: 240, + render: (dom, entity, index, action, schema) => { + return entity.wsFlinkArtifact?.name + } + }, + { + title: intl.formatMessage({id: 'pages.resource.flinkRelease.version'}), + dataIndex: 'flinkVersion', + width: 120, + render: (dom, record) => { + return record.flinkVersion?.label; + }, + request: (params, props) => { + return DictDataService.listDictDataByType2(DICT_TYPE.flinkVersion) + } + }, + { + title: intl.formatMessage({id: 'pages.project.artifact.cdc.flinkCDCVersion'}), + dataIndex: 'flinkCDCVersion', + width: 120, + render: (dom, record) => { + return record.flinkCDCVersion?.label; + }, + request: (params, props) => { + return DictDataService.listDictDataByType2(DICT_TYPE.flinkCDCVersion) + } + }, + { + title: intl.formatMessage({id: 'app.common.data.remark'}), + dataIndex: 'remark', + hideInSearch: true, + width: 150, + render: (dom, entity, index, action, schema) => { + return entity.wsFlinkArtifact?.remark + } + }, + { + title: intl.formatMessage({id: 'app.common.data.createTime'}), + dataIndex: 'createTime', + hideInSearch: true, + width: 180, + }, + { + title: intl.formatMessage({id: 'app.common.data.updateTime'}), + dataIndex: 'updateTime', + hideInSearch: true, + width: 180, + }, + { + title: intl.formatMessage({id: 'app.common.operate.label'}), + dataIndex: 'actions', + align: 'center', + width: 120, + fixed: 'right', + valueType: 'option', + render: (_, record) => ( + <> + + {access.canAccess(PRIVILEGE_CODE.datadevResourceDownload) && ( + + + + )} + + + ), + }, + ]; + + return ( +
+ + search={{ + labelWidth: 'auto', + span: {xs: 24, sm: 12, md: 8, lg: 6, xl: 6, xxl: 4}, + }} + rowKey="id" + actionRef={actionRef} + formRef={formRef} + options={false} + columns={tableColumns} + request={(params, sorter, filter) => + WsFlinkCDCService.list({...params, projectId: projectId}) + } + toolbar={{ + actions: [ + access.canAccess(PRIVILEGE_CODE.datadevResourceAdd) && ( + + ), + ], + }} + pagination={{showQuickJumper: true, showSizeChanger: true, defaultPageSize: 10}} + tableAlertRender={false} + tableAlertOptionRender={false} + /> + {flinkArtifactCDCFormData.visiable && ( + { + setFlinkArtifactCDCFormData({visiable: false, data: {}}); + }} + onVisibleChange={(visiable) => { + setFlinkArtifactCDCFormData({visiable: visiable, data: {}}); + actionRef.current?.reload(); + }} + /> + )} +
+ ); +}; + +export default FlinkArtifactCDCWeb; diff --git a/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/Sql/FlinkArtifactSqlForm.tsx b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/Sql/FlinkArtifactSqlForm.tsx index 370905fa8..9e7528301 100644 --- a/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/Sql/FlinkArtifactSqlForm.tsx +++ b/scaleph-ui-react2/src/pages/Project/Workspace/Artifact/Sql/FlinkArtifactSqlForm.tsx @@ -1,5 +1,5 @@ import {Form, message, Modal} from 'antd'; -import {ProForm, ProFormDigit, ProFormSelect, ProFormText} from "@ant-design/pro-components"; +import {ProForm, ProFormDigit, ProFormSelect, ProFormText, ProFormTextArea} from "@ant-design/pro-components"; import {useIntl} from '@umijs/max'; import {ModalFormProps} from '@/typings'; import {WORKSPACE_CONF} from '@/constants/constant'; @@ -85,7 +85,7 @@ const FlinkArtifactSqlForm: React.FC> = ({ return DictDataService.listDictDataByType2(DICT_TYPE.flinkVersion) }} /> - { + return request>>>(`${WsFlinkCDCService.url}/dag/dnd`, { + method: 'GET', + }); + }, + + list: async (queryParam: WsFlinkArtifactCDCParam) => { + return request>(`${WsFlinkCDCService.url}`, { + method: 'GET', + params: queryParam, + }).then((res) => { + const result = { + data: res.records, + total: res.total, + pageSize: res.size, + current: res.current, + }; + return result; + }); + }, + + listAll: async (queryParam: WsFlinkArtifactCDCSelectListParam) => { + return request>>(`${WsFlinkCDCService.url}/all`, { + method: 'GET', + params: queryParam, + }); + }, + + add: async (row: WsFlinkArtifactCDCAddParam) => { + return request>(`${WsFlinkCDCService.url}`, { + method: 'PUT', + data: row, + }); + }, + + update: async (row: WsFlinkArtifactCDCUpdateParam) => { + return request>(`${WsFlinkCDCService.url}`, { + method: 'POST', + data: row, + }); + }, + + deleteOne: async (row: WsFlinkArtifactCDC) => { + return request>(`${WsFlinkCDCService.url}/${row.id}`, { + method: 'DELETE', + }); + }, + + deleteBatch: async (rows: WsFlinkArtifactCDC[]) => { + const params = rows.map((row) => row.id); + return request>(`${WsFlinkCDCService.url}/batch`, { + method: 'DELETE', + data: params + }); + }, +}; diff --git a/scaleph-ui-react2/src/services/project/typings.d.ts b/scaleph-ui-react2/src/services/project/typings.d.ts index 067e6bf76..4bfc456d4 100644 --- a/scaleph-ui-react2/src/services/project/typings.d.ts +++ b/scaleph-ui-react2/src/services/project/typings.d.ts @@ -98,6 +98,42 @@ export type WsDiJobStep = { updateTime?: Date; }; +export type WsFlinkArtifactCDC = { + id?: number; + wsFlinkArtifact?: WsFlinkArtifact; + flinkVersion?: Dict; + flinkCDCVersion?: Dict; + current?: Dict; + createTime?: Date; + updateTime?: Date; +}; + +export type WsFlinkArtifactCDCParam = QueryParam & { + projectId: number; + flinkVersion?: string; + flinkCDCVersion?: string; + name?: string; +}; + +export type WsFlinkArtifactCDCSelectListParam = { + projectId: number; + name?: string; +}; + +export type WsFlinkArtifactCDCAddParam = { + projectId: number; + name?: string; + flinkVersion?: string; + remark?: string; +}; + +export type WsFlinkArtifactCDCUpdateParam = { + id: number; + name?: string; + flinkVersion?: string; + remark?: string; +}; + export type WsFlinkArtifactJar = { id?: number; wsFlinkArtifact?: WsFlinkArtifact; diff --git a/scaleph-workflow/scaleph-workflow-quartz/pom.xml b/scaleph-workflow/scaleph-workflow-quartz/pom.xml index c68768048..24899c74f 100644 --- a/scaleph-workflow/scaleph-workflow-quartz/pom.xml +++ b/scaleph-workflow/scaleph-workflow-quartz/pom.xml @@ -38,6 +38,15 @@ org.springframework.boot spring-boot-starter-quartz
+ + \ No newline at end of file diff --git a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql index f2f2a5bdc..8ab4fde5f 100644 --- a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql @@ -18,6 +18,8 @@ INSERT INTO `dag_instance` (`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`) VALUES (1, NULL, NULL, 'sys', 'sys'); INSERT INTO `dag_instance` (`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`) VALUES (2, NULL, NULL, 'sys', 'sys'); +INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`) +VALUES (3, NULL, NULL, 'sys', 'sys'); drop table if exists dag_step; create table dag_step @@ -38,10 +40,28 @@ create table dag_step unique key uniq_step (dag_id, step_id) ) engine = innodb comment 'DAG 步骤'; -INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (1, 1, '68834928-2a32-427a-a864-83b6b5848e04', 'Jdbc Sink', 360, 290, '{"name":"Jdbc","type":"sink","engine":"seatunnel"}', '{"stepTitle":"Jdbc Sink","dataSourceType":"MySQL","dataSource":2,"batch_size":300,"batch_interval_ms":1000,"max_retries":3,"is_exactly_once":false,"query":"insert into sample_data_e_commerce_duplicate ( id, invoice_no, stock_code, description, quantity, invoice_date, unit_price, customer_id, country )\nvalues (?,?,?,?,?,?,?,?,?)"}', 'sys', 'sys'); -INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (2, 1, 'f3e02087-91fa-494d-86f4-694970a49ebd', 'Jdbc Source', 380, 140, '{"name":"Jdbc","type":"source","engine":"seatunnel"}', '{"stepTitle":"Jdbc Source","dataSourceType":"MySQL","dataSource":2,"query":"select * from sample_data_e_commerce"}', 'sys', 'sys'); -INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (3, 2, '6223c6c3-b552-4c69-adab-5300b7514fad', 'Fake Source', 380, 140, '{"name":"FakeSource","type":"source","engine":"seatunnel"}', '{"stepTitle":"Fake Source","fields":[{"field":"c_string","type":"string"},{"field":"c_boolean","type":"boolean"},{"field":"c_tinyint","type":"tinyint"},{"field":"c_smallint","type":"smallint"},{"field":"c_int","type":"int"},{"field":"c_bigint","type":"bigint"},{"field":"c_float","type":"float"},{"field":"c_double","type":"double"},{"field":"c_decimal","type":"decimal(30, 8)"},{"field":"c_bytes","type":"bytes"},{"field":"c_map","type":"map"},{"field":"c_date","type":"date"},{"field":"c_time","type":"time"},{"field":"c_timestamp","type":"timestamp"}],"schema":"{\\\"fields\\\":{\\\"c_string\\\":\\\"string\\\",\\\"c_boolean\\\":\\\"boolean\\\",\\\"c_tinyint\\\":\\\"tinyint\\\",\\\"c_smallint\\\":\\\"smallint\\\",\\\"c_int\\\":\\\"int\\\",\\\"c_bigint\\\":\\\"bigint\\\",\\\"c_float\\\":\\\"float\\\",\\\"c_double\\\":\\\"double\\\",\\\"c_decimal\\\":\\\"decimal(30, 8)\\\",\\\"c_bytes\\\":\\\"bytes\\\",\\\"c_map\\\":\\\"map\\\",\\\"c_date\\\":\\\"date\\\",\\\"c_time\\\":\\\"time\\\",\\\"c_timestamp\\\":\\\"timestamp\\\"}}"}', 'sys', 'sys'); -INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (4, 2, 'f08143b4-34dc-4190-8723-e8d8ce49738f', 'Console Sink', 360, 290, '{"name":"Console","type":"sink","engine":"seatunnel"}', '{"stepTitle":"Console Sink"}', 'sys', 'sys'); +INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, + `creator`, `editor`) +VALUES (1, 1, '68834928-2a32-427a-a864-83b6b5848e04', 'Jdbc Sink', 360, 290, + '{"name":"Jdbc","type":"sink","engine":"seatunnel"}', + '{"stepTitle":"Jdbc Sink","dataSourceType":"MySQL","dataSource":2,"batch_size":300,"batch_interval_ms":1000,"max_retries":3,"is_exactly_once":false,"query":"insert into sample_data_e_commerce_duplicate ( id, invoice_no, stock_code, description, quantity, invoice_date, unit_price, customer_id, country )\nvalues (?,?,?,?,?,?,?,?,?)"}', + 'sys', 'sys'); +INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, + `creator`, `editor`) +VALUES (2, 1, 'f3e02087-91fa-494d-86f4-694970a49ebd', 'Jdbc Source', 380, 140, + '{"name":"Jdbc","type":"source","engine":"seatunnel"}', + '{"stepTitle":"Jdbc Source","dataSourceType":"MySQL","dataSource":2,"query":"select * from sample_data_e_commerce"}', + 'sys', 'sys'); +INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, + `creator`, `editor`) +VALUES (3, 2, '6223c6c3-b552-4c69-adab-5300b7514fad', 'Fake Source', 380, 140, + '{"name":"FakeSource","type":"source","engine":"seatunnel"}', + '{"stepTitle":"Fake Source","fields":[{"field":"c_string","type":"string"},{"field":"c_boolean","type":"boolean"},{"field":"c_tinyint","type":"tinyint"},{"field":"c_smallint","type":"smallint"},{"field":"c_int","type":"int"},{"field":"c_bigint","type":"bigint"},{"field":"c_float","type":"float"},{"field":"c_double","type":"double"},{"field":"c_decimal","type":"decimal(30, 8)"},{"field":"c_bytes","type":"bytes"},{"field":"c_map","type":"map"},{"field":"c_date","type":"date"},{"field":"c_time","type":"time"},{"field":"c_timestamp","type":"timestamp"}],"schema":"{\\\"fields\\\":{\\\"c_string\\\":\\\"string\\\",\\\"c_boolean\\\":\\\"boolean\\\",\\\"c_tinyint\\\":\\\"tinyint\\\",\\\"c_smallint\\\":\\\"smallint\\\",\\\"c_int\\\":\\\"int\\\",\\\"c_bigint\\\":\\\"bigint\\\",\\\"c_float\\\":\\\"float\\\",\\\"c_double\\\":\\\"double\\\",\\\"c_decimal\\\":\\\"decimal(30, 8)\\\",\\\"c_bytes\\\":\\\"bytes\\\",\\\"c_map\\\":\\\"map\\\",\\\"c_date\\\":\\\"date\\\",\\\"c_time\\\":\\\"time\\\",\\\"c_timestamp\\\":\\\"timestamp\\\"}}"}', + 'sys', 'sys'); +INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`, + `creator`, `editor`) +VALUES (4, 2, 'f08143b4-34dc-4190-8723-e8d8ce49738f', 'Console Sink', 360, 290, + '{"name":"Console","type":"sink","engine":"seatunnel"}', '{"stepTitle":"Console Sink"}', 'sys', 'sys'); drop table if exists dag_link; create table dag_link diff --git a/tools/docker/mysql/init.d/scaleph-security-mysql.sql b/tools/docker/mysql/init.d/scaleph-security-mysql.sql index 5cef6e1c6..e26325ee3 100644 --- a/tools/docker/mysql/init.d/scaleph-security-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-security-mysql.sql @@ -596,6 +596,14 @@ INSERT INTO `sec_resource_web` (`id`, `type`, `pid`, `name`, `path`, `redirect`, `remark`, `creator`, `editor`) VALUES (78, '2', 75, NULL, '/workspace/dataservice/config/steps', NULL, NULL, NULL, './Project/Workspace/DataService/Config/Steps', NULL, 'sys', 'sys'); +INSERT INTO `sec_resource_web`(`id`, `type`, `pid`, `name`, `path`, `redirect`, `layout`, `icon`, `component`, `remark`, + `creator`, `editor`) +VALUES (79, '2', 13, 'cdc', '/workspace/artifact/cdc', NULL, NULL, NULL, './Project/Workspace/Artifact/CDC', NULL, + 'sys', 'sys'); +INSERT INTO `sec_resource_web`(`id`, `type`, `pid`, `name`, `path`, `redirect`, `layout`, `icon`, `component`, `remark`, + `creator`, `editor`) +VALUES (80, '2', 13, NULL, '/workspace/artifact/cdc/dag', NULL, NULL, NULL, './Project/Workspace/Artifact/CDC/Dag', + NULL, 'sys', 'sys'); drop table if exists sec_resource_web_role; create table sec_resource_web_role diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index 24a307081..1e935646b 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -53,6 +53,8 @@ INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, ` VALUES (8, 1, '0', 'jdbc&paimon-example', 'jdbc 和 paimon catalog example', 'sys', 'sys'); INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) VALUES (9, 1, '0', 'sakura-example', 'sakura catalog example', 'sys', 'sys'); +INSERT INTO `ws_flink_artifact`(`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) +VALUES (10, 1, '3', 'flink-cdc-example', NULL, 'sys', 'sys'); drop table if exists ws_flink_artifact_jar; create table ws_flink_artifact_jar @@ -125,6 +127,26 @@ VALUES (7, 9, '1.18.0', 'CREATE CATALOG sakura WITH(\n \'type\' = \'sakura\',\n \'jdbcUrl\' = \'jdbc:mysql://localhost:3306/sakura\',\n \'username\' = \'root\',\n \'password\' = \'123456\',\n \'driver\' = \'com.mysql.cj.jdbc.Driver\'\n);\n\nCREATE DATABASE sakura.dev;\n\nCREATE TABLE sakura.dev.orders (\n order_number BIGINT,\n price DECIMAL(32,2),\n buyer ROW,\n order_time TIMESTAMP(3)\n) WITH (\n \'connector\' = \'datagen\'\n);\n\nCREATE TABLE sakura.dev.print_table WITH (\'connector\' = \'print\')\n LIKE sakura.dev.orders;\n\nINSERT INTO sakura.dev.print_table \nSELECT * FROM sakura.dev.orders;', '1', 'sysc', 'sys'); +drop table if exists ws_flink_artifact_cdc; +create table ws_flink_artifact_cdc +( + id bigint not null auto_increment comment '自增主键', + flink_artifact_id bigint not null comment '作业 artifact id', + flink_version varchar(32) not null comment 'flink 版本', + flink_cdc_version varchar(32) not null comment 'flink cdc 版本', + dag_id bigint not null, + current varchar(16) not null comment 'current artifact', + creator varchar(32) comment '创建人', + create_time timestamp default current_timestamp comment '创建时间', + editor varchar(32) comment '修改人', + update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', + primary key (id), + key idx_flink_artifact (flink_artifact_id) +) engine = innodb comment 'flink artifact cdc'; +INSERT INTO `ws_flink_artifact_cdc`(`id`, `flink_artifact_id`, `flink_version`, `flink_cdc_version`, `dag_id`, + `current`, `creator`, `editor`) +VALUES (1, 10, '1.18.0', '3.0.0', 3, '1', 'sys', 'sys'); + drop table if exists ws_di_job; create table ws_di_job (