Handle exceptions in created tasks #163
Hi @mosquito 👋🏻 I'm not 100% certain this change is the right fix, but I wanted to try and of course you can judge whether or not you think it makes sense. In an application at my job that uses aio-pika, we've seen quite a few instances of Task exceptions not being handled. Typically, they look like Adding this change makes my logs spit out this instead:
One reason I'm not sure if this is a 100% good fix, is that now I get two exception log outputs instead of one, with nearly identical stack traces. They start like this:
I don't know if you can make better sense of this than me, but at least it seems appropriate to raise task exceptions when they happen 🙂 |
Hello, @mosquito. Have you had any time to look at this and see whether you think it makes sense or not? 🙂 |
Oh, at first I answered, then I realized that this is a PR. |
diff --git a/aiormq/abc.py b/aiormq/abc.py
index 751385f..7c192ae 100644
--- a/aiormq/abc.py
+++ b/aiormq/abc.py
@@ -27,24 +27,24 @@ ExceptionType = Union[BaseException, Type[BaseException]]
 # noinspection PyShadowingNames
 class TaskWrapper:
-    __slots__ = "exception", "task"
+    __slots__ = "_exception", "task"
-    exception: Union[BaseException, Type[BaseException]]
+    _exception: Union[BaseException, Type[BaseException]]
     task: asyncio.Task
     def __init__(self, task: asyncio.Task):
         self.task = task
-        self.exception = asyncio.CancelledError
+        self._exception = asyncio.CancelledError
     def throw(self, exception: ExceptionType) -> None:
-        self.exception = exception
+        self._exception = exception
         self.task.cancel()
     async def __inner(self) -> Any:
         try:
             return await self.task
         except asyncio.CancelledError as e:
-            raise self.exception from e
+            raise self._exception from e
     def __await__(self, *args: Any, **kwargs: Any) -> Any:
         return self.__inner().__await__()
Good morning, @mosquito. Thanks for the reply, but I don't think I understand your suggestion... To my eyes, it seems it just renames
My premise for this PR is that it's possible for a task (child) to raise an exception, and for that exception to not be propagated to the awaiting task (parent), because the child task's exception is not re-raised in the done_callback. |
The main idea is the |
@mosquito Ah, so you mean with that patch, I could simply add
exc = future.exception()
if exc is not None:
    raise exc
and skip the |
please rebase this against master branch |
60345ba to a38c064
@mosquito rebased now ✅ |
aiormq/base.py
@@ -35,6 +35,10 @@ def remover(*_: Any) -> None:
    if future in self.futures:
        self.futures.remove(future)

    exc = future.exception()
Is the asyncio.CancelledError exception intentionally not handled here?
No, that was not intentional (I was just not aware this needed any special handling). I have now tried adding a workaround there.
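For context on the question above: calling `exception()` on a future that was cancelled raises `asyncio.CancelledError` instead of returning it, so a done callback usually checks `cancelled()` first. The following is only a minimal sketch of that pattern, not the workaround in this PR; `on_done`, `seen`, `sleeper` and `failing` are hypothetical names.

```python
import asyncio

seen = []  # hypothetical: records what the callback observed


def on_done(future: asyncio.Future) -> None:
    # exception() raises CancelledError for a cancelled future,
    # so check cancelled() before asking for the exception.
    if future.cancelled():
        seen.append("cancelled")
        return
    exc = future.exception()
    seen.append(repr(exc) if exc is not None else "ok")


async def main() -> None:
    async def sleeper() -> None:
        await asyncio.sleep(10)

    async def failing() -> None:
        raise ValueError("boom")

    cancelled_task = asyncio.create_task(sleeper())
    failing_task = asyncio.create_task(failing())
    cancelled_task.add_done_callback(on_done)
    failing_task.add_done_callback(on_done)

    await asyncio.sleep(0)  # let both tasks start
    cancelled_task.cancel()
    await asyncio.gather(cancelled_task, failing_task, return_exceptions=True)
    await asyncio.sleep(0)  # let any pending done callbacks run


asyncio.run(main())
```

Without the `cancelled()` check, the callback itself would fail with `CancelledError` for the first task.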
@torarvid could you please double check this on latest release? |
When create_task is called, the done_callback should check whether or not an exception was raised, and if so, re-raise it.
a38c064 to fffa397
@mosquito Well, I have the problem that I don't know how 😬 🙈 We have seen some issues in production that occur if there is ever "a blip in the network", but we have not been able to reproduce these issues locally on developer machines. So this PR is not the result of me "finding a specific bug", but rather that we've seen cases (both in our own code and in the code of libraries that we use) where code sometimes calls
I consider it a good rule of thumb that "if one creates a task
t = asyncio.create_task(some_coroutine)
# ...
await t # <-- crucial step
... then it's good hygiene to always add a done_callback where any exception from
I don't consider myself an expert in the asyncio domain though, so I'm interested to hear if there are other arguments here that are relevant 🙂 |
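The rule of thumb above can be sketched roughly as follows. This is an illustration only: it logs the exception rather than re-raising it, which is a different choice from the one made in this PR, and `report_exception`, `reported` and the logger name are hypothetical.

```python
import asyncio
import logging

log = logging.getLogger("example")  # hypothetical logger name
reported = []  # hypothetical: exceptions seen by the callback


def report_exception(task: asyncio.Task) -> None:
    # A cancelled task has no exception to report.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        reported.append(exc)
        log.error("background task failed", exc_info=exc)


async def some_coroutine() -> None:
    raise RuntimeError("network blip")


async def main() -> None:
    t = asyncio.create_task(some_coroutine())
    t.add_done_callback(report_exception)
    # `t` is never awaited here; the callback is the only place
    # where its exception is observed.
    await asyncio.sleep(0.01)


asyncio.run(main())
```

Because the callback calls `task.exception()`, the failure is surfaced when the task finishes instead of only when the task object is garbage collected.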
@torarvid I'm glad to know that it does no harm in your production. However, to be honest, we also have cases in our production where connections break. The network is generally not a very reliable thing, and I have been struggling with problems of this kind for several releases. It gets better with every release, of course, and I encourage you to try updating to the latest version of the library. |
@mosquito yeah, we have noticed improved resilience towards network blips with the later releases, so kudos for all the recent improvements 🎉 |
@torarvid I think these changes break something. Somewhere it's important to receive a CancelledError, and that is why the tests broke. But I don't yet understand exactly where. |
@mosquito that could be true, but remember: before I made the most recent change today, only a single commit was in the PR, and it didn't have any special handling of CancelledError, so it could possibly be something else? |
@mosquito I found out some details by hacking
~/dev/aiormq on check-for-task-exceptions [!] via 🐍 pyenv 3.11.1
15:02:49 ❯ poetry run pytest -vv tests/test_connection.py -k test_no_free_channels -s -rP
======================================================================================= test session starts ========================================================================================
platform darwin -- Python 3.11.1, pytest-7.2.0, pluggy-1.0.0 -- /Users/tor/dev/aiormq/.venv/bin/python
cachedir: .pytest_cache
rootdir: /Users/tor/dev/aiormq
plugins: rst-0.0.7, cov-4.0.0, aiomisc-16.2.10, pylama-8.4.1
collected 237 items / 233 deselected / 4 selected
tests/test_connection.py::test_no_free_channels[amqp] Unexpected connection close from remote "amqp://guest:******@localhost:5672/", Connection.Close(reply_code=530, reply_text='NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)')
NoneType: None
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb4c0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb600>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092ebd80>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092e9bc0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb740>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092ebba0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb7e0>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
Exception in callback <function FutureStore.__on_task_done.<locals>.remover at 0x1092eb880>
handle: <Handle FutureStore.__on_task_done.<locals>.remover>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
File "/Users/tor/dev/aiormq/aiormq/base.py", line 41, in remover
raise exc
File "/usr/local/Cellar/python@3.11/3.11.1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 684, in _wrap_awaitable
return (yield from awaitable.__await__())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/abc.py", line 44, in __inner
return await self.task
^^^^^^^^^^^^^^^
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 624, in __reader
await handler(frame)
File "/Users/tor/dev/aiormq/aiormq/connection.py", line 557, in __handle_close
raise exception
aiormq.exceptions.ConnectionNotAllowed: NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)
PASSED |
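A note on the "Exception in callback" entries in the output above: an exception raised inside a done callback is not delivered to whoever created the task. The event loop catches it and passes it to the loop's exception handler. A minimal sketch with plain asyncio (no aiormq or uvloop involved; `reraise`, `handled` and `failing` are hypothetical names):

```python
import asyncio

handled = []  # contexts passed to the loop's exception handler


def reraise(task: asyncio.Task) -> None:
    exc = task.exception()
    if exc is not None:
        raise exc  # raised inside the callback, not inside the awaiting code


async def failing() -> None:
    raise RuntimeError("boom")


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Capture what the loop reports instead of printing it.
    loop.set_exception_handler(lambda loop, context: handled.append(context))

    task = asyncio.create_task(failing())
    task.add_done_callback(reraise)
    await asyncio.sleep(0.01)  # main() itself never sees the exception


asyncio.run(main())
```

If the same exception object is re-raised from several callbacks, each `raise exc` adds another frame to its traceback, which would be consistent with the traces above growing by one `remover` frame each time.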
@torarvid If I understood your idea correctly, then the result should be obtained only from tasks. |
@mosquito I think my idea is mostly that we should (both for Futures and Tasks) ensure that if the caller does not await the future, we don't swallow exceptions. And as far as I know, the only way to do that is to have a done_callback that checks whether or not there are any exceptions. |
@mosquito I see that there is a FutureStore instance, and it also has a parent FutureStore. It looks like a single future could be added both to the child and the parent FutureStore (possibly more than 2 as well in theory, as long as I don't understand the code too well, but it feels like the concept of "remove the future from the self.futures set" (you probably want to do this through the whole chain of futurestore.parent.parent.parent...) is mixed up with "ensuring coroutine exceptions are raised to the caller" (you probably want to do this once regardless of the number of future stores in the .parent.parent... chain) |
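One way to picture the separation described above is the toy sketch below. It is not aiormq's FutureStore; the `Store` class, `report_once` and `reports` are hypothetical and only illustrate keeping per-store bookkeeping apart from one-time exception reporting.

```python
import asyncio
from typing import Optional, Set

reports = []  # hypothetical sink for reported exceptions


class Store:
    """Toy stand-in for a store with a parent chain (not aiormq's FutureStore)."""

    def __init__(self, parent: Optional["Store"] = None) -> None:
        self.parent = parent
        self.futures: Set[asyncio.Future] = set()

    def _track(self, future: asyncio.Future) -> None:
        # Bookkeeping only: every store in the chain tracks the future
        # and forgets it again when it is done.
        self.futures.add(future)
        future.add_done_callback(self.futures.discard)
        if self.parent is not None:
            self.parent._track(future)

    def add(self, future: asyncio.Future) -> None:
        self._track(future)
        # Exception reporting is attached once, independent of chain depth.
        future.add_done_callback(report_once)


def report_once(future: asyncio.Future) -> None:
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        reports.append(exc)


async def main() -> Store:
    root = Store()
    child = Store(parent=Store(parent=root))

    async def failing() -> None:
        raise RuntimeError("boom")

    child.add(asyncio.ensure_future(failing()))
    await asyncio.sleep(0.01)
    return root


root = asyncio.run(main())
```

With three stores in the chain, the future is removed from all three sets, but the exception is reported a single time.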
I did a bit more debugging.
(My command line: |