-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(backend): parallelFor resolve upstream inputs. Fixes #11520 #11627
Conversation
Hi @zazulam. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/ok-to-test |
/retest |
Adding some details here related to The example used was from the comment and also added as the test case in the from kfp import dsl
@dsl.component
def print_op(message: str) -> str:
print(message)
return message
@dsl.component
def reduce_op(message: str) -> str:
print(message)
return message[0]
@dsl.pipeline()
def my_pipeline():
with dsl.ParallelFor([1, 2, 3]):
one = print_op(message='foo')
two = print_op(message='bar').after(one) |
3adfc00
to
bb48a9a
Compare
/lgtm Thanks for the quick turn around on this folks! |
@zazulam can you rebase? there are conflicts |
Signed-off-by: zazulam <m.zazula@gmail.com>
bb48a9a
to
c8a49fc
Compare
I'm going to save the |
@zazulam I think separate pr makes sense, if we can keep this one light weight I might be able to cherry pick this for the 2.4.1 patch release I'll make next week so we can address the regression for kubeflow 1.10. Feel free to hit me up on slack once the pr is ready, or if you get hit with flaky tests. |
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: droctothorpe, HumairAK The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/lgtm |
Description of your changes:
Updated
resolveUpstreamArtifacts
to usegetDAGTasks
to allow for it parse through parallelFor DAG contexts to retrieve the appropriate producerTask. This fixes [backend] dsl.ParallelFor loop: cannot resolve the upstream artifact output of a previous pod #11520, it seems that the call with the filter toGetExecutionsInDAG
was not reverted back to the same as it is inresolveUpstreamParameters
in feat(backend): implement subdag output resolution #11196.Updated the argocompiler for the
iteratorTask
section. The additional "-loop" added to the task associated with the parallelFor DAG breaks the dependency validation that the argoworkflows API calls in the submission of the pipeline. This partially resolves [sdk] Unable to aggregate results over ParallelFor in Kubeflow V2 using V1 workarounds such as.after()
#10050 for the.after()
usage of a parallelTask. The implementation fordsl.Collected
in the backend will be coming in a follow-up PR.cc: @droctothorpe
Checklist: