ModuleNotFoundError for local modules when running a deployed flow with DaskTaskRunner #16419

Open
jomariya23156 opened this issue Dec 17, 2024 · 0 comments
Labels
bug Something isn't working

Bug summary

Setting:

  • The flow uses DaskTaskRunner and submits tasks with .submit.
  • The error occurs only when the flow runs as a deployment.
  • The worker runs in a Docker container.
  • The pull section of the deployment uses git_clone followed by pip_install_requirements.
  • A simplified reproduction is available at https://github.com/jomariya23156/prefect-mp-test. It includes everything, e.g. the flow code, the worker's Dockerfile, prefect.yaml, etc.

Description:

The flow runs with DaskTaskRunner and submits tasks using .submit, like this:

@flow(name="main_execute_flow", task_runner=dask_runner, log_prints=True)
def main_execute_flow(input_ids: List[int]):
    ...
    futures = []
    for idx, chunk in enumerate(ids_chunks):
        future = parallel_execute.submit(base_file_path=".", input_ids=chunk)
        futures.append(future)
        logger.info(f"Submitted chunk no. {idx}")
    ...

If the task (here, parallel_execute) uses functions imported from a local module or .py file, the run raises ModuleNotFoundError and logs the message distributed.protocol.core - CRITICAL - Failed to deserialize. This happens only when the deployment is triggered. Running the flow locally works fine, for example:

if __name__ == "__main__":
    input_ids = list(range(100))
    main_execute_flow(input_ids)

This works fine. But if I trigger a deployment run from the UI or with an HTTP request, the error occurs.
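
A possible explanation, offered as an assumption rather than a confirmed diagnosis: pickle serializes a module-level function by reference (module name plus function name), so whichever process deserializes it must be able to import that module. The standard-library sketch below reproduces the same class of failure without Prefect or Dask. The module name demo_local_utils and the helpers unpickle_in_child and demo are hypothetical and exist only for illustration.

```python
import importlib
import os
import pickle
import subprocess
import sys
import tempfile


def unpickle_in_child(payload_path, cwd, extra_path=None):
    """Unpickle payload_path in a fresh interpreter; return (returncode, stderr)."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)  # start from a clean import path
    if extra_path is not None:
        env["PYTHONPATH"] = extra_path
    code = "import pickle, sys; pickle.load(open(sys.argv[1], 'rb'))"
    proc = subprocess.run(
        [sys.executable, "-c", code, payload_path],
        capture_output=True, text=True, env=env, cwd=cwd,
    )
    return proc.returncode, proc.stderr


def demo():
    """Pickle a function from a local module, then load it with and without the path."""
    with tempfile.TemporaryDirectory() as project, \
            tempfile.TemporaryDirectory() as elsewhere:
        # A "local module" that lives only in the project directory.
        with open(os.path.join(project, "demo_local_utils.py"), "w") as f:
            f.write("def double(x):\n    return 2 * x\n")

        sys.path.insert(0, project)
        try:
            module = importlib.import_module("demo_local_utils")
            payload = os.path.join(elsewhere, "payload.pkl")
            with open(payload, "wb") as f:
                # Stores only the names "demo_local_utils" and "double".
                pickle.dump(module.double, f)
        finally:
            sys.path.remove(project)
            sys.modules.pop("demo_local_utils", None)

        # The child runs in a directory that does not contain the module.
        missing = unpickle_in_child(payload, cwd=elsewhere)
        fixed = unpickle_in_child(payload, cwd=elsewhere, extra_path=project)
        return missing, fixed


if __name__ == "__main__":
    missing, fixed = demo()
    print("without project dir on path:", missing[0])
    print("with project dir on path:", fixed[0])
```

If the Dask worker processes in the deployment start without the cloned repository directory on their import path, they would hit the same error when deserializing the task, which would be consistent with the trace below.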

Workaround

Adding the current working directory to the system path fixed the problem, possibly because it helps the script locate local modules (of course, this doesn't sound like a proper solution).

import os
import sys

cwd = os.getcwd()
sys.path.insert(0, cwd)
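
A slightly more defensive variant of the same workaround is sketched below. It avoids inserting the directory twice and also prepends it to the PYTHONPATH environment variable, so that interpreters started later from this process inherit it. Whether the Dask worker processes actually pick this up depends on how they are launched, which is an assumption here and not something verified against prefect-dask. The function name add_project_dir_to_path is hypothetical.

```python
import os
import sys


def add_project_dir_to_path(project_dir=None):
    """Make project_dir importable here and in child interpreters started later."""
    project_dir = os.path.abspath(project_dir or os.getcwd())

    # Current process: put the directory at the front of sys.path, once.
    if project_dir not in sys.path:
        sys.path.insert(0, project_dir)

    # Future child processes: prepend the directory to PYTHONPATH, once.
    existing = os.environ.get("PYTHONPATH", "")
    parts = [p for p in existing.split(os.pathsep) if p]
    if project_dir not in parts:
        os.environ["PYTHONPATH"] = os.pathsep.join([project_dir, *parts])

    return project_dir
```

Calling it at the top of the flow module, before the local imports, mirrors where the original two-line workaround was placed.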

Debugging

  • os.listdir(".") showed that the local modules/files do exist in the working directory.
  • The task can open and read a .txt file located in the same directory as the local module.
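
These checks confirm that the files exist on disk, which is a separate question from whether Python's import system can find them: imports consult sys.path, not the directory listing. A small diagnostic along the following lines could be called inside the task to compare the two. The function name describe_import_environment is hypothetical, and the sketch only handles top-level single-file modules such as utils.

```python
import importlib.util
import os
import sys


def describe_import_environment(module_name):
    """Report whether a top-level module is on disk in cwd and whether it is importable."""
    cwd = os.getcwd()
    return {
        "cwd": cwd,
        # An empty string in sys.path also means "current directory".
        "cwd_on_sys_path": cwd in sys.path or "" in sys.path,
        # Only checks for a single-file module, not a package directory.
        "file_in_cwd": os.path.exists(os.path.join(cwd, module_name + ".py")),
        "importable": (
            module_name in sys.modules
            or importlib.util.find_spec(module_name) is not None
        ),
    }
```

A result with file_in_cwd true but importable false would match the behavior described above: the file is readable, yet the working directory is not on the import path of the process doing the import.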

Full error trace (using 2 processes)

prefect_worker-1  | 08:04:03.093 | INFO    | Flow run 'run-test-from-jupyter' - Waiting all processes to finish...
  0%|          | 0/2 [00:00<?, ?it/s]2024-12-17 08:04:04,192 - distributed.protocol.core - CRITICAL - Failed to deserialize
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | 08:04:04.193 | INFO    | distributed.core - Connection to tcp://172.19.0.2:38400 has been closed.
prefect_worker-1  | 08:04:04.193 | INFO    | distributed.scheduler - Remove worker <WorkerState 'tcp://172.19.0.2:37795', name: 0, status: running, memory: 0, processing: 1> (stimulus_id='handle-worker-cleanup-1734422644.1938918')
prefect_worker-1  | 2024-12-17 08:04:04,196 - distributed.protocol.core - CRITICAL - Failed to deserialize
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | 08:04:04.199 | INFO    | distributed.nanny - Closing Nanny gracefully at 'tcp://172.19.0.2:43335'. Reason: worker-handle-scheduler-connection-broken
prefect_worker-1  | 2024-12-17 08:04:04,200 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1425, in _connect
prefect_worker-1  |     comm = await connect(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 377, in connect
prefect_worker-1  |     handshake = await comm.read()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
prefect_worker-1  |     frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
prefect_worker-1  | asyncio.exceptions.CancelledError
prefect_worker-1  | 
prefect_worker-1  | During handling of the above exception, another exception occurred:
prefect_worker-1  | 
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1535, in connect
prefect_worker-1  |     return connect_attempt.result()
prefect_worker-1  | asyncio.exceptions.CancelledError
prefect_worker-1  | 
prefect_worker-1  | During handling of the above exception, another exception occurred:
prefect_worker-1  | 
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1250, in heartbeat
prefect_worker-1  |     response = await retry_operation(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 459, in retry_operation
prefect_worker-1  |     return await retry(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 438, in retry
prefect_worker-1  |     return await coro()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1253, in send_recv_from_rpc
prefect_worker-1  |     comm = await self.pool.connect(self.addr)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1537, in connect
prefect_worker-1  |     raise CommClosedError(reason)
prefect_worker-1  | distributed.comm.core.CommClosedError: ConnectionPool closing.
prefect_worker-1  | 08:04:04.201 | INFO    | distributed.core - Connection to tcp://172.19.0.2:38410 has been closed.
prefect_worker-1  | 08:04:04.201 | INFO    | distributed.scheduler - Remove worker <WorkerState 'tcp://172.19.0.2:34047', name: 1, status: running, memory: 0, processing: 2> (stimulus_id='handle-worker-cleanup-1734422644.2017796')
prefect_worker-1  | 08:04:04.202 | INFO    | distributed.scheduler - Lost all workers
prefect_worker-1  | 2024-12-17 08:04:04,203 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x780ed2bc2d40>>, <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>)
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1  |     ret = callback()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1  |     future.result()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1  |     return await method(self, *args, **kwargs)  # type: ignore
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1  |     await self.handle_stream(comm)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1  |     msgs = await comm.read()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1  |     msg = await from_frames(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1  |     res = _from_frames()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1  |     return protocol.loads(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | unhandled exception during asyncio.run() shutdown
prefect_worker-1  | task: <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1  |     ret = callback()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1  |     future.result()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1  |     return await method(self, *args, **kwargs)  # type: ignore
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1  |     await self.handle_stream(comm)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1  |     msgs = await comm.read()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1  |     msg = await from_frames(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1  |     res = _from_frames()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1  |     return protocol.loads(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | 08:04:04.204 | WARNING | distributed.scheduler - Received heartbeat from unregistered worker 'tcp://172.19.0.2:34047'.
prefect_worker-1  | 2024-12-17 08:04:04,205 - distributed.worker - WARNING - Scheduler was unaware of this worker; shutting down.
prefect_worker-1  | 08:04:04.205 | INFO    | distributed.nanny - Closing Nanny gracefully at 'tcp://172.19.0.2:38137'. Reason: worker-handle-scheduler-connection-broken
prefect_worker-1  | 2024-12-17 08:04:04,208 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7e753a0c2d40>>, <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>)
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1  |     ret = callback()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1  |     future.result()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1  |     return await method(self, *args, **kwargs)  # type: ignore
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1  |     await self.handle_stream(comm)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1  |     msgs = await comm.read()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1  |     msg = await from_frames(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1  |     res = _from_frames()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1  |     return protocol.loads(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | unhandled exception during asyncio.run() shutdown
prefect_worker-1  | task: <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>
prefect_worker-1  | Traceback (most recent call last):
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1  |     ret = callback()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1  |     future.result()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1  |     return await method(self, *args, **kwargs)  # type: ignore
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1  |     await self.handle_stream(comm)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1  |     msgs = await comm.read()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1  |     msg = await from_frames(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1  |     res = _from_frames()
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1  |     return protocol.loads(
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1  |     return msgpack.loads(
prefect_worker-1  |   File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1  |     return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1  |   File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1  |     return pickle.loads(x)
prefect_worker-1  | ModuleNotFoundError: No module named 'utils'
prefect_worker-1  | 08:04:06.595 | INFO    | distributed.nanny - Closing Nanny at 'tcp://172.19.0.2:43335'. Reason: nanny-close-gracefully
prefect_worker-1  | 08:04:06.600 | INFO    | distributed.nanny - Closing Nanny at 'tcp://172.19.0.2:38137'. Reason: nanny-close-gracefully

Version info

Version:             3.1.5
API version:         0.8.4
Python version:      3.10.15
Git commit:          3c06654e
Built:               Mon, Dec 2, 2024 6:57 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.3.2
  prefect-github:    0.3.1

Additional context

No response

@jomariya23156 jomariya23156 added the bug Something isn't working label Dec 17, 2024