In the pull section of the deployment, I use git_clone followed by pip_install_requirements.
You can find a simplified version of this problem in this repo: https://github.com/jomariya23156/prefect-mp-test. I included everything, e.g. the flow code, the worker's Dockerfile, prefect.yaml, etc.
Description:
When running a flow with DaskTaskRunner and submitting tasks using .submit like this
If the task, in this case parallel_execute, uses functions imported from a local module or .py file, it raises ModuleNotFoundError and logs the message distributed.protocol.core - CRITICAL - Failed to deserialize. This happened only when triggering the deployment; when I ran the flow locally it worked fine, for example:
if __name__ == "__main__":
    input_ids = list(range(100))
    main_execute_flow(input_ids)
This worked fine. But if I trigger the deployment run from the UI or with an HTTP request, the error occurs.
Workaround
Adding the current working directory to the system path fixed the problem, possibly because it helps the script locate local modules (of course, this doesn't sound like a proper solution):
import os
import sys

# Make modules in the current working directory importable.
cwd = os.getcwd()
sys.path.insert(0, cwd)
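My guess at why this helps: pickle (and, as far as I understand, the cloudpickle serializer Dask uses for functions in importable modules) stores a function as a reference to "module.name" rather than its body, so whoever unpickles it has to be able to import that module. Below is a minimal stdlib-only sketch of that behaviour. It does not use Prefect or Dask, and demo_utils, double and roundtrip_demo are hypothetical names made up for the illustration.

```python
import importlib
import os
import pickle
import sys
import tempfile


def roundtrip_demo():
    """Return (error name without the dir on sys.path, result with it)."""
    with tempfile.TemporaryDirectory() as module_dir:
        # Hypothetical local module standing in for the project's utils.py.
        with open(os.path.join(module_dir, "demo_utils.py"), "w") as fh:
            fh.write("def double(x):\n    return 2 * x\n")

        # Import it and pickle one of its functions. The payload stores only
        # the reference "demo_utils.double", not the function body.
        sys.path.insert(0, module_dir)
        importlib.invalidate_caches()
        module = importlib.import_module("demo_utils")
        payload = pickle.dumps(module.double)

        # Simulate a process whose sys.path lacks the module directory.
        sys.path.remove(module_dir)
        del sys.modules["demo_utils"]
        try:
            pickle.loads(payload)
            failure = "no error"
        except ModuleNotFoundError as exc:
            failure = type(exc).__name__

        # The workaround: put the directory on sys.path before unpickling.
        sys.path.insert(0, module_dir)
        importlib.invalidate_caches()
        try:
            restored = pickle.loads(payload)
            result = restored(21)
        finally:
            # Leave the interpreter state as we found it.
            sys.path.remove(module_dir)
            sys.modules.pop("demo_utils", None)
        return failure, result


if __name__ == "__main__":
    print(roundtrip_demo())
```

If this is the same mechanism, it would match the trace: the failure is raised inside pickle.loads on the Dask worker side, before the task body ever runs.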
Debugs
os.listdir(".") showed that the local modules/files did exist
The task can open and read a .txt file located in the exact same location as the local module
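These two observations are consistent with each other if the working directory is correct but is not on sys.path: open() resolves relative paths against the working directory, while import only searches sys.path. A small helper like the one below could be called inside the task to confirm that. It is only a sketch, and import_diagnostics is a hypothetical name, not something from Prefect or Dask.

```python
import importlib.util
import os
import sys


def import_diagnostics(module_name):
    """Collect facts that explain why a file is readable but not importable."""
    cwd = os.getcwd()
    # find_spec returns None when nothing on sys.path provides the module.
    spec = importlib.util.find_spec(module_name)
    return {
        "cwd": cwd,
        # Is there a <module_name>.py file in the working directory?
        "file_in_cwd": os.path.exists(os.path.join(cwd, module_name + ".py")),
        # An empty string on sys.path also means "current directory".
        "cwd_on_sys_path": cwd in sys.path or "" in sys.path,
        "importable": spec is not None,
        "origin": getattr(spec, "origin", None),
    }


if __name__ == "__main__":
    # "utils" is the module name from the error trace.
    print(import_diagnostics("utils"))
```

A result with file_in_cwd set to True and importable set to False would point at sys.path rather than at a missing file.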
Full error trace (using 2 processes)
prefect_worker-1 | 08:04:03.093 | INFO | Flow run 'run-test-from-jupyter' - Waiting all processes to finish...
0%| | 0/2 [00:00<?, ?it/s]2024-12-17 08:04:04,192 - distributed.protocol.core - CRITICAL - Failed to deserialize
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | 08:04:04.193 | INFO | distributed.core - Connection to tcp://172.19.0.2:38400 has been closed.
prefect_worker-1 | 08:04:04.193 | INFO | distributed.scheduler - Remove worker <WorkerState 'tcp://172.19.0.2:37795', name: 0, status: running, memory: 0, processing: 1> (stimulus_id='handle-worker-cleanup-1734422644.1938918')
prefect_worker-1 | 2024-12-17 08:04:04,196 - distributed.protocol.core - CRITICAL - Failed to deserialize
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | 08:04:04.199 | INFO | distributed.nanny - Closing Nanny gracefully at 'tcp://172.19.0.2:43335'. Reason: worker-handle-scheduler-connection-broken
prefect_worker-1 | 2024-12-17 08:04:04,200 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1425, in _connect
prefect_worker-1 | comm = await connect(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 377, in connect
prefect_worker-1 | handshake = await comm.read()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
prefect_worker-1 | frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
prefect_worker-1 | asyncio.exceptions.CancelledError
prefect_worker-1 |
prefect_worker-1 | During handling of the above exception, another exception occurred:
prefect_worker-1 |
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1535, in connect
prefect_worker-1 | return connect_attempt.result()
prefect_worker-1 | asyncio.exceptions.CancelledError
prefect_worker-1 |
prefect_worker-1 | During handling of the above exception, another exception occurred:
prefect_worker-1 |
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1250, in heartbeat
prefect_worker-1 | response = await retry_operation(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 459, in retry_operation
prefect_worker-1 | return await retry(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/utils_comm.py", line 438, in retry
prefect_worker-1 | return await coro()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1253, in send_recv_from_rpc
prefect_worker-1 | comm = await self.pool.connect(self.addr)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1537, in connect
prefect_worker-1 | raise CommClosedError(reason)
prefect_worker-1 | distributed.comm.core.CommClosedError: ConnectionPool closing.
prefect_worker-1 | 08:04:04.201 | INFO | distributed.core - Connection to tcp://172.19.0.2:38410 has been closed.
prefect_worker-1 | 08:04:04.201 | INFO | distributed.scheduler - Remove worker <WorkerState 'tcp://172.19.0.2:34047', name: 1, status: running, memory: 0, processing: 2> (stimulus_id='handle-worker-cleanup-1734422644.2017796')
prefect_worker-1 | 08:04:04.202 | INFO | distributed.scheduler - Lost all workers
prefect_worker-1 | 2024-12-17 08:04:04,203 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x780ed2bc2d40>>, <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>)
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1 | ret = callback()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1 | future.result()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1 | return await method(self, *args, **kwargs) # type: ignore
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1 | await self.handle_stream(comm)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1 | msgs = await comm.read()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1 | msg = await from_frames(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1 | res = _from_frames()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1 | return protocol.loads(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | unhandled exception during asyncio.run() shutdown
prefect_worker-1 | task: <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1 | ret = callback()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1 | future.result()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1 | return await method(self, *args, **kwargs) # type: ignore
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1 | await self.handle_stream(comm)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1 | msgs = await comm.read()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1 | msg = await from_frames(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1 | res = _from_frames()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1 | return protocol.loads(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | 08:04:04.204 | WARNING | distributed.scheduler - Received heartbeat from unregistered worker 'tcp://172.19.0.2:34047'.
prefect_worker-1 | 2024-12-17 08:04:04,205 - distributed.worker - WARNING - Scheduler was unaware of this worker; shutting down.
prefect_worker-1 | 08:04:04.205 | INFO | distributed.nanny - Closing Nanny gracefully at 'tcp://172.19.0.2:38137'. Reason: worker-handle-scheduler-connection-broken
prefect_worker-1 | 2024-12-17 08:04:04,208 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7e753a0c2d40>>, <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>)
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1 | ret = callback()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1 | future.result()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1 | return await method(self, *args, **kwargs) # type: ignore
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1 | await self.handle_stream(comm)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1 | msgs = await comm.read()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1 | msg = await from_frames(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1 | res = _from_frames()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1 | return protocol.loads(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | unhandled exception during asyncio.run() shutdown
prefect_worker-1 | task: <Task finished name='Task-5' coro=<Worker.handle_scheduler() done, defined at /usr/local/lib/python3.10/site-packages/distributed/worker.py:202> exception=ModuleNotFoundError("No module named 'utils'")>
prefect_worker-1 | Traceback (most recent call last):
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 750, in _run_callback
prefect_worker-1 | ret = callback()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
prefect_worker-1 | future.result()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 205, in wrapper
prefect_worker-1 | return await method(self, *args, **kwargs) # type: ignore
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/worker.py", line 1300, in handle_scheduler
prefect_worker-1 | await self.handle_stream(comm)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 886, in handle_stream
prefect_worker-1 | msgs = await comm.read()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 247, in read
prefect_worker-1 | msg = await from_frames(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 78, in from_frames
prefect_worker-1 | res = _from_frames()
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/comm/utils.py", line 61, in _from_frames
prefect_worker-1 | return protocol.loads(
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
prefect_worker-1 | return msgpack.loads(
prefect_worker-1 | File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
prefect_worker-1 | return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
prefect_worker-1 | File "/usr/local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 92, in loads
prefect_worker-1 | return pickle.loads(x)
prefect_worker-1 | ModuleNotFoundError: No module named 'utils'
prefect_worker-1 | 08:04:06.595 | INFO | distributed.nanny - Closing Nanny at 'tcp://172.19.0.2:43335'. Reason: nanny-close-gracefully
prefect_worker-1 | 08:04:06.600 | INFO | distributed.nanny - Closing Nanny at 'tcp://172.19.0.2:38137'. Reason: nanny-close-gracefully
Version info
Version: 3.1.5
API version: 0.8.4
Python version: 3.10.15
Git commit: 3c06654e
Built: Mon, Dec 2, 2024 6:57 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: server
Pydantic version: 2.9.2
Integrations:
prefect-dask: 0.3.2
prefect-github: 0.3.1
Additional context
No response