Launching a task from a task is broken when using DaskTaskRunner due to a flow run context serialization failure. #16422

Open
kzvezdarov opened this issue Dec 17, 2024 · 2 comments
Labels
bug Something isn't working

@kzvezdarov
Contributor

Bug summary

Launching a task from within another task fails with a flow run context serialization error when the flow uses the DaskTaskRunner and runs through a process work pool.

The following flow, executed through a deployment and a process-based work pool, against an existing Dask cluster:

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def subtask():
    return "Ok"


@task
def main_task():
    result = subtask.submit()
    result.wait()


@flow(
    flow_run_name="task_from_task",
    task_runner=DaskTaskRunner(address="tcp://dask_scheduler:8786"),
    log_prints=True,
)
def task_from_task():
    result = main_task.submit()

    result.wait()

Results in this traceback:

Task run failed with exception: TypeError("'MockValSer' object cannot be converted to 'SchemaSerializer'") - Retries are exhausted
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 810, in run_context
    yield self
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 1398, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 833, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmpzxb8qrs_prefect/data_pipeline/flows/task_task.py", line 12, in main_task
  File "/usr/local/lib/python3.12/site-packages/prefect/tasks.py", line 1175, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect_dask/client.py", line 35, in submit
    run_task_kwargs["context"] = serialize_context()
                                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/context.py", line 78, in serialize_context
    "flow_run_context": flow_run_context.serialize() if flow_run_context else {},
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/context.py", line 373, in serialize
    return self.model_dump(
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 426, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
11:48:07 AM
main_task-ed4
prefect.task_runs

Finished in state Failed("Task run encountered an exception TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'")

Removing the DaskTaskRunner lets the flow execute correctly. Removing the subtask call also fixes the issue.

Version info

Version:             3.1.7
API version:         0.8.4
Python version:      3.12.7
Git commit:          c05ffa6d
Built:               Mon, Dec 16, 2024 10:06 AM
OS/Arch:             linux/aarch64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.10.3
Integrations:
  prefect-dask:      0.3.2
  prefect-gcp:       0.6.2

Additional context

task_from_task.csv

@kzvezdarov
Contributor Author

This seems to be related to pydantic/pydantic#7713; setting serialize_as_any here https://github.com/PrefectHQ/prefect/blob/main/src/prefect/context.py#L385 seems to allow at least the flow model to serialize.
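
For readers unfamiliar with the flag, here is a minimal sketch of what serialize_as_any changes in a model_dump call, assuming pydantic 2.7 or later. The Base, Child, and Holder models are hypothetical and unrelated to Prefect's context classes. The sketch does not reproduce the MockValSer error; it only shows that the flag makes pydantic serialize each value according to its runtime type instead of the annotated field type.

```python
from pydantic import BaseModel


# Hypothetical models for illustration only; not Prefect classes.
class Base(BaseModel):
    name: str


class Child(Base):
    extra: int


class Holder(BaseModel):
    # The field is annotated with the base type, but holds a subclass instance.
    item: Base


holder = Holder(item=Child(name="a", extra=1))

# Default behaviour: serialization follows the annotated type (Base),
# so the subclass-only field "extra" is left out.
default_dump = holder.model_dump()

# With serialize_as_any=True, serialization follows the runtime type (Child),
# so "extra" is included.
duck_typed_dump = holder.model_dump(serialize_as_any=True)

print(default_dump)
print(duck_typed_dump)
```

Whether this is the right fix for the flow run context is a separate question; the sketch only illustrates the serialization mode being discussed.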

@cicdw
Member

cicdw commented Dec 20, 2024

Submitting work to a Dask cluster from a Dask worker (which is where main_task is executed in your example) is only experimentally supported in Dask itself, so this is something that we might not be able to fully support.
