Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add debug flag while starting workflows #454

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def start_from_ir(
workspace_id: t.Optional[WorkspaceId] = None,
project_id: t.Optional[ProjectId] = None,
dry_run: bool = False,
debug: bool = False,
):
"""Start workflow run from its IR representation.

Expand All @@ -150,6 +151,7 @@ def start_from_ir(
project_id: ID of the project for workflow - supported only on CE
dry_run: Run the workflow without actually executing any task code.
Useful for testing infrastructure, dependency imports, etc.
debug: Run the workflow with the debug flag on.
"""
_config = resolve_config(config)

Expand All @@ -168,6 +170,7 @@ def start_from_ir(
config=_config,
project=_project,
dry_run=dry_run,
debug=debug,
)

return wf_run
Expand All @@ -179,10 +182,11 @@ def _start(
runtime: RuntimeInterface,
config: t.Optional[RuntimeConfig],
dry_run: bool,
debug: bool,
project: t.Optional[ProjectRef] = None,
):
"""Schedule workflow for execution and return WorkflowRun."""
run_id = runtime.create_workflow_run(wf_def, project, dry_run)
run_id = runtime.create_workflow_run(wf_def, project, dry_run, debug)

workflow_run = WorkflowRun(
run_id=run_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ def __init__(
self._token = token

def create_workflow_run(
self, workflow_def: WorkflowDef, project: Optional[ProjectRef], dry_run: bool
self,
workflow_def: WorkflowDef,
project: Optional[ProjectRef],
dry_run: bool,
debug: bool = False,
) -> WorkflowRunId:
"""Schedules a workflow definition for execution.

Expand All @@ -168,6 +172,7 @@ def create_workflow_run(
project: Project dir (workspace and project ID) on which the workflow
will be run.
dry_run: If True, code of the tasks will not be executed.
debug: If True, debug mode will be enabled on CE.

Raises:
WorkflowSyntaxError: when the workflow definition was rejected by the remote
Expand Down Expand Up @@ -225,7 +230,11 @@ def create_workflow_run(
workflow_def_id = self._client.create_workflow_def(workflow_def, project)

workflow_run_id = self._client.create_workflow_run(
workflow_def_id, resources, dry_run, head_node_resources
workflow_def_id=workflow_def_id,
resources=resources,
dry_run=dry_run,
debug=debug,
head_node_resources=head_node_resources,
)
except _exceptions.InvalidWorkflowDef as e:
raise exceptions.WorkflowSyntaxError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ def create_workflow_run(
workflow_def_id: _models.WorkflowDefID,
resources: _models.Resources,
dry_run: bool,
debug: bool,
head_node_resources: Optional[_models.HeadNodeResources],
) -> _models.WorkflowRunID:
"""Submit a workflow def to run in the workflow driver.
Expand All @@ -487,6 +488,7 @@ def create_workflow_run(
workflow_def_id: ID of the workflow definition to be submitted.
resources: The resources required to execute the workflow.
dry_run: Run the workflow without actually executing any task code.
debug: Pass the debug flag to the workflow driver.
head_node_resources: the requested resources for the head node

Raises:
Expand All @@ -507,6 +509,7 @@ def create_workflow_run(
workflowDefinitionID=workflow_def_id,
resources=resources,
dryRun=dry_run,
debug=debug,
headNodeResources=head_node_resources,
).model_dump(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class CreateWorkflowRunRequest(BaseModel):
workflowDefinitionID: WorkflowDefID
resources: Resources
dryRun: bool
debug: bool
headNodeResources: Optional[HeadNodeResources] = None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ def create_workflow_run(
workflow_def: ir.WorkflowDef,
project: t.Optional[ProjectRef],
dry_run: bool,
debug: bool = False,
) -> WfRunId:
if debug:
warnings.warn(
"InProcessRuntime doesn't support `debug`." " Flag will be ignored.",
category=exceptions.UnsupportedRuntimeFeature,
)

if project:
warnings.warn(
"in_process runtime doesn't support project-scoped workflows. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def run(
workspace_id: Optional[WorkspaceId] = None,
project_id: Optional[ProjectId] = None,
dry_run: bool = False,
debug: bool = False,
) -> _api.WorkflowRun:
"""Schedules workflow for execution.

Expand All @@ -167,6 +168,7 @@ def run(
project_id: ID of the project for workflow - supported only on CE
dry_run: Run the workflow without actually executing any task code.
Useful for testing infrastructure, dependency imports, etc.
debug: Send the workflow to CE with the debug flag enabled.

Raises:
orquestra.sdk.exceptions.DirtyGitRepo: (warning) when a task def used by
Expand All @@ -187,6 +189,7 @@ def run(
workspace_id=workspace_id,
project_id=project_id,
dry_run=dry_run,
debug=debug,
)
except exceptions.ProjectInvalidError:
raise
Expand Down
5 changes: 4 additions & 1 deletion projects/orquestra-sdk/tests/sdk/api/test_wf_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def run(
config=None,
project=None,
dry_run=False,
debug=False,
)

class TestByID:
Expand Down Expand Up @@ -2182,7 +2183,9 @@ def test_run(
monkeypatch.setattr(_config.RuntimeConfig, "name", "auto")
with raises:
wf_def.run("in_process", workspace_id=workspace_id, project_id=project_id)
workflow_create_mock.assert_called_once_with(wf_def.model, expected, False)
workflow_create_mock.assert_called_once_with(
wf_def.model, expected, False, False
)


class TestListWorkspaces:
Expand Down
90 changes: 53 additions & 37 deletions projects/orquestra-sdk/tests/sdk/driver/test_ce_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ def test_happy_path(
my_workflow.model, ProjectRef(workspace_id="a", project_id="b")
)
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu=None, memory=None, gpu=None, nodes=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(cpu=None, memory=None, gpu=None, nodes=None),
dry_run=False,
debug=False,
head_node_resources=None,
)
assert isinstance(wf_run_id, WorkflowRunId)
assert (
Expand All @@ -136,12 +137,15 @@ def test_with_memory(
dry_run=False,
)

# Then
# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu=None, memory="10Gi", gpu=None, nodes=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(
cpu=None, memory="10Gi", gpu=None, nodes=None
),
dry_run=False,
debug=False,
head_node_resources=None,
)

def test_with_cpu(
Expand All @@ -164,10 +168,13 @@ def test_with_cpu(

# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu="1000m", memory=None, gpu=None, nodes=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(
cpu="1000m", memory=None, gpu=None, nodes=None
),
dry_run=False,
debug=False,
head_node_resources=None,
)

def test_with_gpu(
Expand All @@ -190,10 +197,11 @@ def test_with_gpu(

# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu=None, memory=None, gpu="1", nodes=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(cpu=None, memory=None, gpu="1", nodes=None),
dry_run=False,
debug=False,
head_node_resources=None,
)

def test_maximum_resource(
Expand All @@ -216,10 +224,13 @@ def test_maximum_resource(

# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu="5000m", memory="3G", gpu="1", nodes=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(
cpu="5000m", memory="3G", gpu="1", nodes=None
),
dry_run=False,
debug=False,
head_node_resources=None,
)

def test_resources_from_workflow(
Expand All @@ -244,10 +255,11 @@ def test_resources_from_workflow(

# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu="1", memory="1.5G", gpu="1", nodes=20),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(cpu="1", memory="1.5G", gpu="1", nodes=20),
dry_run=False,
debug=False,
head_node_resources=None,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -284,10 +296,11 @@ def wf():

# Then
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu=None, memory=None, nodes=None, gpu=None),
False,
None,
workflow_def_id=workflow_def_id,
resources=_models.Resources(cpu=None, memory=None, gpu=None, nodes=None),
dry_run=False,
debug=False,
head_node_resources=None,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -323,11 +336,13 @@ def wf():
)

# Then

mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
_models.Resources(cpu=None, memory=None, nodes=None, gpu=None),
False,
_models.HeadNodeResources(cpu=cpu, memory=memory),
workflow_def_id=workflow_def_id,
resources=_models.Resources(cpu=None, memory=None, gpu=None, nodes=None),
dry_run=False,
debug=False,
head_node_resources=_models.HeadNodeResources(cpu=cpu, memory=memory),
)

class TestWorkflowDefFailure:
Expand Down Expand Up @@ -2159,10 +2174,11 @@ def wf():
assert all([telltale in str(exec_info) for telltale in telltales])
else:
mocked_client.create_workflow_run.assert_called_once_with(
workflow_def_id,
expected_resources,
False,
None,
workflow_def_id=workflow_def_id,
resources=expected_resources,
dry_run=False,
debug=False,
head_node_resources=None,
)


Expand Down
Loading
Loading