diff --git a/pom.xml b/pom.xml
index a3a1f85a6..7b0a423cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,7 @@
scaleph-api
scaleph-meta
+ scaleph-application
scaleph-engine
scaleph-workspace
scaleph-plugins
@@ -175,6 +176,7 @@
3.27.2
4.3.2
0.4.1
+ 1.5.2
@@ -263,6 +265,11 @@
scaleph-api
${project.version}
+
+ ${project.groupId}
+ scaleph-application
+ ${project.version}
+
${project.groupId}
scaleph-workspace-project
@@ -774,6 +781,12 @@
fury-core
${fury.version}
+
+
+ org.jgrapht
+ jgrapht-core
+ ${jgrapht.version}
+
diff --git a/scaleph-application/pom.xml b/scaleph-application/pom.xml
new file mode 100644
index 000000000..c87a21fc4
--- /dev/null
+++ b/scaleph-application/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+
+ 4.0.0
+
+ cn.sliew
+ scaleph
+ 2.0.3-SNAPSHOT
+ ../pom.xml
+
+ scaleph-application
+
+
+
+
+
+
\ No newline at end of file
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Flow.java b/scaleph-application/src/main/java/cn/sliew/scaleph/application/Main.java
similarity index 83%
rename from scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Flow.java
rename to scaleph-application/src/main/java/cn/sliew/scaleph/application/Main.java
index 797730f8a..c1dd8a5bc 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Flow.java
+++ b/scaleph-application/src/main/java/cn/sliew/scaleph/application/Main.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.scaleph.workflow.model;
+package cn.sliew.scaleph.application;
-import java.util.List;
+public class Main {
-public interface Flow {
-
- List getTasks();
+ public static void main(String[] args) {
+ System.out.println("Hello Application!");
+ }
}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfig.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfig.java
new file mode 100644
index 000000000..efb732434
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.entity.master.dag;
+
+import cn.sliew.scaleph.dao.entity.BaseDO;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+/**
+ * DAG 配置
+ */
+@Data
+@TableName("dag_config")
+public class DagConfig extends BaseDO {
+
+ private static final long serialVersionUID = 1L;
+
+ @TableField("`type`")
+ private String type;
+
+ @Schema(description = "DAG名称")
+ @TableField("`name`")
+ private String name;
+
+ @TableField("config_id")
+ private String configId;
+
+ @TableField("dag_meta")
+ private String dagMeta;
+
+ @TableField("dag_attrs")
+ private String dagAttrs;
+
+ @TableField("intput_options")
+ private String intputOptions;
+
+ @TableField("output_options")
+ private String outputOptions;
+
+ @TableField("version")
+ private Integer version;
+
+ @TableField("remark")
+ private String remark;
+}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigLink.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigLink.java
new file mode 100644
index 000000000..e9b4e1400
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigLink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.entity.master.dag;
+
+import cn.sliew.scaleph.dao.entity.BaseDO;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+/**
+ * DAG 配置连线
+ */
+@Data
+@TableName("dag_config_link")
+public class DagConfigLink extends BaseDO {
+
+ private static final long serialVersionUID = 1L;
+
+ @TableField("dag_id")
+ private Long dagId;
+
+ @TableField("link_id")
+ private String linkId;
+
+ @TableField("link_name")
+ private String linkName;
+
+ @TableField("from_step_id")
+ private String fromStepId;
+
+ @TableField("to_step_id")
+ private String toStepId;
+
+ @TableField("shape")
+ private String shape;
+
+ @TableField("style")
+ private String style;
+
+ @TableField("link_meta")
+ private String linkMeta;
+
+ @TableField("link_attrs")
+ private String linkAttrs;
+}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigStep.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigStep.java
new file mode 100644
index 000000000..01ed49b96
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/dag/DagConfigStep.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.entity.master.dag;
+
+import cn.sliew.scaleph.dao.entity.BaseDO;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+/**
+ * DAG 配置步骤
+ */
+@Data
+@TableName("dag_config_step")
+public class DagConfigStep extends BaseDO {
+
+ private static final long serialVersionUID = 1L;
+
+ @TableField("dag_id")
+ private Long dagId;
+
+ @TableField("step_id")
+ private String stepId;
+
+ @TableField("step_name")
+ private String stepName;
+
+ @TableField("position_x")
+ private Integer positionX;
+
+ @TableField("position_y")
+ private Integer positionY;
+
+ @TableField("shape")
+ private String shape;
+
+ @TableField("style")
+ private String style;
+
+ @TableField("step_meta")
+ private String stepMeta;
+
+ @TableField("step_attrs")
+ private String stepAttrs;
+}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.java
new file mode 100644
index 000000000..fc7f8eb28
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.mapper.master.dag;
+
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigLink;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.springframework.stereotype.Repository;
+
+/**
+ * DAG 配置连线 Mapper 接口
+ */
+@Repository
+public interface DagConfigLinkMapper extends BaseMapper {
+
+}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.java
new file mode 100644
index 000000000..db215017a
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.mapper.master.dag;
+
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfig;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.springframework.stereotype.Repository;
+
+/**
+ * DAG 配置 Mapper 接口
+ */
+@Repository
+public interface DagConfigMapper extends BaseMapper {
+
+}
diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.java
new file mode 100644
index 000000000..75198d001
--- /dev/null
+++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dao.mapper.master.dag;
+
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigStep;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.springframework.stereotype.Repository;
+
+/**
+ * DAG 配置步骤 Mapper 接口
+ */
+@Repository
+public interface DagConfigStepMapper extends BaseMapper {
+
+}
diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.xml
new file mode 100644
index 000000000..cdda71844
--- /dev/null
+++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigLinkMapper.xml
@@ -0,0 +1,50 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ id,
+ creator,
+ create_time,
+ editor,
+ update_time,
+ dag_id, link_id, link_name, from_step_id, to_step_id, shape, style, link_meta, link_attrs
+
+
+
diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.xml
new file mode 100644
index 000000000..bad8bfd10
--- /dev/null
+++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigMapper.xml
@@ -0,0 +1,50 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ id,
+ creator,
+ create_time,
+ editor,
+ update_time,
+ `type`, `name`, config_id, dag_meta, dag_attrs, intput_options, output_options, version, remark
+
+
+
diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.xml
new file mode 100644
index 000000000..3122667db
--- /dev/null
+++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/dag/DagConfigStepMapper.xml
@@ -0,0 +1,50 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ id,
+ creator,
+ create_time,
+ editor,
+ update_time,
+ dag_id, step_id, step_name, position_x, position_y, shape, style, step_meta, step_attrs
+
+
+
diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJob2.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepOne.java
similarity index 85%
rename from scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJob2.java
rename to scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepOne.java
index 6952b54c3..50c033e37 100644
--- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJob2.java
+++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepOne.java
@@ -27,10 +27,10 @@
@Slf4j
@Component
-public class FlinkJobStatusSyncJob2 extends AbstractWorkFlow {
+public class FlinkJobStatusSyncJobStepOne extends AbstractWorkFlow {
- public FlinkJobStatusSyncJob2() {
- super("FLINK_JOB_STATUS_SYNC_JOB2");
+ public FlinkJobStatusSyncJobStepOne() {
+ super("FLINK_JOB_STATUS_SYNC_JOB_STEP_ONE");
}
@Override
@@ -39,7 +39,7 @@ protected Runnable doExecute(ActionContext context, ActionListener
}
private void process() {
- log.info("update flink kubernetes job status success!");
+ log.info("update flink kubernetes job status step-1");
}
}
diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeOne.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeOne.java
new file mode 100644
index 000000000..91f039fd5
--- /dev/null
+++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeOne.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.engine.flink.kubernetes.action;
+
+import cn.sliew.milky.common.filter.ActionListener;
+import cn.sliew.scaleph.workflow.engine.action.ActionContext;
+import cn.sliew.scaleph.workflow.engine.action.ActionResult;
+import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class FlinkJobStatusSyncJobStepThreeOne extends AbstractWorkFlow {
+
+ public FlinkJobStatusSyncJobStepThreeOne() {
+ super("FLINK_JOB_STATUS_SYNC_JOB_STEP_THREE_ONE");
+ }
+
+ @Override
+ protected Runnable doExecute(ActionContext context, ActionListener listener) {
+ return () -> process();
+ }
+
+ private void process() {
+ log.info("update flink kubernetes job status step-3-1");
+ }
+
+}
diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeTwo.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeTwo.java
new file mode 100644
index 000000000..d0b275b9a
--- /dev/null
+++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepThreeTwo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.engine.flink.kubernetes.action;
+
+import cn.sliew.milky.common.filter.ActionListener;
+import cn.sliew.scaleph.workflow.engine.action.ActionContext;
+import cn.sliew.scaleph.workflow.engine.action.ActionResult;
+import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class FlinkJobStatusSyncJobStepThreeTwo extends AbstractWorkFlow {
+
+ public FlinkJobStatusSyncJobStepThreeTwo() {
+ super("FLINK_JOB_STATUS_SYNC_JOB_STEP_THREE_TWO");
+ }
+
+ @Override
+ protected Runnable doExecute(ActionContext context, ActionListener listener) {
+ return () -> process();
+ }
+
+ private void process() {
+ log.info("update flink kubernetes job status step-3-2");
+ }
+
+}
diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepTwo.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepTwo.java
new file mode 100644
index 000000000..ef96455d1
--- /dev/null
+++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/action/FlinkJobStatusSyncJobStepTwo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.engine.flink.kubernetes.action;
+
+import cn.sliew.milky.common.filter.ActionListener;
+import cn.sliew.scaleph.workflow.engine.action.ActionContext;
+import cn.sliew.scaleph.workflow.engine.action.ActionResult;
+import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class FlinkJobStatusSyncJobStepTwo extends AbstractWorkFlow {
+
+ public FlinkJobStatusSyncJobStepTwo() {
+ super("FLINK_JOB_STATUS_SYNC_JOB_STEP_TWO");
+ }
+
+ @Override
+ protected Runnable doExecute(ActionContext context, ActionListener listener) {
+ return () -> process();
+ }
+
+ private void process() {
+ log.info("update flink kubernetes job status step-2");
+ }
+
+}
diff --git a/scaleph-kubernetes/pom.xml b/scaleph-kubernetes/pom.xml
index 9fd517e62..7603093b7 100644
--- a/scaleph-kubernetes/pom.xml
+++ b/scaleph-kubernetes/pom.xml
@@ -16,6 +16,10 @@
${project.parent.groupId}
scaleph-resource
+
+ ${project.parent.groupId}
+ scaleph-workflow-quartz
+
io.fabric8
diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/KubernetesServiceImpl.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/KubernetesServiceImpl.java
index 1e0ae1032..47f4028af 100644
--- a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/KubernetesServiceImpl.java
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/service/impl/KubernetesServiceImpl.java
@@ -24,6 +24,8 @@
import cn.sliew.scaleph.kubernetes.service.KubernetesService;
import cn.sliew.scaleph.resource.service.ClusterCredentialService;
import cn.sliew.scaleph.resource.service.dto.ClusterCredentialDTO;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
@@ -45,7 +47,7 @@
public class KubernetesServiceImpl implements KubernetesService {
private ConcurrentMap cache = new ConcurrentHashMap<>(4);
- private ConcurrentMap> namespacedCache = new ConcurrentHashMap<>(4);
+ private Table clientCache = HashBasedTable.create();
@Autowired
private ClusterCredentialService clusterCredentialService;
@@ -57,8 +59,10 @@ public KubernetesClient getClient(Long clusterCredentialId) {
@Override
public NamespacedKubernetesClient getClient(Long clusterCredentialId, String namespace) {
- ConcurrentMap clientMap = namespacedCache.computeIfAbsent(clusterCredentialId, (key) -> new ConcurrentHashMap());
- return clientMap.computeIfAbsent(namespace, (key) -> buildNamespacedClient(clusterCredentialId, namespace));
+ if (clientCache.contains(clusterCredentialId, namespace) == false) {
+ clientCache.put(clusterCredentialId, namespace, buildNamespacedClient(clusterCredentialId, namespace));
+ }
+ return clientCache.get(clusterCredentialId, namespace);
}
private KubernetesClient buildClient(Long clusterCredentialId) {
diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/CronManager.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/CronManager.java
new file mode 100644
index 000000000..c11b17278
--- /dev/null
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/CronManager.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.kubernetes.watch.cron;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Component
+public class CronManager implements InitializingBean, DisposableBean {
+
+ @Override
+ public void destroy() throws Exception {
+
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+
+ }
+
+ private void checkStatus(WatchCronIntervalEnum intervalEnum) {
+// log.info("check status");
+ }
+
+ /**
+ * quartz 配置为集群模式,这里无需添加分布式锁保证调度任务不重复执行
+ */
+ @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
+ public void refresh5s() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_5s);
+ }
+
+ @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
+ public void refresh10s() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_10s);
+ }
+
+ @Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
+ public void refresh30s() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_30s);
+ }
+
+ @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)
+ public void refresh1m() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_1m);
+ }
+
+ @Scheduled(fixedDelay = 2, timeUnit = TimeUnit.MINUTES)
+ public void refresh2m() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_2m);
+ }
+
+ @Scheduled(fixedDelay = 3, timeUnit = TimeUnit.MINUTES)
+ public void refresh3m() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_3m);
+ }
+
+ @Scheduled(fixedDelay = 4, timeUnit = TimeUnit.MINUTES)
+ public void refresh4m() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_4m);
+ }
+
+ @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
+ public void refresh5m() {
+ checkStatus(WatchCronIntervalEnum.LEVEL_5m);
+ }
+}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Task.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/KubernetesCronHandler.java
similarity index 81%
rename from scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Task.java
rename to scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/KubernetesCronHandler.java
index 75acd3b10..8c792fc51 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Task.java
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/KubernetesCronHandler.java
@@ -16,15 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.scaleph.workflow.model;
+package cn.sliew.scaleph.kubernetes.watch.cron;
-import java.util.List;
+import io.fabric8.kubernetes.client.KubernetesClient;
-public interface Task {
+public interface KubernetesCronHandler {
- String getName();
-
- List getInputs();
-
- List getOutputs();
+ void checkStatus(KubernetesClient client);
}
diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/WatchCronIntervalEnum.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/WatchCronIntervalEnum.java
new file mode 100644
index 000000000..007895bde
--- /dev/null
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/cron/WatchCronIntervalEnum.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.kubernetes.watch.cron;
+
+import lombok.Getter;
+
+import java.time.Duration;
+
+@Getter
+public enum WatchCronIntervalEnum {
+
+ LEVEL_5s(60L, Duration.ofSeconds(5L)),
+ LEVEL_10s(120L, Duration.ofSeconds(10L)),
+ LEVEL_30s(180L, Duration.ofSeconds(30L)),
+ LEVEL_1m(240L, Duration.ofMinutes(1L)),
+ LEVEL_2m(300L, Duration.ofMinutes(2L)),
+ LEVEL_3m(360L, Duration.ofMinutes(3L)),
+ LEVEL_4m(420L, Duration.ofMinutes(4L)),
+ LEVEL_5m(Long.MAX_VALUE, Duration.ofMinutes(5L)),
+ ;
+
+ private Long maxCount;
+ private Duration interval;
+
+ WatchCronIntervalEnum(Long maxCount, Duration interval) {
+ this.maxCount = maxCount;
+ this.interval = interval;
+ }
+}
diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/InformerManager.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/InformerManager.java
new file mode 100644
index 000000000..37c27b16e
--- /dev/null
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/InformerManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.kubernetes.watch.informer;
+
+import cn.sliew.scaleph.kubernetes.service.KubernetesService;
+import cn.sliew.scaleph.resource.service.ClusterCredentialService;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+@Slf4j
+@Component
+public class InformerManager implements InitializingBean, DisposableBean {
+
+ private final ConcurrentMap informerMap = new ConcurrentHashMap<>();
+
+ @Autowired
+ private ClusterCredentialService clusterCredentialService;
+ @Autowired
+ private KubernetesService kubernetesService;
+ @Autowired(required = false)
+ private Map watchHandlers;
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ List clusterCredentialIds = clusterCredentialService.listAll();
+ for (Long clusterCredentialId : clusterCredentialIds) {
+ initCluster(clusterCredentialId);
+ }
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ informerMap.forEach(this::stopCluster);
+ }
+
+ private void initCluster(Long clusterCredentialId) {
+ KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
+ informerMap.put(clusterCredentialId, client.informers());
+ SharedInformerFactory informer = client.informers();
+ watchHandlers.values().forEach(handler -> registerWatchHandler(clusterCredentialId, client, informer, handler));
+ informer.startAllRegisteredInformers();
+ log.info("shared informer start success, clusterCredentialId: {}", clusterCredentialId);
+ }
+
+ private void registerWatchHandler(Long clusterCredentialId, KubernetesClient client, SharedInformerFactory informer, KubernetesInformerWatchHandler watchHandler) {
+ watchHandler.register(clusterCredentialId, client, informer);
+ }
+
+ private void stopCluster(Long clusterCredentialId, SharedInformerFactory informer) {
+ informer.stopAllRegisteredInformers();
+ log.info("shared informer stop success, clusterCredentialId {}", clusterCredentialId);
+ }
+}
diff --git a/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/KubernetesInformerWatchHandler.java b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/KubernetesInformerWatchHandler.java
new file mode 100644
index 000000000..7ca04369a
--- /dev/null
+++ b/scaleph-kubernetes/src/main/java/cn/sliew/scaleph/kubernetes/watch/informer/KubernetesInformerWatchHandler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.kubernetes.watch.informer;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+
+public interface KubernetesInformerWatchHandler {
+
+ void register(Long clusterCredentialId, KubernetesClient client, SharedInformerFactory sharedInformerFactory);
+}
diff --git a/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/ClusterCredentialService.java b/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/ClusterCredentialService.java
index 66452ba62..c7d048cc6 100644
--- a/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/ClusterCredentialService.java
+++ b/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/ClusterCredentialService.java
@@ -32,6 +32,8 @@ public interface ClusterCredentialService extends ResourceDescriptor list(ClusterCredentialListParam param);
+ List listAll();
+
ClusterCredentialDTO selectOne(Long id);
void upload(ClusterCredentialUploadParam param, MultipartFile file) throws IOException;
diff --git a/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/impl/ClusterCredentialServiceImpl.java b/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/impl/ClusterCredentialServiceImpl.java
index e07101f4c..c1433c3df 100644
--- a/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/impl/ClusterCredentialServiceImpl.java
+++ b/scaleph-resource/src/main/java/cn/sliew/scaleph/resource/service/impl/ClusterCredentialServiceImpl.java
@@ -28,6 +28,7 @@
import cn.sliew.scaleph.resource.service.param.ClusterCredentialUploadParam;
import cn.sliew.scaleph.resource.service.param.ResourceListParam;
import cn.sliew.scaleph.storage.service.FileSystemService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.hadoop.fs.Path;
@@ -42,6 +43,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
+import java.util.stream.Collectors;
import static cn.sliew.milky.common.check.Ensures.checkState;
@@ -83,6 +85,14 @@ public Page list(ClusterCredentialListParam param) {
return result;
}
+ @Override
+ public List listAll() {
+ LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(ResourceClusterCredential.class)
+ .select(ResourceClusterCredential::getId);
+ List records = resourceClusterCredentialMapper.selectList(queryWrapper);
+ return records.stream().map(ResourceClusterCredential::getId).collect(Collectors.toList());
+ }
+
@Override
public ClusterCredentialDTO selectOne(Long id) {
ResourceClusterCredential record = resourceClusterCredentialMapper.selectById(id);
diff --git a/scaleph-support/scaleph-dag/pom.xml b/scaleph-support/scaleph-dag/pom.xml
index 740fd7b31..a1f425d0e 100644
--- a/scaleph-support/scaleph-dag/pom.xml
+++ b/scaleph-support/scaleph-dag/pom.xml
@@ -30,7 +30,17 @@
${project.parent.groupId}
- scaleph-system
+ scaleph-queue
+
+
+
+ com.alibaba.cola
+ cola-component-statemachine
+
+
+
+ org.jgrapht
+ jgrapht-core
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/algorithm/DAG.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/algorithm/DAG.java
new file mode 100644
index 000000000..79cb9733e
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/algorithm/DAG.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.algorithm;
+
+import org.jgrapht.Graph;
+import org.jgrapht.GraphPath;
+import org.jgrapht.alg.shortestpath.AllDirectedPaths;
+import org.jgrapht.graph.DefaultEdge;
+import org.jgrapht.graph.builder.GraphTypeBuilder;
+import org.jgrapht.traverse.BreadthFirstIterator;
+import org.jgrapht.traverse.TopologicalOrderIterator;
+
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class DAG {
+
+ private Graph jgrapht = GraphTypeBuilder.directed()
+ .allowingSelfLoops(false)
+ .weighted(false)
+ .buildGraph();
+
+ public Set getSources() {
+ return jgrapht.vertexSet().stream()
+ .filter(node -> jgrapht.inDegreeOf(node) == 0)
+ .collect(Collectors.toSet());
+ }
+
+ public Set getSinks() {
+ return jgrapht.vertexSet().stream()
+ .filter(node -> jgrapht.outDegreeOf(node) == 0)
+ .collect(Collectors.toSet());
+ }
+
+ public Integer getMaxDepth() {
+ AllDirectedPaths paths = new AllDirectedPaths<>(jgrapht);
+ return paths.getAllPaths(getSources(), getSinks(), true, null)
+ .stream().map(GraphPath::getLength)
+ .sorted()
+ .findFirst().get();
+ }
+
+ public void topologyTraversal(Consumer consumer) {
+ TopologicalOrderIterator iterator = new TopologicalOrderIterator<>(jgrapht);
+ while (iterator.hasNext()) {
+ consumer.accept(iterator.next());
+ }
+ }
+
+ public void breadthFirstTraversal(Consumer consumer) {
+ BreadthFirstIterator iterator = new BreadthFirstIterator<>(jgrapht);
+ while (iterator.hasNext()) {
+ N node = iterator.next();
+ int depth = iterator.getDepth(node);
+ consumer.accept(node);
+ }
+ }
+
+ /**
+ * 一层一层执行
+ */
+ public void executeBFS(Consumer consumer) {
+ doExecuteBFS(0, getMaxDepth(), consumer);
+ }
+
+ public void doExecuteBFS(Integer depth, Integer maxDepth, Consumer consumer) {
+ // 使用 BFS,计算每个节点所在的层级
+ BreadthFirstIterator iterator = new BreadthFirstIterator<>(jgrapht);
+ while (iterator.hasNext()) {
+ N node = iterator.next();
+ if (iterator.getDepth(node) == depth) {
+ consumer.accept(node);
+ }
+ }
+ if (depth <= maxDepth) {
+ // 开始遍历第二层
+ doExecuteBFS(depth + 1, maxDepth, consumer);
+ }
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigComplexService.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigComplexService.java
new file mode 100644
index 000000000..e8c01f0dd
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigComplexService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service;
+
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigDTO;
+import cn.sliew.scaleph.dag.service.param.DagConfigSimpleAddParam;
+import cn.sliew.scaleph.dag.service.param.DagConfigSimpleUpdateParam;
+import cn.sliew.scaleph.dag.service.vo.DagGraphVO;
+
+import java.util.List;
+
+public interface DagConfigComplexService {
+
+ DagConfigComplexDTO selectOne(Long dagId);
+
+ DagConfigDTO selectSimpleOne(Long dagId);
+
+ Long insert(DagConfigSimpleAddParam param);
+
+ int update(DagConfigSimpleUpdateParam param);
+
+ void replace(Long dagId, DagGraphVO graph);
+
+ Long clone(Long dagId);
+
+ int delete(Long dagId);
+
+ int deleteBatch(List dagIds);
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigLinkService.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigLinkService.java
new file mode 100644
index 000000000..d0a7d89c6
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigLinkService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service;
+
+import cn.sliew.scaleph.dag.service.dto.DagConfigLinkDTO;
+
+import java.util.List;
+
+public interface DagConfigLinkService {
+
+ List listLinks(Long dagId);
+
+ int insert(DagConfigLinkDTO linkDTO);
+
+ int update(DagConfigLinkDTO linkDTO);
+
+ int upsert(DagConfigLinkDTO linkDTO);
+
+ int deleteByDag(Long dagId);
+
+ int deleteByDag(List dagIds);
+
+ int deleteSurplusLinks(Long dagId, List linkIds);
+
+ int clone(Long sourceDagId, Long targetDagId);
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigService.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigService.java
new file mode 100644
index 000000000..885c2e598
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service;
+
+import cn.sliew.scaleph.dag.service.dto.DagConfigDTO;
+
+import java.util.List;
+
+public interface DagConfigService {
+
+ DagConfigDTO selectOne(Long dagId);
+
+ Long insert(DagConfigDTO instanceDTO);
+
+ int update(DagConfigDTO instanceDTO);
+
+ void upsert(DagConfigDTO instanceDTO);
+
+ int delete(Long dagId);
+
+ int deleteBatch(List dagIds);
+
+ Long clone(Long dagId);
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigStepService.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigStepService.java
new file mode 100644
index 000000000..71dd03b65
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/DagConfigStepService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service;
+
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
+
+import java.util.List;
+
+public interface DagConfigStepService {
+
+ List listSteps(Long dagId);
+
+ DagConfigStepDTO selectOne(Long stepId);
+
+ int insert(DagConfigStepDTO stepDTO);
+
+ int update(DagConfigStepDTO stepDTO);
+
+ int upsert(DagConfigStepDTO stepDTO);
+
+ int deleteByDag(Long dagId);
+
+ int deleteByDag(List dagIds);
+
+ int deleteSurplusSteps(Long dagId, List stepIds);
+
+ int clone(Long sourceDagId, Long targetDagId);
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigConvert.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigConvert.java
new file mode 100644
index 000000000..4fb176e24
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigConvert.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.convert;
+
+import cn.sliew.milky.common.util.JacksonUtil;
+import cn.sliew.scaleph.common.convert.BaseConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfig;
+import org.mapstruct.Mapper;
+import org.mapstruct.ReportingPolicy;
+import org.mapstruct.factory.Mappers;
+import org.springframework.beans.BeanUtils;
+import org.springframework.util.StringUtils;
+
+@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE)
+public interface DagConfigConvert extends BaseConvert {
+ DagConfigConvert INSTANCE = Mappers.getMapper(DagConfigConvert.class);
+
+ @Override
+ default DagConfig toDo(DagConfigDTO dto) {
+ DagConfig entity = new DagConfig();
+ BeanUtils.copyProperties(dto, entity);
+ if (dto.getDagMeta() != null) {
+ entity.setDagMeta(dto.getDagMeta().toString());
+ }
+ if (dto.getDagAttrs() != null) {
+ entity.setDagAttrs(dto.getDagAttrs().toString());
+ }
+ if (dto.getIntputOptions() != null) {
+ entity.setIntputOptions(dto.getIntputOptions().toString());
+ }
+ if (dto.getOutputOptions() != null) {
+ entity.setOutputOptions(dto.getOutputOptions().toString());
+ }
+ return entity;
+ }
+
+ @Override
+ default DagConfigDTO toDto(DagConfig entity) {
+ DagConfigDTO dto = new DagConfigDTO();
+ BeanUtils.copyProperties(entity, dto);
+ if (StringUtils.hasText(entity.getDagMeta())) {
+ dto.setDagMeta(JacksonUtil.toJsonNode(entity.getDagMeta()));
+ }
+ if (StringUtils.hasText(entity.getDagAttrs())) {
+ dto.setDagAttrs(JacksonUtil.toJsonNode(entity.getDagAttrs()));
+ }
+ if (StringUtils.hasText(entity.getIntputOptions())) {
+ dto.setIntputOptions(JacksonUtil.toJsonNode(entity.getIntputOptions()));
+ }
+ if (StringUtils.hasText(entity.getOutputOptions())) {
+ dto.setOutputOptions(JacksonUtil.toJsonNode(entity.getOutputOptions()));
+ }
+ return dto;
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigLinkConvert.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigLinkConvert.java
new file mode 100644
index 000000000..011786d1d
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigLinkConvert.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.convert;
+
+import cn.sliew.milky.common.util.JacksonUtil;
+import cn.sliew.scaleph.common.convert.BaseConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigLinkDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigLink;
+import org.mapstruct.Mapper;
+import org.mapstruct.ReportingPolicy;
+import org.mapstruct.factory.Mappers;
+import org.springframework.beans.BeanUtils;
+import org.springframework.util.StringUtils;
+
+@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE)
+public interface DagConfigLinkConvert extends BaseConvert {
+ DagConfigLinkConvert INSTANCE = Mappers.getMapper(DagConfigLinkConvert.class);
+
+ @Override
+ default DagConfigLink toDo(DagConfigLinkDTO dto) {
+ DagConfigLink entity = new DagConfigLink();
+ BeanUtils.copyProperties(dto, entity);
+ if (dto.getStyle() != null) {
+ entity.setStyle(dto.getStyle().toString());
+ }
+ if (dto.getLinkMeta() != null) {
+ entity.setLinkMeta(dto.getLinkMeta().toString());
+ }
+ if (dto.getLinkAttrs() != null) {
+ entity.setLinkAttrs(dto.getLinkAttrs().toString());
+ }
+ return entity;
+ }
+
+ @Override
+ default DagConfigLinkDTO toDto(DagConfigLink entity) {
+ DagConfigLinkDTO dto = new DagConfigLinkDTO();
+ BeanUtils.copyProperties(entity, dto);
+ if (StringUtils.hasText(entity.getStyle())) {
+ dto.setStyle(JacksonUtil.toJsonNode(entity.getStyle()));
+ }
+ if (StringUtils.hasText(entity.getLinkMeta())) {
+ dto.setLinkMeta(JacksonUtil.toJsonNode(entity.getLinkMeta()));
+ }
+ if (StringUtils.hasText(entity.getLinkAttrs())) {
+ dto.setLinkAttrs(JacksonUtil.toJsonNode(entity.getLinkAttrs()));
+ }
+ return dto;
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigStepConvert.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigStepConvert.java
new file mode 100644
index 000000000..9dc8b588b
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/convert/DagConfigStepConvert.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.convert;
+
+import cn.sliew.milky.common.util.JacksonUtil;
+import cn.sliew.scaleph.common.convert.BaseConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigStep;
+import org.mapstruct.Mapper;
+import org.mapstruct.ReportingPolicy;
+import org.mapstruct.factory.Mappers;
+import org.springframework.beans.BeanUtils;
+import org.springframework.util.StringUtils;
+
+@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE)
+public interface DagConfigStepConvert extends BaseConvert {
+ DagConfigStepConvert INSTANCE = Mappers.getMapper(DagConfigStepConvert.class);
+
+ @Override
+ default DagConfigStep toDo(DagConfigStepDTO dto) {
+ DagConfigStep entity = new DagConfigStep();
+ BeanUtils.copyProperties(dto, entity);
+ if (dto.getStyle() != null) {
+ entity.setStyle(dto.getStyle().toString());
+ }
+ if (dto.getStepMeta() != null) {
+ entity.setStepMeta(dto.getStepMeta().toString());
+ }
+ if (dto.getStepAttrs() != null) {
+ entity.setStepAttrs(dto.getStepAttrs().toString());
+ }
+ return entity;
+ }
+
+ @Override
+ default DagConfigStepDTO toDto(DagConfigStep entity) {
+ DagConfigStepDTO dto = new DagConfigStepDTO();
+ BeanUtils.copyProperties(entity, dto);
+ if (StringUtils.hasText(entity.getStyle())) {
+ dto.setStyle(JacksonUtil.toJsonNode(entity.getStyle()));
+ }
+ if (StringUtils.hasText(entity.getStepMeta())) {
+ dto.setStepMeta(JacksonUtil.toJsonNode(entity.getStepMeta()));
+ }
+ if (StringUtils.hasText(entity.getStepAttrs())) {
+ dto.setStepAttrs(JacksonUtil.toJsonNode(entity.getStepAttrs()));
+ }
+ return dto;
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigComplexDTO.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigComplexDTO.java
new file mode 100644
index 000000000..985822333
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigComplexDTO.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.dto;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * DAG 实例
+ */
+@Data
+@Schema(name = "Dag", description = "DAG")
+public class DagConfigComplexDTO extends DagConfigDTO {
+
+ @Schema(description = "连线")
+ private List links;
+
+ @Schema(description = "步骤")
+ private List steps;
+
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigDTO.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigDTO.java
new file mode 100644
index 000000000..b1f9ac9c4
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.dto;
+
+import cn.sliew.scaleph.system.model.BaseDTO;
+import com.fasterxml.jackson.databind.JsonNode;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+/**
+ * DAG 配置
+ */
+@Data
+@Schema(name = "DagConfig", description = "DAG 配置")
+public class DagConfigDTO extends BaseDTO {
+
+ private static final long serialVersionUID = 1L;
+
+ @Schema(description = "DAG 类型")
+ private String type;
+
+ @Schema(description = "DAG名称")
+ private String name;
+
+ @Schema(description = "DAG ID")
+ private String configId;
+
+ @Schema(description = "DAG元信息")
+ private JsonNode dagMeta;
+
+ @Schema(description = "DAG属性")
+ private JsonNode dagAttrs;
+
+ @Schema(description = "输入参数声明")
+ private JsonNode intputOptions;
+
+ @Schema(description = "输出参数声明")
+ private JsonNode outputOptions;
+
+ @Schema(description = "版本号")
+ private Integer version;
+
+ @Schema(description = "备注")
+ private String remark;
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigLinkDTO.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigLinkDTO.java
new file mode 100644
index 000000000..e8af6d8af
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigLinkDTO.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.dto;
+
+import cn.sliew.scaleph.system.model.BaseDTO;
+import com.fasterxml.jackson.databind.JsonNode;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+/**
+ * DAG 配置连线
+ */
+@Data
+@Schema(name = "DagConfigLink", description = "DAG 配置连线")
+public class DagConfigLinkDTO extends BaseDTO {
+
+ private static final long serialVersionUID = 1L;
+
+ @Schema(description = "DAG id")
+ private Long dagId;
+
+ @Schema(description = "连线id")
+ private String linkId;
+
+ @Schema(description = "连线名称")
+ private String linkName;
+
+ @Schema(description = "源步骤id")
+ private String fromStepId;
+
+ @Schema(description = "目标步骤id")
+ private String toStepId;
+
+ private String shape;
+
+ private JsonNode style;
+
+ @Schema(description = "连线元信息")
+ private JsonNode linkMeta;
+
+ @Schema(description = "连线属性")
+ private JsonNode linkAttrs;
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigStepDTO.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigStepDTO.java
new file mode 100644
index 000000000..cd1fd8dc2
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/dto/DagConfigStepDTO.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.dto;
+
+import cn.sliew.scaleph.system.model.BaseDTO;
+import com.fasterxml.jackson.databind.JsonNode;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+/**
+ * DAG 配置步骤
+ */
+@Data
+@Schema(name = "DagConfigStep", description = "DAG 配置步骤")
+public class DagConfigStepDTO extends BaseDTO {
+
+ private static final long serialVersionUID = 1L;
+
+ @Schema(description = "DAG id")
+ private Long dagId;
+
+ @Schema(description = "步骤id")
+ private String stepId;
+
+ @Schema(description = "步骤名称")
+ private String stepName;
+
+ @Schema(description = "x坐标")
+ private Integer positionX;
+
+ @Schema(description = "y坐标")
+ private Integer positionY;
+
+ private String shape;
+
+ private JsonNode style;
+
+ @Schema(description = "步骤元信息")
+ private JsonNode stepMeta;
+
+ @Schema(description = "步骤属性")
+ private JsonNode stepAttrs;
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigComplexServiceImpl.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigComplexServiceImpl.java
new file mode 100644
index 000000000..acc459f3b
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigComplexServiceImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.impl;
+
+import cn.sliew.milky.common.util.JacksonUtil;
+import cn.sliew.scaleph.dag.service.DagConfigComplexService;
+import cn.sliew.scaleph.dag.service.DagConfigLinkService;
+import cn.sliew.scaleph.dag.service.DagConfigService;
+import cn.sliew.scaleph.dag.service.DagConfigStepService;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigLinkDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
+import cn.sliew.scaleph.dag.service.param.DagConfigSimpleAddParam;
+import cn.sliew.scaleph.dag.service.param.DagConfigSimpleUpdateParam;
+import cn.sliew.scaleph.dag.service.vo.DagGraphVO;
+import cn.sliew.scaleph.dag.service.vo.EdgeCellVO;
+import cn.sliew.scaleph.dag.service.vo.NodeCellVO;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Service
+public class DagConfigComplexServiceImpl implements DagConfigComplexService {
+
+ @Autowired
+ private DagConfigService dagConfigService;
+ @Autowired
+ private DagConfigLinkService dagConfigLinkService;
+ @Autowired
+ private DagConfigStepService dagConfigStepService;
+
+ @Override
+ public DagConfigComplexDTO selectOne(Long dagId) {
+ DagConfigComplexDTO dagConfigComplexDTO = new DagConfigComplexDTO();
+ DagConfigDTO configDTO = dagConfigService.selectOne(dagId);
+ BeanUtils.copyProperties(configDTO, dagConfigComplexDTO);
+ dagConfigComplexDTO.setLinks(dagConfigLinkService.listLinks(dagId));
+ dagConfigComplexDTO.setSteps(dagConfigStepService.listSteps(dagId));
+ return dagConfigComplexDTO;
+ }
+
+ @Override
+ public DagConfigDTO selectSimpleOne(Long dagId) {
+ return dagConfigService.selectOne(dagId);
+ }
+
+ @Override
+ public Long insert(DagConfigSimpleAddParam param) {
+ DagConfigDTO configDTO = new DagConfigDTO();
+ BeanUtils.copyProperties(param, configDTO);
+ return dagConfigService.insert(configDTO);
+ }
+
+ @Override
+ public int update(DagConfigSimpleUpdateParam param) {
+ DagConfigDTO configDTO = new DagConfigDTO();
+ BeanUtils.copyProperties(param, configDTO);
+ return dagConfigService.update(configDTO);
+ }
+
+ @Override
+ public void replace(Long dagId, DagGraphVO graph) {
+ saveSteps(dagId, graph.getNodes());
+ saveLinks(dagId, graph.getEdges());
+ }
+
+ private void saveSteps(Long dagId, List nodes) {
+ List stepIds = nodes.stream().map(NodeCellVO::getId)
+ .collect(Collectors.toList());
+ dagConfigStepService.deleteSurplusSteps(dagId, stepIds);
+ for (NodeCellVO node : nodes) {
+ DagConfigStepDTO stepDTO = new DagConfigStepDTO();
+ stepDTO.setDagId(dagId);
+ stepDTO.setStepId(node.getId());
+ if (node.getData() != null) {
+ stepDTO.setStepName(node.getData().getLabel());
+ stepDTO.setStepMeta(JacksonUtil.toJsonNode(node.getData().getMeta()));
+ stepDTO.setStepAttrs(JacksonUtil.toJsonNode(node.getData().getAttrs()));
+ }
+ stepDTO.setPositionX(node.getPosition().getX());
+ stepDTO.setPositionY(node.getPosition().getY());
+ dagConfigStepService.upsert(stepDTO);
+ }
+ }
+
+ private void saveLinks(Long jobId, List edges) {
+ List linkIds = edges.stream().map(EdgeCellVO::getId)
+ .collect(Collectors.toList());
+ dagConfigLinkService.deleteSurplusLinks(jobId, linkIds);
+ for (EdgeCellVO edge : edges) {
+ DagConfigLinkDTO linkDTO = new DagConfigLinkDTO();
+ linkDTO.setDagId(jobId);
+ linkDTO.setLinkId(edge.getId());
+ if (edge.getData() != null) {
+ linkDTO.setLinkName(edge.getData().getLabel());
+ linkDTO.setLinkMeta(JacksonUtil.toJsonNode(edge.getData().getMeta()));
+ linkDTO.setLinkAttrs(JacksonUtil.toJsonNode(edge.getData().getAttrs()));
+ }
+ linkDTO.setFromStepId(edge.getSource().getCell());
+ linkDTO.setToStepId(edge.getTarget().getCell());
+ dagConfigLinkService.upsert(linkDTO);
+ }
+ }
+
+ @Override
+ public Long clone(Long dagId) {
+ Long cloneDagId = dagConfigService.clone(dagId);
+ dagConfigStepService.clone(dagId, cloneDagId);
+ dagConfigLinkService.clone(dagId, cloneDagId);
+ return cloneDagId;
+ }
+
+ @Override
+ public int delete(Long dagId) {
+ dagConfigStepService.deleteByDag(dagId);
+ dagConfigLinkService.deleteByDag(dagId);
+ return dagConfigService.delete(dagId);
+ }
+
+ @Override
+ public int deleteBatch(List dagIds) {
+ dagConfigStepService.deleteByDag(dagIds);
+ dagConfigLinkService.deleteByDag(dagIds);
+ return dagConfigService.deleteBatch(dagIds);
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigLinkServiceImpl.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigLinkServiceImpl.java
new file mode 100644
index 000000000..84695b2d7
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigLinkServiceImpl.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.impl;
+
+import cn.sliew.scaleph.dag.service.DagConfigLinkService;
+import cn.sliew.scaleph.dag.service.convert.DagConfigLinkConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigLinkDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigLink;
+import cn.sliew.scaleph.dao.mapper.master.dag.DagConfigLinkMapper;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+@Service
+public class DagConfigLinkServiceImpl implements DagConfigLinkService {
+
+ @Autowired
+ private DagConfigLinkMapper dagConfigLinkMapper;
+
+ @Override
+ public List listLinks(Long dagId) {
+ LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(DagConfigLink.class)
+ .eq(DagConfigLink::getDagId, dagId);
+ List dagConfigLinks = dagConfigLinkMapper.selectList(queryWrapper);
+ return DagConfigLinkConvert.INSTANCE.toDto(dagConfigLinks);
+ }
+
+ @Override
+ public int insert(DagConfigLinkDTO linkDTO) {
+ DagConfigLink record = DagConfigLinkConvert.INSTANCE.toDo(linkDTO);
+ return dagConfigLinkMapper.insert(record);
+ }
+
+ @Override
+ public int update(DagConfigLinkDTO linkDTO) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigLink.class)
+ .eq(DagConfigLink::getDagId, linkDTO.getDagId())
+ .eq(DagConfigLink::getLinkId, linkDTO.getLinkId());
+ DagConfigLink record = DagConfigLinkConvert.INSTANCE.toDo(linkDTO);
+ return dagConfigLinkMapper.update(record, updateWrapper);
+ }
+
+ @Override
+ public int upsert(DagConfigLinkDTO linkDTO) {
+ LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(DagConfigLink.class)
+ .eq(DagConfigLink::getDagId, linkDTO.getDagId())
+ .eq(DagConfigLink::getLinkId, linkDTO.getLinkId());
+ if (dagConfigLinkMapper.exists(queryWrapper)) {
+ return update(linkDTO);
+ } else {
+ return insert(linkDTO);
+ }
+ }
+
+ @Override
+ public int deleteByDag(Long dagId) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigLink.class)
+ .eq(DagConfigLink::getDagId, dagId);
+ return dagConfigLinkMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int deleteByDag(List dagIds) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigLink.class)
+ .in(DagConfigLink::getDagId, dagIds);
+ return dagConfigLinkMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int deleteSurplusLinks(Long dagId, List linkIds) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigLink.class)
+ .eq(DagConfigLink::getDagId, dagId)
+ .notIn(CollectionUtils.isEmpty(linkIds) == false, DagConfigLink::getLinkId, linkIds);
+ return dagConfigLinkMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int clone(Long sourceDagId, Long targetDagId) {
+ List sourceLinks = listLinks(sourceDagId);
+ sourceLinks.stream().forEach(linkDTO -> {
+ linkDTO.setDagId(targetDagId);
+ linkDTO.setId(null);
+ linkDTO.setCreator(null);
+ linkDTO.setCreateTime(null);
+ linkDTO.setEditor(null);
+ linkDTO.setUpdateTime(null);
+ insert(linkDTO);
+ });
+ return sourceLinks.size();
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigServiceImpl.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigServiceImpl.java
new file mode 100644
index 000000000..67d83721e
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigServiceImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.impl;
+
+import cn.sliew.scaleph.common.util.UUIDUtil;
+import cn.sliew.scaleph.dag.service.DagConfigService;
+import cn.sliew.scaleph.dag.service.convert.DagConfigConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfig;
+import cn.sliew.scaleph.dao.mapper.master.dag.DagConfigMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+public class DagConfigServiceImpl implements DagConfigService {
+
+ @Autowired
+ private DagConfigMapper dagConfigMapper;
+
+ @Override
+ public DagConfigDTO selectOne(Long dagId) {
+ DagConfig record = dagConfigMapper.selectById(dagId);
+ return DagConfigConvert.INSTANCE.toDto(record);
+ }
+
+ @Override
+ public Long insert(DagConfigDTO configDTO) {
+ DagConfig record = DagConfigConvert.INSTANCE.toDo(configDTO);
+ record.setConfigId(UUIDUtil.randomUUId());
+ record.setVersion(0);
+ dagConfigMapper.insert(record);
+ return record.getId();
+ }
+
+ @Override
+ public int update(DagConfigDTO configDTO) {
+ DagConfig record = DagConfigConvert.INSTANCE.toDo(configDTO);
+ return dagConfigMapper.updateById(record);
+ }
+
+ @Override
+ public void upsert(DagConfigDTO configDTO) {
+ if (configDTO.getId() != null) {
+ update(configDTO);
+ } else {
+ insert(configDTO);
+ }
+ }
+
+ @Override
+ public int delete(Long dagId) {
+ return dagConfigMapper.deleteById(dagId);
+ }
+
+ @Override
+ public int deleteBatch(List dagIds) {
+ return dagConfigMapper.deleteBatchIds(dagIds);
+ }
+
+ @Override
+ public Long clone(Long dagId) {
+ DagConfigDTO configDTO = selectOne(dagId);
+ configDTO.setId(null);
+ configDTO.setCreator(null);
+ configDTO.setCreateTime(null);
+ configDTO.setEditor(null);
+ configDTO.setUpdateTime(null);
+ configDTO.setName(configDTO.getName() + "_copy_" + UUIDUtil.randomUUId());
+ return insert(configDTO);
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigStepServiceImpl.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigStepServiceImpl.java
new file mode 100644
index 000000000..d80593981
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/impl/DagConfigStepServiceImpl.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.impl;
+
+import cn.sliew.scaleph.dag.service.DagConfigStepService;
+import cn.sliew.scaleph.dag.service.convert.DagConfigStepConvert;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
+import cn.sliew.scaleph.dao.entity.master.dag.DagConfigStep;
+import cn.sliew.scaleph.dao.mapper.master.dag.DagConfigStepMapper;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+import static cn.sliew.milky.common.check.Ensures.checkState;
+
+@Service
+public class DagConfigStepServiceImpl implements DagConfigStepService {
+
+ @Autowired
+ private DagConfigStepMapper dagConfigStepMapper;
+
+ @Override
+ public List listSteps(Long dagId) {
+ LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(DagConfigStep.class)
+ .eq(DagConfigStep::getDagId, dagId);
+ List dagConfigSteps = dagConfigStepMapper.selectList(queryWrapper);
+ return DagConfigStepConvert.INSTANCE.toDto(dagConfigSteps);
+ }
+
+ @Override
+ public DagConfigStepDTO selectOne(Long stepId) {
+ DagConfigStep record = dagConfigStepMapper.selectById(stepId);
+ checkState(record != null, () -> "dag config step not exists for id: " + stepId);
+ return DagConfigStepConvert.INSTANCE.toDto(record);
+ }
+
+ @Override
+ public int insert(DagConfigStepDTO stepDTO) {
+ DagConfigStep record = DagConfigStepConvert.INSTANCE.toDo(stepDTO);
+ return dagConfigStepMapper.insert(record);
+ }
+
+ @Override
+ public int update(DagConfigStepDTO stepDTO) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigStep.class)
+ .eq(DagConfigStep::getDagId, stepDTO.getDagId())
+ .eq(DagConfigStep::getStepId, stepDTO.getStepId());
+ DagConfigStep record = DagConfigStepConvert.INSTANCE.toDo(stepDTO);
+ return dagConfigStepMapper.update(record, updateWrapper);
+ }
+
+ @Override
+ public int upsert(DagConfigStepDTO stepDTO) {
+ LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(DagConfigStep.class)
+ .eq(DagConfigStep::getDagId, stepDTO.getDagId())
+ .eq(DagConfigStep::getStepId, stepDTO.getStepId());
+ if (dagConfigStepMapper.exists(queryWrapper)) {
+ return update(stepDTO);
+ } else {
+ return insert(stepDTO);
+ }
+ }
+
+ @Override
+ public int deleteByDag(Long dagId) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigStep.class)
+ .eq(DagConfigStep::getDagId, dagId);
+ return dagConfigStepMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int deleteByDag(List dagIds) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigStep.class)
+ .in(DagConfigStep::getDagId, dagIds);
+ return dagConfigStepMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int deleteSurplusSteps(Long dagId, List stepIds) {
+ LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(DagConfigStep.class)
+ .eq(DagConfigStep::getDagId, dagId)
+ .notIn(CollectionUtils.isEmpty(stepIds) == false, DagConfigStep::getStepId, stepIds);
+ return dagConfigStepMapper.delete(updateWrapper);
+ }
+
+ @Override
+ public int clone(Long sourceDagId, Long targetDagId) {
+ List sourceSteps = listSteps(sourceDagId);
+ sourceSteps.forEach(stepDTO -> {
+ stepDTO.setDagId(targetDagId);
+ stepDTO.setId(null);
+ stepDTO.setCreator(null);
+ stepDTO.setCreateTime(null);
+ stepDTO.setEditor(null);
+ stepDTO.setUpdateTime(null);
+ insert(stepDTO);
+ });
+ return sourceSteps.size();
+ }
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleAddParam.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleAddParam.java
new file mode 100644
index 000000000..209b180d2
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleAddParam.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.param;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+public class DagConfigSimpleAddParam {
+
+ @NotNull
+ @Schema(description = "DAG 类型")
+ private String type;
+
+ @Schema(description = "DAG名称")
+ private String name;
+
+ @Schema(description = "DAG元信息")
+ private JsonNode dagMeta;
+
+ @Schema(description = "DAG属性")
+ private JsonNode dagAttrs;
+
+ @Schema(description = "备注")
+ private String remark;
+}
diff --git a/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleUpdateParam.java b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleUpdateParam.java
new file mode 100644
index 000000000..2fca78cb4
--- /dev/null
+++ b/scaleph-support/scaleph-dag/src/main/java/cn/sliew/scaleph/dag/service/param/DagConfigSimpleUpdateParam.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.dag.service.param;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+@Data
+public class DagConfigSimpleUpdateParam {
+
+ @NotNull
+ @Schema(description = "id")
+ private Long id;
+
+ @Schema(description = "DAG名称")
+ private String name;
+
+ @Schema(description = "DAG元信息")
+ private JsonNode dagMeta;
+
+ @Schema(description = "DAG属性")
+ private JsonNode dagAttrs;
+
+ @Schema(description = "备注")
+ private String remark;
+}
diff --git a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java
index 24c59e5fa..e6681be5c 100644
--- a/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java
+++ b/scaleph-support/scaleph-generator/src/main/java/cn/sliew/scaleph/generator/MybatisPlusGenerator.java
@@ -54,7 +54,7 @@ public class MybatisPlusGenerator {
/**
* just add table names here and run the {@link #main(String[])} method.
*/
- private static final String[] TABLES = {"ws_artifact_seatunnel"};
+ private static final String[] TABLES = {"dag_config", "dag_config_step", "dag_config_link"};
public static void main(String[] args) {
//自动生成配置
diff --git a/scaleph-support/scaleph-queue/src/main/java/cn/sliew/scaleph/queue/util/FuryUtil.java b/scaleph-support/scaleph-queue/src/main/java/cn/sliew/scaleph/queue/util/FuryUtil.java
index 6f034b835..d46cd072e 100644
--- a/scaleph-support/scaleph-queue/src/main/java/cn/sliew/scaleph/queue/util/FuryUtil.java
+++ b/scaleph-support/scaleph-queue/src/main/java/cn/sliew/scaleph/queue/util/FuryUtil.java
@@ -21,6 +21,8 @@
import io.fury.Fury;
import io.fury.config.Language;
+import java.io.*;
+
public enum FuryUtil {
;
@@ -33,7 +35,27 @@ public static byte[] serialize(Object obj) {
return fury.serialize(obj);
}
+ public static byte[] serializeByJava(Object obj) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(obj);
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static Object deserialize(byte[] bytes) {
return fury.deserialize(bytes);
}
+
+ public static Object deserializeByJava(byte[] bytes) {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis)) {
+ return ois.readObject();
+
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/Action.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/Action.java
index 684e8545b..b465440ce 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/Action.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/Action.java
@@ -20,11 +20,16 @@
import cn.sliew.milky.common.constant.AttributeKey;
import cn.sliew.milky.common.filter.ActionListener;
+import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskType;
import java.util.List;
public interface Action {
+ default WorkflowTaskType getType() {
+ return WorkflowTaskType.JAVA;
+ }
+
String getName();
List getInputs();
@@ -32,4 +37,12 @@ public interface Action {
List getOutputs();
void execute(ActionContext context, ActionListener listener);
+
+ default void onTimeout(ActionContext context, ActionListener listener) {
+
+ }
+
+ default void onTerminate(ActionContext context, ActionListener listener) {
+
+ }
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionInterceptor.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionInterceptor.java
new file mode 100644
index 000000000..478893275
--- /dev/null
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionInterceptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.workflow.engine.action;
+
+public interface ActionExecutionInterceptor {
+
+ default ActionResult beforeExecution(Action action) {
+ return null;
+ }
+
+ default ActionResult afterExecution(Action action, ActionResult actionResult) {
+ return actionResult;
+ }
+
+ default void finallyAfterExecution(Action action, ActionResult actionResult, Throwable throwable) {
+ }
+}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionListener.java
new file mode 100644
index 000000000..3c2b389b8
--- /dev/null
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionExecutionListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.workflow.engine.action;
+
+public interface ActionExecutionListener {
+
+ default void onTaskStartup(Action action) {
+ }
+
+ default void onTaskEnd(Action action) {
+ }
+
+ default void onTaskFailure(Action action, Throwable throwable) {
+ }
+}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/RetryableAction.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/RetryableAction.java
new file mode 100644
index 000000000..76f5451cc
--- /dev/null
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/RetryableAction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.workflow.engine.action;
+
+import java.time.Duration;
+
+public interface RetryableAction extends Action {
+
+ Duration getBackoffPeriod();
+
+ Integer getRetryTimes();
+
+ Integer getMaxRetryTimes();
+}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Output.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/DagWorkflow.java
similarity index 82%
rename from scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Output.java
rename to scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/DagWorkflow.java
index d464b86fa..68427d862 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Output.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/DagWorkflow.java
@@ -16,8 +16,10 @@
* limitations under the License.
*/
-package cn.sliew.scaleph.workflow.model;
+package cn.sliew.scaleph.workflow.engine.workflow.composite;
-public interface Output {
+import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
+
+public interface DagWorkflow extends WorkFlow {
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Input.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/GroupWorkFlow.java
similarity index 80%
rename from scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Input.java
rename to scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/GroupWorkFlow.java
index b648b249b..6cef6b808 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/model/Input.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/GroupWorkFlow.java
@@ -16,15 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.scaleph.workflow.model;
+package cn.sliew.scaleph.workflow.engine.workflow.composite;
-public interface Input {
+import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
- String getName();
+public interface GroupWorkFlow extends WorkFlow {
- String getDescription();
-
- Object getDefaults();
-
- Boolean isRequired();
+ String getGroup();
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/SubWorkFlow.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/SubWorkFlow.java
new file mode 100644
index 000000000..84329c5b7
--- /dev/null
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/composite/SubWorkFlow.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.scaleph.workflow.engine.workflow.composite;
+
+import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow;
+
+public interface SubWorkFlow extends WorkFlow {
+
+ WorkFlow getParent();
+}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java
index 001f62f0a..193f608c3 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventDTO.java
@@ -22,16 +22,18 @@
import cn.sliew.scaleph.common.dict.workflow.WorkflowTaskInstanceStage;
import lombok.Getter;
-import java.util.Optional;
+import java.io.Serializable;
@Getter
-public class WorkflowTaskInstanceEventDTO {
+public class WorkflowTaskInstanceEventDTO implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final WorkflowTaskInstanceStage state;
private final WorkflowTaskInstanceStage nextState;
private final WorkflowTaskInstanceEvent event;
private final Long workflowTaskInstanceId;
- private final Optional throwable;
+ private final Throwable throwable;
public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTaskInstanceStage nextState, WorkflowTaskInstanceEvent event, Long workflowTaskInstanceId) {
this(state, nextState, event, workflowTaskInstanceId, null);
@@ -42,6 +44,6 @@ public WorkflowTaskInstanceEventDTO(WorkflowTaskInstanceStage state, WorkflowTas
this.nextState = nextState;
this.event = event;
this.workflowTaskInstanceId = workflowTaskInstanceId;
- this.throwable = Optional.ofNullable(throwable);
+ this.throwable = throwable;
}
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java
index f81629b11..9850d239d 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceEventListener.java
@@ -27,8 +27,11 @@ public interface WorkflowTaskInstanceEventListener extends MessageHandler {
@Override
default void handler(Message message) throws Exception {
if (message.getBody() != null) {
- WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO) FuryUtil.deserialize(message.getBody());
- onEvent(eventDTO);
+ Object deserialized = FuryUtil.deserializeByJava(message.getBody());
+ if (deserialized instanceof WorkflowTaskInstanceEventDTO) {
+ WorkflowTaskInstanceEventDTO eventDTO = (WorkflowTaskInstanceEventDTO)deserialized;
+ onEvent(eventDTO);
+ }
}
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java
index 61c2fe271..774a07145 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/taskinstance/WorkflowTaskInstanceFailureEventListener.java
@@ -41,9 +41,9 @@ private class FailureRunner implements Runnable, Serializable {
private Long workflowTaskInstanceId;
private Optional throwable;
- public FailureRunner(Long workflowTaskInstanceId, Optional throwable) {
+ public FailureRunner(Long workflowTaskInstanceId, Throwable throwable) {
this.workflowTaskInstanceId = workflowTaskInstanceId;
- this.throwable = throwable;
+ this.throwable = Optional.ofNullable(throwable);
}
@Override
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java
index d23dbe342..14288cb26 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventDTO.java
@@ -22,16 +22,18 @@
import cn.sliew.scaleph.common.dict.workflow.WorkflowInstanceState;
import lombok.Getter;
-import java.util.Optional;
+import java.io.Serializable;
@Getter
-public class WorkflowInstanceEventDTO {
+public class WorkflowInstanceEventDTO implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final WorkflowInstanceState state;
private final WorkflowInstanceState nextState;
private final WorkflowInstanceEvent event;
private final Long workflowInstanceId;
- private final Optional throwable;
+ private final Throwable throwable;
public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceState nextState, WorkflowInstanceEvent event, Long workflowInstanceId) {
this(state, nextState, event, workflowInstanceId, null);
@@ -42,6 +44,6 @@ public WorkflowInstanceEventDTO(WorkflowInstanceState state, WorkflowInstanceSta
this.nextState = nextState;
this.event = event;
this.workflowInstanceId = workflowInstanceId;
- this.throwable = Optional.ofNullable(throwable);
+ this.throwable = throwable;
}
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventListener.java
index 233838e9a..fe87da42f 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventListener.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceEventListener.java
@@ -27,8 +27,11 @@ public interface WorkflowInstanceEventListener extends MessageHandler {
@Override
default void handler(Message message) throws Exception {
if (message.getBody() != null) {
- WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) FuryUtil.deserialize(message.getBody());
- onEvent(eventDTO);
+ Object deserialized = FuryUtil.deserializeByJava(message.getBody());
+ if (deserialized instanceof WorkflowInstanceEventDTO) {
+ WorkflowInstanceEventDTO eventDTO = (WorkflowInstanceEventDTO) deserialized;
+ onEvent(eventDTO);
+ }
}
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceFailureEventListener.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceFailureEventListener.java
index 0961ec24d..1f3b227dc 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceFailureEventListener.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/listener/workflowinstance/WorkflowInstanceFailureEventListener.java
@@ -45,9 +45,9 @@ private class FailureRunner implements Runnable, Serializable {
private Long workflowInstanceId;
private Optional throwable;
- public FailureRunner(Long workflowInstanceId, Optional throwable) {
+ public FailureRunner(Long workflowInstanceId, Throwable throwable) {
this.workflowInstanceId = workflowInstanceId;
- this.throwable = throwable;
+ this.throwable = Optional.ofNullable(throwable);
}
@Override
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowDagService.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowDagService.java
index de41c6561..5fc86c36e 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowDagService.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/WorkflowDagService.java
@@ -18,25 +18,22 @@
package cn.sliew.scaleph.workflow.service;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
-import cn.sliew.scaleph.dag.service.dto.DagStepDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
import cn.sliew.scaleph.dag.service.vo.DagGraphVO;
import cn.sliew.scaleph.dag.xflow.dnd.DndDTO;
-import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
-import cn.sliew.scaleph.workflow.service.param.WorkflowDefinitionListParam;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.List;
public interface WorkflowDagService {
- Long initialize();
+ Long initialize(String name, String remark);
void destroy(Long dagId);
- DagDTO getDag(Long dagId);
+ DagConfigComplexDTO getDag(Long dagId);
- DagStepDTO getStep(Long stepId);
+ DagConfigStepDTO getStep(Long stepId);
void update(Long dagId, DagGraphVO graph);
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionConvert.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionConvert.java
index d4435beb6..7e1bb4b08 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionConvert.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionConvert.java
@@ -20,7 +20,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.convert.BaseConvert;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowDefinition;
import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -55,9 +55,10 @@ default WorkflowDefinitionDTO toDto(WorkflowDefinition entity) {
WorkflowDefinitionDTO dto = new WorkflowDefinitionDTO();
BeanUtils.copyProperties(entity, dto);
if (StringUtils.hasText(entity.getParam())) {
- dto.setParam(JacksonUtil.parseJsonString(entity.getParam(), new TypeReference>() {}));
+ dto.setParam(JacksonUtil.parseJsonString(entity.getParam(), new TypeReference>() {
+ }));
}
- DagDTO dag = new DagDTO();
+ DagConfigComplexDTO dag = new DagConfigComplexDTO();
dag.setId(entity.getDagId());
dto.setDag(dag);
return dto;
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionVOConvert.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionVOConvert.java
index e2f7e74c9..6dbf82f99 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionVOConvert.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowDefinitionVOConvert.java
@@ -20,7 +20,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.convert.BaseConvert;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowDefinitionVO;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowSchedule;
import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
@@ -64,7 +64,7 @@ default WorkflowDefinitionDTO toDto(WorkflowDefinitionVO entity) {
}
WorkflowScheduleDTO schedule = WorkflowScheduleConvert.INSTANCE.toDto(entity.getSchedule());
dto.setSchedule(schedule);
- DagDTO dag = new DagDTO();
+ DagConfigComplexDTO dag = new DagConfigComplexDTO();
dag.setId(entity.getDagId());
dto.setDag(dag);
return dto;
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowTaskDefinition2Convert.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowTaskDefinition2Convert.java
index 0e005e827..498da0bb5 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowTaskDefinition2Convert.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/convert/WorkflowTaskDefinition2Convert.java
@@ -20,7 +20,7 @@
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.convert.BaseConvert;
-import cn.sliew.scaleph.dag.service.dto.DagStepDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionAttrs;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO2;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionMeta;
@@ -31,12 +31,12 @@
import org.springframework.beans.BeanUtils;
@Mapper(uses = {}, unmappedTargetPolicy = ReportingPolicy.IGNORE)
-public interface WorkflowTaskDefinition2Convert extends BaseConvert {
+public interface WorkflowTaskDefinition2Convert extends BaseConvert {
WorkflowTaskDefinition2Convert INSTANCE = Mappers.getMapper(WorkflowTaskDefinition2Convert.class);
@Override
- default DagStepDTO toDo(WorkflowTaskDefinitionDTO2 dto) {
- DagStepDTO entity = new DagStepDTO();
+ default DagConfigStepDTO toDo(WorkflowTaskDefinitionDTO2 dto) {
+ DagConfigStepDTO entity = new DagConfigStepDTO();
BeanUtils.copyProperties(dto, entity);
ObjectNode stepMeta = (ObjectNode) JacksonUtil.toJsonNode(dto.getStepMeta());
stepMeta.putPOJO("type", dto.getStepMeta().getType().getValue());
@@ -46,7 +46,7 @@ default DagStepDTO toDo(WorkflowTaskDefinitionDTO2 dto) {
}
@Override
- default WorkflowTaskDefinitionDTO2 toDto(DagStepDTO entity) {
+ default WorkflowTaskDefinitionDTO2 toDto(DagConfigStepDTO entity) {
WorkflowTaskDefinitionDTO2 dto = new WorkflowTaskDefinitionDTO2();
BeanUtils.copyProperties(entity, dto);
dto.setStepMeta(JacksonUtil.toObject(entity.getStepMeta(), WorkflowTaskDefinitionMeta.class));
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/dto/WorkflowDefinitionDTO.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/dto/WorkflowDefinitionDTO.java
index bd4333fa8..d7c166e86 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/dto/WorkflowDefinitionDTO.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/dto/WorkflowDefinitionDTO.java
@@ -20,7 +20,7 @@
import cn.sliew.scaleph.common.dict.workflow.WorkflowExecuteType;
import cn.sliew.scaleph.common.dict.workflow.WorkflowType;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
import cn.sliew.scaleph.system.model.BaseDTO;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@@ -55,6 +55,6 @@ public class WorkflowDefinitionDTO extends BaseDTO {
private WorkflowScheduleDTO schedule;
@Schema(description = "dag")
- private DagDTO dag;
+ private DagConfigComplexDTO dag;
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDagServiceImpl.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDagServiceImpl.java
index 5b8e8d7ca..0371ea113 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDagServiceImpl.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDagServiceImpl.java
@@ -18,11 +18,11 @@
package cn.sliew.scaleph.workflow.service.impl;
-import cn.sliew.scaleph.dag.service.DagService;
-import cn.sliew.scaleph.dag.service.DagStepService;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
-import cn.sliew.scaleph.dag.service.dto.DagStepDTO;
-import cn.sliew.scaleph.dag.service.param.DagSimpleAddParam;
+import cn.sliew.scaleph.dag.service.DagConfigComplexService;
+import cn.sliew.scaleph.dag.service.DagConfigStepService;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
+import cn.sliew.scaleph.dag.service.param.DagConfigSimpleAddParam;
import cn.sliew.scaleph.dag.service.vo.DagGraphVO;
import cn.sliew.scaleph.dag.xflow.dnd.DndDTO;
import cn.sliew.scaleph.workflow.service.WorkflowDagService;
@@ -36,33 +36,37 @@
public class WorkflowDagServiceImpl implements WorkflowDagService {
@Autowired
- private DagService dagService;
+ private DagConfigComplexService dagConfigComplexService;
@Autowired
- private DagStepService dagStepService;
+ private DagConfigStepService dagConfigStepService;
@Override
- public Long initialize() {
- return dagService.insert(new DagSimpleAddParam());
+ public Long initialize(String name, String remark) {
+ DagConfigSimpleAddParam param = new DagConfigSimpleAddParam();
+ param.setType("WorkFlow");
+ param.setName(name);
+ param.setRemark(remark);
+ return dagConfigComplexService.insert(param);
}
@Override
public void destroy(Long dagId) {
- dagService.delete(dagId);
+ dagConfigComplexService.delete(dagId);
}
@Override
- public DagDTO getDag(Long dagId) {
- return dagService.selectOne(dagId);
+ public DagConfigComplexDTO getDag(Long dagId) {
+ return dagConfigComplexService.selectOne(dagId);
}
@Override
- public DagStepDTO getStep(Long stepId) {
- return dagStepService.selectOne(stepId);
+ public DagConfigStepDTO getStep(Long stepId) {
+ return dagConfigStepService.selectOne(stepId);
}
@Override
public void update(Long dagId, DagGraphVO graph) {
- dagService.replace(dagId, graph);
+ dagConfigComplexService.replace(dagId, graph);
}
@Override
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDefinitionServiceImpl.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDefinitionServiceImpl.java
index 412c14090..0587b8fc9 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDefinitionServiceImpl.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/service/impl/WorkflowDefinitionServiceImpl.java
@@ -18,9 +18,9 @@
package cn.sliew.scaleph.workflow.service.impl;
-import cn.sliew.scaleph.dag.service.dto.DagDTO;
-import cn.sliew.scaleph.dag.service.dto.DagLinkDTO;
-import cn.sliew.scaleph.dag.service.dto.DagStepDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigComplexDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigLinkDTO;
+import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowDefinition;
import cn.sliew.scaleph.dao.entity.master.workflow.WorkflowDefinitionVO;
import cn.sliew.scaleph.dao.mapper.master.workflow.WorkflowDefinitionMapper;
@@ -75,22 +75,23 @@ public WorkflowDefinitionDTO get(Long id) {
@Override
public Graph getDag(Long id) {
WorkflowDefinitionDTO workflowDefinitionDTO = get(id);
- DagDTO dag = workflowDefinitionDTO.getDag();
+ DagConfigComplexDTO dag = workflowDefinitionDTO.getDag();
return buildGraph(id, dag);
}
- private MutableGraph buildGraph(Long id, DagDTO dag) {
+ private MutableGraph buildGraph(Long id, DagConfigComplexDTO dag) {
MutableGraph graph = GraphBuilder.directed().build();
- List steps = dag.getSteps();
- List links = dag.getLinks();
+ List steps = dag.getSteps();
+ List links = dag.getLinks();
if (CollectionUtils.isEmpty(steps)) {
return graph;
}
Map stepMap = new HashMap<>();
- for (DagStepDTO step : steps) {
+ for (DagConfigStepDTO step : steps) {
WorkflowTaskDefinitionDTO2 taskDefinitionDTO = WorkflowTaskDefinition2Convert.INSTANCE.toDto(step);
taskDefinitionDTO.setWorkflowDefinitionId(id);
graph.addNode(taskDefinitionDTO);
+ stepMap.put(taskDefinitionDTO.getStepId(), taskDefinitionDTO);
}
links.forEach(link -> graph.putEdge(stepMap.get(link.getFromStepId()), stepMap.get(link.getToStepId())));
return graph;
@@ -98,7 +99,7 @@ private MutableGraph buildGraph(Long id, DagDTO dag)
@Override
public WorkflowTaskDefinitionDTO2 getTaskDefinition(Long workflowTaskDefinitionId) {
- DagStepDTO step = workflowDagService.getStep(workflowTaskDefinitionId);
+ DagConfigStepDTO step = workflowDagService.getStep(workflowTaskDefinitionId);
return WorkflowTaskDefinition2Convert.INSTANCE.toDto(step);
}
}
diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java
index 0d7b75463..2e7057256 100644
--- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java
+++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/statemachine/WorkflowInstanceStateMachine.java
@@ -109,7 +109,7 @@ private Action jobName) throws Exception {
WsArtifactSeaTunnelDTO dto = selectOne(id);
ObjectNode conf = JacksonUtil.createObjectNode();
- DagDTO dag = dto.getDag();
+ DagConfigComplexDTO dag = dto.getDag();
// env
buildEnvs(conf, jobName.isPresent() ? jobName.get() : dto.getArtifact().getName(), dag.getDagAttrs());
// source, sink, transform
@@ -166,7 +166,7 @@ public WsArtifactSeaTunnelDTO insert(WsArtifactSeaTunnelAddParam param) {
record.setSeaTunnelEngine(SeaTunnelEngineType.SEATUNNEL);
record.setFlinkVersion(FlinkVersion.V_1_16_3);
record.setSeaTunnelVersion(SeaTunnelVersion.current());
- record.setDagId(seaTunnelDagService.initialize());
+ record.setDagId(seaTunnelDagService.initialize(param.getName(), param.getRemark()));
record.setCurrent(YesOrNo.YES);
wsArtifactSeaTunnelMapper.insert(record);
return selectOne(record.getId());
@@ -240,15 +240,15 @@ private ObjectNode buildEnv(String jobName, JsonNode dagAttrs) {
return env;
}
- private MutableGraph buildGraph(DagDTO dag) throws PluginException {
+ private MutableGraph buildGraph(DagConfigComplexDTO dag) throws PluginException {
MutableGraph graph = GraphBuilder.directed().build();
- List steps = dag.getSteps();
- List links = dag.getLinks();
+ List steps = dag.getSteps();
+ List links = dag.getLinks();
if (CollectionUtils.isEmpty(steps) || CollectionUtils.isEmpty(links)) {
return graph;
}
Map stepMap = new HashMap<>();
- for (DagStepDTO step : steps) {
+ for (DagConfigStepDTO step : steps) {
Properties properties = mergeJobAttrs(step);
SeaTunnelPluginType stepType = SeaTunnelPluginType.of(step.getStepMeta().get("type").asText());
SeaTunnelPluginName stepName = SeaTunnelPluginName.of(step.getStepMeta().get("name").asText());
@@ -264,7 +264,7 @@ private MutableGraph buildGraph(DagDTO dag) throws PluginException {
return graph;
}
- private Properties mergeJobAttrs(DagStepDTO step) throws PluginException {
+ private Properties mergeJobAttrs(DagConfigStepDTO step) throws PluginException {
Properties properties = PropertyUtil.mapToProperties(JacksonUtil.toObject(step.getStepAttrs(), new TypeReference>() {
}));
SeaTunnelPluginType pluginType = SeaTunnelPluginType.of(step.getStepMeta().get("type").asText());
diff --git a/tools/docker/mysql/init.d/scaleph-app-mysql.sql b/tools/docker/mysql/init.d/scaleph-app-mysql.sql
new file mode 100644
index 000000000..955828f61
--- /dev/null
+++ b/tools/docker/mysql/init.d/scaleph-app-mysql.sql
@@ -0,0 +1,18 @@
+create database if not exists scaleph default character set utf8mb4 collate utf8mb4_unicode_ci;
+use scaleph;
+
+drop table if exists ws_app;
+create table ws_app
+(
+ id bigint not null auto_increment comment '自增主键',
+ project_id bigint not null comment '项目id',
+ app_id varchar(64) comment '应用id。主要用于 kubernetes 中 metadata 使用',
+ version varchar(64) comment '版本',
+ remark varchar(256) comment '备注',
+ creator varchar(32) comment '创建人',
+ create_time timestamp not null default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp not null default current_timestamp on update current_timestamp comment '修改时间',
+ primary key (id),
+ key idx_project (project_id)
+) engine = innodb comment '应用信息';
\ No newline at end of file
diff --git a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql
index bfa7f1f3a..aeaa4de6f 100644
--- a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql
+++ b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql
@@ -1,36 +1,68 @@
create database if not exists scaleph default character set utf8mb4 collate utf8mb4_unicode_ci;
use scaleph;
-drop table if exists dag_instance;
-create table dag_instance
+drop table if exists dag_config;
+create table dag_config
(
- id bigint not null auto_increment comment '自增主键',
- dag_meta varchar(128) comment 'DAG元信息',
- dag_attrs mediumtext comment 'DAG属性',
- creator varchar(32) comment '创建人',
- create_time timestamp default current_timestamp comment '创建时间',
- editor varchar(32) comment '修改人',
- update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
+ id bigint not null auto_increment comment '自增主键',
+ type varchar(16) not null comment 'DAG 类型',
+ name varchar(128) comment 'DAG名称',
+ config_id varchar(64) not null comment 'DAG ID',
+ dag_meta varchar(128) comment 'DAG元信息',
+ dag_attrs mediumtext comment 'DAG属性',
+ intput_options text comment '输入参数声明',
+ output_options text comment '输出参数声明',
+ version int not null default 0 comment '版本号',
+ remark varchar(255) comment '备注',
+ creator varchar(32) comment '创建人',
+ create_time timestamp default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
primary key (id)
-) engine = innodb comment 'DAG 实例';
+) engine = innodb comment 'DAG 配置';
-INSERT INTO `dag_instance` (`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (1, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance` (`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (2, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (3, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (4, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (5, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (6, NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_instance`(`id`, `dag_meta`, `dag_attrs`, `creator`, `editor`)
-VALUES (7, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config` (`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (1, 'SeaTunnel', 'e_commerce', 'bsag8e409f8c81a64edc8f0e1b27d6a010cb', NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config` (`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (2, 'SeaTunnel', 'fake', 'ewykdb10bd1e437346369e027a437473d483', NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (3, 'Flink-CDC', 'flink-cdc-example', 'nlly3ab39bb296a34c5888dd6509ffe588e4', NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (4, 'WorkFlow', 'FlinkSessionClusterStatusSyncJob', 'rnsp52fdd5edd77044a9acc0c2f24c42d760', NULL, NULL, NULL,
+ 'sys', 'sys');
+INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (5, 'WorkFlow', 'FlinkJobStatusSyncJob', 'kvqfebc60efa8def410ebfe30f70fd8f1768', NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (6, 'WorkFlow', 'DorisOperatorInstanceStatusSyncJob', 'kepa00f4fdb5e8794cbb931067244caf5ef2', NULL, NULL, NULL,
+ 'sys', 'sys');
+INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`)
+VALUES (7, 'WorkFlow', 'FlinkJobStatusSyncJob2', 'fssxbe099903bf174c11bf64b0d486383784', NULL, NULL, NULL, 'sys',
+ 'sys');
-drop table if exists dag_step;
-create table dag_step
+drop table if exists dag_config_history;
+create table dag_config_history
+(
+ id bigint not null auto_increment comment '自增主键',
+ dag_config_id bigint not null comment 'DAG 配置ID',
+ type varchar(16) not null comment 'DAG 类型',
+ name varchar(128) comment 'DAG名称',
+ config_id varchar(64) not null comment 'DAG ID',
+ dag_meta varchar(128) comment 'DAG元信息',
+ dag_attrs mediumtext comment 'DAG属性',
+ intput_options text comment '输入参数声明',
+ output_options text comment '输出参数声明',
+ version int not null default 0 comment '版本号',
+ remark varchar(255) comment '备注',
+ creator varchar(32) comment '创建人',
+ create_time timestamp default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
+ primary key (id),
+ key idx_dag_config (`dag_config_id`)
+) engine = innodb comment 'DAG 配置历史。使用 javers 管理';
+
+drop table if exists dag_config_step;
+create table dag_config_step
(
id bigint not null auto_increment comment '自增主键',
dag_id bigint not null comment 'DAG id',
@@ -38,6 +70,8 @@ create table dag_step
step_name varchar(128) comment '步骤名称',
position_x int not null comment 'x坐标',
position_y int not null comment 'y坐标',
+ shape varchar(64),
+ style text,
step_meta varchar(128) comment '步骤元信息',
step_attrs mediumtext comment '步骤属性',
creator varchar(32) comment '创建人',
@@ -46,53 +80,68 @@ create table dag_step
update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
primary key (id),
unique key uniq_step (dag_id, step_id)
-) engine = innodb comment 'DAG 步骤';
+) engine = innodb comment 'DAG 配置步骤';
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (1, 1, '157f118c-9b6c-4d18-a919-fce824676696', 'Jdbc Source', 520, 150,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (1, 1, '157f118c-9b6c-4d18-a919-fce824676696', 'Jdbc Source', 520, 150, NULL, NULL,
'{\"name\":\"Jdbc\",\"type\":\"source\",\"engine\":\"seatunnel\"}',
'{\"stepTitle\":\"Jdbc Source\",\"dataSourceType\":\"MySQL\",\"dataSource\":1,\"fetch_size\":0,\"query\":\"select * from sample_data_e_commerce\"}',
'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (2, 1, 'e69dbf5a-76ad-47be-aa16-175b733a7df2', 'Jdbc Sink', 460, 400,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (2, 1, 'e69dbf5a-76ad-47be-aa16-175b733a7df2', 'Jdbc Sink', 460, 400, NULL, NULL,
'{\"name\":\"Jdbc\",\"type\":\"sink\",\"engine\":\"seatunnel\"}',
'{\"stepTitle\":\"Jdbc Sink\",\"dataSourceType\":\"MySQL\",\"dataSource\":1,\"generate_sink_sql\":false,\"batch_size\":300,\"max_retries\":3,\"is_exactly_once\":false,\"query\":\"insert into sample_data_e_commerce_duplicate \\\\n( id, invoice_no, stock_code, description, quantity, invoice_date, unit_price, customer_id, country )\\\\nvalues (?,?,?,?,?,?,?,?,?)\",\"primary_keys\":\"[]\"}',
'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (3, 2, '6223c6c3-b552-4c69-adab-5300b7514fad', 'Fake Source', 380, 140,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (3, 2, '6223c6c3-b552-4c69-adab-5300b7514fad', 'Fake Source', 380, 140, NULL, NULL,
'{"name":"FakeSource","type":"source","engine":"seatunnel"}',
'{"stepTitle":"Fake Source","fields":[{"field":"c_string","type":"string"},{"field":"c_boolean","type":"boolean"},{"field":"c_tinyint","type":"tinyint"},{"field":"c_smallint","type":"smallint"},{"field":"c_int","type":"int"},{"field":"c_bigint","type":"bigint"},{"field":"c_float","type":"float"},{"field":"c_double","type":"double"},{"field":"c_decimal","type":"decimal(30, 8)"},{"field":"c_bytes","type":"bytes"},{"field":"c_map","type":"map"},{"field":"c_date","type":"date"},{"field":"c_time","type":"time"},{"field":"c_timestamp","type":"timestamp"}],"schema":"{\\\"fields\\\":{\\\"c_string\\\":\\\"string\\\",\\\"c_boolean\\\":\\\"boolean\\\",\\\"c_tinyint\\\":\\\"tinyint\\\",\\\"c_smallint\\\":\\\"smallint\\\",\\\"c_int\\\":\\\"int\\\",\\\"c_bigint\\\":\\\"bigint\\\",\\\"c_float\\\":\\\"float\\\",\\\"c_double\\\":\\\"double\\\",\\\"c_decimal\\\":\\\"decimal(30, 8)\\\",\\\"c_bytes\\\":\\\"bytes\\\",\\\"c_map\\\":\\\"map\\\",\\\"c_date\\\":\\\"date\\\",\\\"c_time\\\":\\\"time\\\",\\\"c_timestamp\\\":\\\"timestamp\\\"}}"}',
'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (4, 2, 'f08143b4-34dc-4190-8723-e8d8ce49738f', 'Console Sink', 360, 290,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (4, 2, 'f08143b4-34dc-4190-8723-e8d8ce49738f', 'Console Sink', 360, 290, NULL, NULL,
'{"name":"Console","type":"sink","engine":"seatunnel"}', '{"stepTitle":"Console Sink"}', 'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (7, 4, '7f7ced76-7771-4870-91d9-435ef1c4e623', 'FlinkSessionClusterStatus', 460, 400,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (7, 4, '7f7ced76-7771-4870-91d9-435ef1c4e623', 'FlinkSessionClusterStatus', 460, 400, NULL, NULL,
'{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkSessionClusterStatusSyncJob\",\"type\":\"1\"}',
NULL, 'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (8, 5, '5d5d67c5-ade3-4005-a0db-d514bf11616d', 'FlinkJobStatus', 460, 400,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (8, 5, '5d5d67c5-ade3-4005-a0db-d514bf11616d', 'FlinkJobStatus', 460, 400, NULL, NULL,
'{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJob\",\"type\":\"1\"}', NULL,
'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (9, 6, '8c7b171c-f232-4b96-b842-5f4fbef34bc1', 'DorisOperatorInstanceStatus', 460, 400,
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (9, 6, '8c7b171c-f232-4b96-b842-5f4fbef34bc1', 'DorisOperatorInstanceStatus', 460, 400, NULL, NULL,
'{\"handler\":\"cn.sliew.scaleph.engine.doris.action.DorisOperatorInstanceStatusSyncJob\",\"type\":\"1\"}',
NULL, 'sys', 'sys');
-INSERT INTO `dag_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `step_meta`, `step_attrs`,
- `creator`, `editor`)
-VALUES (10, 7, 'cae1a622-6c96-4cec-81d3-883510c17702', 'FlinkJobStatus', 460, 400,
- '{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJob2\",\"type\":\"1\"}', NULL,
- 'sys', 'sys');
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (10, 7, 'cae1a622-6c96-4cec-81d3-883510c17702', 'FlinkJobStatus-1', 460, 400, NULL, NULL,
+ '{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJobStepOne\",\"type\":\"1\"}',
+ NULL, 'sys', 'sys');
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (11, 7, '2c2cb6c8-794b-4cc1-8258-cd1898912744', 'FlinkJobStatus-2', 460, 400, NULL, NULL,
+ '{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJobStepTwo\",\"type\":\"1\"}',
+ NULL, 'sys', 'sys');
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (12, 7, 'd82a947b-f414-4273-973a-06f20fe33f0d', 'FlinkJobStatus-3-1', 460, 400, NULL, NULL,
+ '{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJobStepThreeOne\",\"type\":\"1\"}',
+ NULL, 'sys', 'sys');
+INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`,
+ `step_meta`, `step_attrs`, `creator`, `editor`)
+VALUES (13, 7, '027db10b-9150-403d-9d11-e4a36c99e1db', 'FlinkJobStatus-3-2', 460, 400, NULL, NULL,
+ '{\"handler\":\"cn.sliew.scaleph.engine.flink.kubernetes.action.FlinkJobStatusSyncJobStepThreeTwo\",\"type\":\"1\"}',
+ NULL, 'sys', 'sys');
-drop table if exists dag_link;
-create table dag_link
+drop table if exists dag_config_link;
+create table dag_config_link
(
id bigint not null auto_increment comment '自增主键',
dag_id bigint not null comment 'DAG id',
@@ -100,6 +149,8 @@ create table dag_link
link_name varchar(128) comment '连线名称',
from_step_id varchar(36) not null comment '源步骤id',
to_step_id varchar(36) not null comment '目标步骤id',
+ shape varchar(64),
+ style text,
link_meta varchar(128) comment '连线元信息',
link_attrs mediumtext comment '连线属性',
creator varchar(32) comment '创建人',
@@ -108,13 +159,76 @@ create table dag_link
update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
primary key (id),
unique key uniq_link (dag_id, link_id)
-) engine = innodb comment 'DAG 连线';
+) engine = innodb comment 'DAG 配置连线';
-INSERT INTO `dag_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `link_meta`, `link_attrs`,
- `creator`, `editor`)
+INSERT INTO `dag_config_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `shape`, `style`,
+ `link_meta`, `link_attrs`, `creator`, `editor`)
VALUES (1, 1, '78ca5c31-0eaa-4d43-8f30-0d8f7d0ec317', NULL, '157f118c-9b6c-4d18-a919-fce824676696',
- 'e69dbf5a-76ad-47be-aa16-175b733a7df2', NULL, NULL, 'sys', 'sys');
-INSERT INTO `dag_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `link_meta`, `link_attrs`,
- `creator`, `editor`)
+ 'e69dbf5a-76ad-47be-aa16-175b733a7df2', NULL, NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `shape`, `style`,
+ `link_meta`, `link_attrs`, `creator`, `editor`)
VALUES (2, 2, 'd57021a1-65c7-4dfe-ae89-3b73d00fcf72', NULL, '6223c6c3-b552-4c69-adab-5300b7514fad',
- 'f08143b4-34dc-4190-8723-e8d8ce49738f', NULL, NULL, 'sys', 'sys');
\ No newline at end of file
+ 'f08143b4-34dc-4190-8723-e8d8ce49738f', NULL, NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `shape`, `style`,
+ `link_meta`, `link_attrs`, `creator`, `editor`)
+VALUES (4, 7, '2d172e1a-ef92-431c-9889-7461bccae7a5', NULL, 'cae1a622-6c96-4cec-81d3-883510c17702',
+ '2c2cb6c8-794b-4cc1-8258-cd1898912744', NULL, NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `shape`, `style`,
+ `link_meta`, `link_attrs`, `creator`, `editor`)
+VALUES (5, 7, 'af16c8ee-0abf-4555-aa0e-98ec01964ce1', NULL, '2c2cb6c8-794b-4cc1-8258-cd1898912744',
+ 'd82a947b-f414-4273-973a-06f20fe33f0d', NULL, NULL, NULL, NULL, 'sys', 'sys');
+INSERT INTO `dag_config_link` (`id`, `dag_id`, `link_id`, `link_name`, `from_step_id`, `to_step_id`, `shape`, `style`,
+ `link_meta`, `link_attrs`, `creator`, `editor`)
+VALUES (6, 7, '027db10b-9150-403d-9d11-e4a36c99e1db', NULL, '2c2cb6c8-794b-4cc1-8258-cd1898912744',
+ '027db10b-9150-403d-9d11-e4a36c99e1db', NULL, NULL, NULL, NULL, 'sys', 'sys');
+
+drop table if exists dag_instance;
+create table dag_instance
+(
+ id bigint not null auto_increment comment '自增主键',
+ dag_meta varchar(128) comment 'DAG元信息',
+ dag_attrs mediumtext comment 'DAG属性',
+ creator varchar(32) comment '创建人',
+ create_time timestamp default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
+ primary key (id)
+) engine = innodb comment 'DAG 实例';
+
+drop table if exists dag_step;
+create table dag_step
+(
+ id bigint not null auto_increment comment '自增主键',
+ dag_id bigint not null comment 'DAG id',
+ step_id varchar(36) not null comment '步骤id',
+ step_name varchar(128) comment '步骤名称',
+ position_x int not null comment 'x坐标',
+ position_y int not null comment 'y坐标',
+ step_meta varchar(128) comment '步骤元信息',
+ step_attrs mediumtext comment '步骤属性',
+ creator varchar(32) comment '创建人',
+ create_time timestamp default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
+ primary key (id),
+ unique key uniq_step (dag_id, step_id)
+) engine = innodb comment 'DAG 步骤';
+
+drop table if exists dag_link;
+create table dag_link
+(
+ id bigint not null auto_increment comment '自增主键',
+ dag_id bigint not null comment 'DAG id',
+ link_id varchar(36) not null comment '连线id',
+ link_name varchar(128) comment '连线名称',
+ from_step_id varchar(36) not null comment '源步骤id',
+ to_step_id varchar(36) not null comment '目标步骤id',
+ link_meta varchar(128) comment '连线元信息',
+ link_attrs mediumtext comment '连线属性',
+ creator varchar(32) comment '创建人',
+ create_time timestamp default current_timestamp comment '创建时间',
+ editor varchar(32) comment '修改人',
+ update_time timestamp default current_timestamp on update current_timestamp comment '修改时间',
+ primary key (id),
+ unique key uniq_link (dag_id, link_id)
+) engine = innodb comment 'DAG 连线';
\ No newline at end of file