Skip to content

Commit

Permalink
Feat: Allow users to select head node image in workflow decorator (#448)
Browse files Browse the repository at this point in the history
# The problem
Head node image is bounded to certain Python version. To overcome that
we might want more images of head node with more python versions.

# This PR's solution
For now lets allow users to select manually which head node image to use

# Checklist

_Check that this PR satisfies the following items:_

- [x] Tests have been added for new features/changed behavior (if no new
features have been added, check the box).
- [x] The [changelog file](CHANGELOG.md) has been updated with a
user-readable description of the changes (if the change isn't visible to
the user in any way, check the box).
- [x] The PR's title is prefixed with
`<feat/fix/chore/imp[rovement]/int[ernal]/docs>[!]:`
- [ ] The PR is linked to a JIRA ticket (if there's no suitable ticket,
check the box).
  • Loading branch information
SebastianMorawiec authored Oct 4, 2024
1 parent f8c4ed6 commit c510666
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
20 changes: 20 additions & 0 deletions projects/orquestra-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,26 @@

🔥 *Features*

* `workflow` decorator now accepts optional `head_node_image` parameter to select what image should be used in head node.

🧟 *Deprecations*

👩‍🔬 *Experimental*

🐛 *Bug Fixes*

💅 *Improvements*

🥷 *Internal*

📃 *Docs*

## v1.0.0

🚨 *Breaking Changes*

🔥 *Features*

* New function: `sdk.infer_git_ref` to infer git refs for Git Imports.
* SDK dependency will be explicitly added as a PythonImport to every task submitted. Version will be inferred from local version during submission.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,9 @@ def flatten_graph(
metadata=ir.WorkflowMetadata(
sdk_version=sdk_version,
python_version=python_version,
head_node_image=_docker_images.HEAD_NODE_IMAGE,
head_node_image=workflow_def._head_node_image
if workflow_def._head_node_image is not None
else _docker_images.HEAD_NODE_IMAGE,
),
resources=_make_resources_model(workflow_def._resources, is_task=False),
# At the moment 'orq submit workflow-def <name>' assumes that the <name> is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
workflow_fn: Callable[..., _R],
fn_ref: FunctionRef,
resources: _dsl.Resources,
head_node_image: Optional[str],
data_aggregation: Optional[DataAggregation] = None,
workflow_args: Optional[Tuple[Any, ...]] = None,
workflow_kwargs: Optional[Dict[str, Any]] = None,
Expand All @@ -84,6 +85,7 @@ def __init__(
self._data_aggregation = data_aggregation
self._workflow_args = workflow_args or ()
self._workflow_kwargs = workflow_kwargs or {}
self._head_node_image = head_node_image
self.default_source_import = default_source_import
self.default_dependency_imports = default_dependency_imports

Expand Down Expand Up @@ -246,6 +248,7 @@ def with_resources(
workflow_kwargs=self._workflow_kwargs,
default_source_import=self.default_source_import,
default_dependency_imports=self.default_dependency_imports,
head_node_image=self._head_node_image,
)

def with_head_node_resources(
Expand Down Expand Up @@ -297,6 +300,40 @@ def with_head_node_resources(
workflow_kwargs=self._workflow_kwargs,
default_source_import=self.default_source_import,
default_dependency_imports=self.default_dependency_imports,
head_node_image=self._head_node_image,
)

def with_head_node_image(
self,
*,
image: Optional[str],
) -> "WorkflowDef":
"""Assigns optional metadata related to this workflow definition object.
Doesn't modify the existing workflow definition, returns a new one.
Example usage::
wf_run = my_workflow().with_head_node_image(
image="abc"
).run("my_cluster")
Args:
image: docker image to be used as head node image,
Image should be full path to docker image with the tag, example:
"hub.stage.nexus.orquestra.io/zapatacomputing/workflow-driver-ray:orquestra-head-image-v1.0.0"
"""
return WorkflowDef(
name=self._name,
workflow_fn=self._fn,
fn_ref=self._fn_ref,
resources=self._resources,
data_aggregation=self._data_aggregation,
workflow_args=self._workflow_args,
workflow_kwargs=self._workflow_kwargs,
default_source_import=self.default_source_import,
default_dependency_imports=self.default_dependency_imports,
head_node_image=image,
)


Expand All @@ -313,6 +350,7 @@ def __init__(
data_aggregation: Optional[Union[DataAggregation, bool]] = None,
default_source_import: Optional[Import] = None,
default_dependency_imports: Optional[Iterable[Import]] = None,
head_node_image: Optional[str] = None,
):
self._custom_name = custom_name
self._fn = workflow_fn
Expand All @@ -322,6 +360,7 @@ def __init__(
self._data_aggregation = data_aggregation
self._default_source_import = default_source_import
self._default_dependency_imports = default_dependency_imports
self._head_node_image = head_node_image

# flake8: ignore=DOC101
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> WorkflowDef[_R]:
Expand Down Expand Up @@ -413,6 +452,7 @@ def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> WorkflowDef[_R]:
workflow_kwargs=kwargs,
default_source_import=self._default_source_import,
default_dependency_imports=self._default_dependency_imports,
head_node_image=self._head_node_image,
)

@property
Expand Down Expand Up @@ -609,6 +649,7 @@ def workflow(
custom_name: Optional[str] = None,
default_source_import: Optional[Import] = None,
default_dependency_imports: Optional[Iterable[Import]] = None,
head_node_image: Optional[str] = None,
) -> Callable[[Callable[_P, _R]], WorkflowTemplate[_P, _R]]:
...

Expand All @@ -623,6 +664,7 @@ def workflow(
custom_name: Optional[str] = None,
default_source_import: Optional[Import] = None,
default_dependency_imports: Optional[Iterable[Import]] = None,
head_node_image: Optional[str] = None,
) -> WorkflowTemplate[_P, _R]:
...

Expand All @@ -636,6 +678,7 @@ def workflow(
custom_name: Optional[str] = None,
default_source_import: Optional[Import] = None,
default_dependency_imports: Union[Iterable[Import], Import, None] = None,
head_node_image: Optional[str] = None,
) -> Union[
WorkflowTemplate[_P, _R],
Callable[[Callable[_P, _R]], WorkflowTemplate[_P, _R]],
Expand Down Expand Up @@ -664,6 +707,9 @@ def workflow(
Important: if a task defines its own individual dependency imports, the
workflow scoped default_dependency_imports will be ignored for that
particular task
head_node_image: Path to docker image that will be used as head node image.
Image should be full path to docker image with the tag, example:
"hub.stage.nexus.orquestra.io/zapatacomputing/workflow-driver-ray:orquestra-head-image-v1.0.0"
You can use the Python API to submit workflows for execution::
Expand Down Expand Up @@ -730,6 +776,7 @@ def _inner(fn: Callable[_P, _R]):
data_aggregation=_data_aggregation,
default_source_import=default_source_import,
default_dependency_imports=workflow_default_dependency_imports,
head_node_image=head_node_image,
)
functools.update_wrapper(template, fn)
return template
Expand Down
24 changes: 23 additions & 1 deletion projects/orquestra-sdk/tests/sdk/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from orquestra.workflow_shared.schema import ir

import orquestra.sdk as sdk
from orquestra.sdk._client._base import _traversal, _workflow, loader
from orquestra.sdk._client._base import _docker_images, _traversal, _workflow, loader
from orquestra.sdk._client._base._dsl import InvalidPlaceholderInCustomTaskNameError

DEFAULT_LOCAL_REPO_PATH = Path(__file__).parent.resolve()
Expand Down Expand Up @@ -338,6 +338,28 @@ def test_is_exportable_to_dot(tmp_path):
}


@sdk.workflow(head_node_image="my_image")
def wf():
return _an_empty_task()


class TestHeadNodeImage:
def test_head_node_in_decorator(self):
model = wf().model
assert model.metadata is not None
assert model.metadata.head_node_image == "my_image"

def test_with_head_node_image(self):
new_wf_model = wf().with_head_node_image(image="different_image").model
assert new_wf_model.metadata is not None
assert new_wf_model.metadata.head_node_image == "different_image"

def test_default_head_node_image(self):
model = _simple_workflow().model
assert model.metadata is not None
assert model.metadata.head_node_image == _docker_images.HEAD_NODE_IMAGE


class TestResources:
@pytest.fixture
def resourced_workflow(self):
Expand Down

0 comments on commit c510666

Please sign in to comment.