From b7d8c97d65af575b71efe6755eb67b0bb9126f01 Mon Sep 17 00:00:00 2001 From: Giulio Frasca Date: Tue, 3 Dec 2024 18:38:00 -0500 Subject: [PATCH] feat(backend): Add Parallelism Limit to ParallelFor tasks. Fixes #8718 (#10798) * feat(backend): Add Parallelism Limit to ParallelFor tasks Signed-off-by: Giulio Frasca * feat(backend): Add intermediate Template for iterator Tasks in ArgoCompiler Signed-off-by: Giulio Frasca * test: Add argoCompiler test case to validate individual parallel-limited tasks Signed-off-by: Giulio Frasca * test: Update tests for ParallelFor loop update Signed-off-by: Giulio Frasca * fix(backend): Fix broken dependantTasks in ParallelFor Signed-off-by: Giulio Frasca * fix(backend): pass correct ParentDagID to iterator DAG - Passthrough ParentDagID rather than DriverExecutionID to iterator such that iteration item correctly detects dependentTasks. - Remove depends from iterator DAG as it is already handled by root-level task - Update Iterator template names/nomenclature for clarity - Update tests accordingly Signed-off-by: Giulio Frasca * fix(backend): Remove DAG Driver from Iterator abstraction template - Removes the Driver pod from the Iterator abstraction-layer template as it confuses MLMD and is purley an Argo implementation - Drivers still used on the Component and Iteration-item templates Signed-off-by: Giulio Frasca --------- Signed-off-by: Giulio Frasca --- .../src/v2/compiler/argocompiler/argo_test.go | 5 + backend/src/v2/compiler/argocompiler/dag.go | 61 +- .../testdata/multiple_parallel_loops.yaml | 555 ++++++++++++++++++ .../testdata/multiple_parallel_loops.json | 284 +++++++++ 4 files changed, 901 insertions(+), 4 deletions(-) create mode 100755 backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml create mode 100644 backend/src/v2/compiler/testdata/multiple_parallel_loops.json diff --git a/backend/src/v2/compiler/argocompiler/argo_test.go b/backend/src/v2/compiler/argocompiler/argo_test.go index 6c92e54574c..cd802b5f38b 100644 --- a/backend/src/v2/compiler/argocompiler/argo_test.go +++ b/backend/src/v2/compiler/argocompiler/argo_test.go @@ -47,6 +47,11 @@ func Test_argo_compiler(t *testing.T) { platformSpecPath: "", argoYAMLPath: "testdata/importer.yaml", }, + { + jobPath: "../testdata/multiple_parallel_loops.json", + platformSpecPath: "", + argoYAMLPath: "testdata/multiple_parallel_loops.yaml", + }, { jobPath: "../testdata/create_mount_delete_dynamic_pvc.json", platformSpecPath: "../testdata/create_mount_delete_dynamic_pvc_platform.json", diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index 719a166a9a3..7c997ee61d4 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -265,10 +265,62 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline } }() componentName := task.GetComponentRef().GetName() + // Set up Loop Control Template + iteratorTasks, err := c.iterationItemTask("iteration", task, taskJson, parentDagID) + if err != nil { + return nil, err + } + loopTmpl := &wfapi.Template{ + Inputs: wfapi.Inputs{ + Parameters: []wfapi.Parameter{ + {Name: paramParentDagID}, + }, + }, + DAG: &wfapi.DAGTemplate{ + Tasks: iteratorTasks, + }, + } + parallelismLimit := int64(task.GetIteratorPolicy().GetParallelismLimit()) + if parallelismLimit > 0 { + loopTmpl.Parallelism = ¶llelismLimit + } + + loopTmplName, err := c.addTemplate(loopTmpl, fmt.Sprintf("%s-%s-iterator", componentName, name)) + if err != nil { + return nil, err + } + + tasks = []wfapi.DAGTask{ + { + Name: name + "-loop", + Template: loopTmplName, + Depends: depends(task.GetDependentTasks()), + Arguments: wfapi.Arguments{ + Parameters: []wfapi.Parameter{ + { + Name: paramParentDagID, + Value: wfapi.AnyStringPtr(parentDagID), + }, + }, + }, + }, + } + return tasks, nil +} + +func (c *workflowCompiler) iterationItemTask(name string, task *pipelinespec.PipelineTaskSpec, taskJson string, parentDagID string) (tasks []wfapi.DAGTask, err error) { + defer func() { + if err != nil { + err = fmt.Errorf("iterationItem task: %w", err) + } + }() + componentName := task.GetComponentRef().GetName() componentSpecPlaceholder, err := c.useComponentSpec(componentName) if err != nil { return nil, err } + + // Set up Iteration (Single Task) Template driverArgoName := name + "-driver" driverInputs := dagDriverInputs{ component: componentSpecPlaceholder, @@ -279,10 +331,10 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline if err != nil { return nil, err } - driver.Depends = depends(task.GetDependentTasks()) + iterationCount := intstr.FromString(driverOutputs.iterationCount) iterationTasks, err := c.task( - "iteration", + "iteration-item", task, taskInputs{ parentDagID: inputParameter(paramParentDagID), @@ -311,7 +363,8 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline if task.GetTriggerPolicy().GetCondition() != "" { when = driverOutputs.condition + " != false" } - tasks = []wfapi.DAGTask{ + + iteratorTasks := []wfapi.DAGTask{ *driver, { Name: name + "-iterations", @@ -330,7 +383,7 @@ func (c *workflowCompiler) iteratorTask(name string, task *pipelinespec.Pipeline WithSequence: &wfapi.Sequence{Count: &iterationCount}, }, } - return tasks, nil + return iteratorTasks, nil } type dagDriverOutputs struct { diff --git a/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml new file mode 100755 index 00000000000..738762f8077 --- /dev/null +++ b/backend/src/v2/compiler/argocompiler/testdata/multiple_parallel_loops.yaml @@ -0,0 +1,555 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: my-pipeline- +spec: + arguments: + parameters: + - name: components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d + value: '{"executorLabel":"exec-print-op","inputDefinitions":{"parameters":{"s":{"parameterType":"STRING"}}}}' + - name: implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d + value: '{"args":["--executor_input","{{$}}","--function_to_execute","print_op"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m + ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.7.0'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m + kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + print_op(s: str):\n print(s)\n\n"],"image":"python:3.7"}' + - name: components-comp-for-loop-2 + value: '{"dag":{"tasks":{"print-op":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op"}},"print-op-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-2"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRUCT"}}}}' + - name: components-comp-for-loop-4 + value: '{"dag":{"tasks":{"print-op-3":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-3"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op-3"}},"print-op-4":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-4"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-4"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-3":{"parameterType":"STRUCT"}}}}' + - name: components-root + value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", + \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": + \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, + {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": + \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}},"for-loop-4":{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", + \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": + \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, + {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": + \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - my-pipeline + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - command: + - launcher-v2 + - --copy + - /kfp-launcher/launch + image: gcr.io/ml-pipeline/kfp-launcher + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op"}}' + - name: container + value: '{{workflow.parameters.implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-driver.outputs.parameters.cached-decision}}' + depends: print-op-driver.Succeeded + name: print-op + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-2"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-1","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-2"}}' + - name: container + value: '{{workflow.parameters.implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-2-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-2-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-2-driver.outputs.parameters.cached-decision}}' + depends: print-op-2-driver.Succeeded + name: print-op-2 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-2 + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-3"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"A_a\"]"}}},"taskInfo":{"name":"print-op-3"}}' + - name: container + value: '{{workflow.parameters.implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-3-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-3-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-3-driver.outputs.parameters.cached-decision}}' + depends: print-op-3-driver.Succeeded + name: print-op-3 + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-print-op-4"},"inputs":{"parameters":{"s":{"componentInputParameter":"pipelinechannel--loop-item-param-3","parameterExpressionSelector":"parseJson(string_value)[\"B_b\"]"}}},"taskInfo":{"name":"print-op-4"}}' + - name: container + value: '{{workflow.parameters.implementations-e7a1060777c9ef84e36a4d54f25d3102abbcbd62d4c8bbb5883c3a2cbcfb5c6d}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: print-op-4-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.print-op-4-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.print-op-4-driver.outputs.parameters.cached-decision}}' + depends: print-op-4-driver.Succeeded + name: print-op-4 + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-4 + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - my-pipeline + - --run_id + - '{{workflow.uid}}' + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + command: + - driver + image: gcr.io/ml-pipeline/kfp-driver + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-2}}' + - name: iteration-index + value: '{{inputs.parameters.iteration-index}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}' + name: iteration-item-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-item-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.iteration-item-driver.outputs.parameters.condition}}' + depends: iteration-item-driver.Succeeded + name: iteration-item + template: comp-for-loop-2 + inputs: + parameters: + - name: parent-dag-id + - name: iteration-index + metadata: {} + name: comp-for-loop-2-iteration + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"iteratorPolicy":{"parallelismLimit":2},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"foo"}}' + name: iteration-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-driver.outputs.parameters.execution-id}}' + - name: iteration-index + value: '{{item}}' + depends: iteration-driver.Succeeded + name: iteration-iterations + template: comp-for-loop-2-iteration + withSequence: + count: '{{tasks.iteration-driver.outputs.parameters.iteration-count}}' + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-2-for-loop-2-iterator + outputs: {} + parallelism: 2 + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-4}}' + - name: iteration-index + value: '{{inputs.parameters.iteration-index}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}' + name: iteration-item-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-item-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.iteration-item-driver.outputs.parameters.condition}}' + depends: iteration-item-driver.Succeeded + name: iteration-item + template: comp-for-loop-4 + inputs: + parameters: + - name: parent-dag-id + - name: iteration-index + metadata: {} + name: comp-for-loop-4-iteration + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-4}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-4"},"iteratorPolicy":{"parallelismLimit":4},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-3","items":{"raw":"[{\"A_a\": + \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": + \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": + \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": + \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": + \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]"}},"taskInfo":{"name":"bar"}}' + name: iteration-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-driver.outputs.parameters.execution-id}}' + - name: iteration-index + value: '{{item}}' + depends: iteration-driver.Succeeded + name: iteration-iterations + template: comp-for-loop-4-iteration + withSequence: + count: '{{tasks.iteration-driver.outputs.parameters.iteration-count}}' + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-4-for-loop-4-iterator + outputs: {} + parallelism: 4 + - dag: + tasks: + - arguments: + parameters: + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: "" + name: for-loop-2-loop + template: comp-for-loop-2-for-loop-2-iterator + - arguments: + parameters: + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: "" + name: for-loop-4-loop + template: comp-for-loop-4-for-loop-4-iterator + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{"parameters":{"text":{"stringValue":"hello world"}}}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/backend/src/v2/compiler/testdata/multiple_parallel_loops.json b/backend/src/v2/compiler/testdata/multiple_parallel_loops.json new file mode 100644 index 00000000000..1208aa20445 --- /dev/null +++ b/backend/src/v2/compiler/testdata/multiple_parallel_loops.json @@ -0,0 +1,284 @@ +{ + "pipelineSpec" :{ + "components": { + "comp-for-loop-2": { + "dag": { + "tasks": { + "print-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-1", + "parameterExpressionSelector": "parseJson(string_value)[\"A_a\"]" + } + } + }, + "taskInfo": { + "name": "print-op" + } + }, + "print-op-2": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-2" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-1", + "parameterExpressionSelector": "parseJson(string_value)[\"B_b\"]" + } + } + }, + "taskInfo": { + "name": "print-op-2" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelinechannel--loop-item-param-1": { + "parameterType": "STRUCT" + } + } + } + }, + "comp-for-loop-4": { + "dag": { + "tasks": { + "print-op-3": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-3" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-3", + "parameterExpressionSelector": "parseJson(string_value)[\"A_a\"]" + } + } + }, + "taskInfo": { + "name": "print-op-3" + } + }, + "print-op-4": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op-4" + }, + "inputs": { + "parameters": { + "s": { + "componentInputParameter": "pipelinechannel--loop-item-param-3", + "parameterExpressionSelector": "parseJson(string_value)[\"B_b\"]" + } + } + }, + "taskInfo": { + "name": "print-op-4" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelinechannel--loop-item-param-3": { + "parameterType": "STRUCT" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-2": { + "executorLabel": "exec-print-op-2", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-3": { + "executorLabel": "exec-print-op-3", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + }, + "comp-print-op-4": { + "executorLabel": "exec-print-op-4", + "inputDefinitions": { + "parameters": { + "s": { + "parameterType": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-print-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-2": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-3": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op-4": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef print_op(s: str):\n print(s)\n\n" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "my-pipeline" + }, + "root": { + "dag": { + "tasks": { + "for-loop-2": { + "componentRef": { + "name": "comp-for-loop-2" + }, + "iteratorPolicy": { + "parallelismLimit": 2 + }, + "parameterIterator": { + "itemInput": "pipelinechannel--loop-item-param-1", + "items": { + "raw": "[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]" + } + }, + "taskInfo": { + "name": "foo" + } + }, + "for-loop-4": { + "componentRef": { + "name": "comp-for-loop-4" + }, + "iteratorPolicy": { + "parallelismLimit": 4 + }, + "parameterIterator": { + "itemInput": "pipelinechannel--loop-item-param-3", + "items": { + "raw": "[{\"A_a\": \"1\", \"B_b\": \"10\"}, {\"A_a\": \"2\", \"B_b\": \"20\"}, {\"A_a\": \"3\", \"B_b\": \"30\"}, {\"A_a\": \"4\", \"B_b\": \"40\"}, {\"A_a\": \"5\", \"B_b\": \"50\"}, {\"A_a\": \"6\", \"B_b\": \"60\"}, {\"A_a\": \"7\", \"B_b\": \"70\"}, {\"A_a\": \"8\", \"B_b\": \"80\"}, {\"A_a\": \"9\", \"B_b\": \"90\"}, {\"A_a\": \"10\", \"B_b\": \"100\"}]" + } + }, + "taskInfo": { + "name": "bar" + } + } + } + } + }, + "schemaVersion": "2.1.0", + "sdkVersion": "kfp-2.7.0" + }, + "runtimeConfig": { + "parameters": { + "text": { + "stringValue": "hello world" + } + } + } +}