From 8e14595cda70c4f39b8d83ee1b138a641de03aed Mon Sep 17 00:00:00 2001 From: Aleksandr Kiselev <62915291+chandr-andr@users.noreply.github.com> Date: Wed, 19 Feb 2025 17:42:36 +0100 Subject: [PATCH] Added new base class for exceptions, added templates (#393) * Added new base class for exceptions, added templates Signed-off-by: chandr-andr (Kiselev Aleksandr) --- poetry.lock | 15 ++++++-- pyproject.toml | 1 + taskiq/brokers/inmemory_broker.py | 4 +-- taskiq/brokers/shared_broker.py | 9 ++--- taskiq/exceptions.py | 59 ++++++++++++++++++++++++++++++- taskiq/funcs.py | 2 +- taskiq/receiver/receiver.py | 4 ++- taskiq/serialization.py | 5 ++- taskiq/task.py | 2 +- 9 files changed, 86 insertions(+), 15 deletions(-) diff --git a/poetry.lock b/poetry.lock index b9c9d173..7e034620 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "annotated-types" @@ -497,6 +497,17 @@ files = [ {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "izulu" +version = "0.5.4" +description = "The exceptional library" +optional = false +python-versions = ">=3.6" +files = [ + {file = "izulu-0.5.4-py3-none-any.whl", hash = "sha256:6431499a04f68daca0b852dfa5cfbcb7be804166bcdc4efd4dd4e6dd7a3e5898"}, + {file = "izulu-0.5.4.tar.gz", hash = "sha256:a6619402ab3c04ca32bbfb5000138287691e0b47d9794ba55a10af403ed23644"}, +] + [[package]] name = "mock" version = "4.0.3" @@ -1630,4 +1641,4 @@ zmq = ["pyzmq"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "6e401fe3e65494a0945f525e207303290200068700a8412488f4174df6b83a47" +content-hash = "97c66e7f9707b27a6257d06b208419542114f9933d829a0aa94ca86ea9d239c5" diff --git a/pyproject.toml b/pyproject.toml index 355e69de..a41eec82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ pytz = "*" orjson = { version = "^3", optional = true } msgpack = { version = "^1.0.7", optional = true } cbor2 = { version = "^5", optional = true } +izulu = "0.5.4" [tool.poetry.dev-dependencies] pytest = "^7.1.2" diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index 23892f3b..3fd2975a 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -7,7 +7,7 @@ from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult from taskiq.depends.progress_tracker import TaskProgress from taskiq.events import TaskiqEvents -from taskiq.exceptions import TaskiqError +from taskiq.exceptions import UnknownTaskError from taskiq.message import BrokerMessage from taskiq.receiver import Receiver from taskiq.utils import maybe_awaitable @@ -156,7 +156,7 @@ async def kick(self, message: BrokerMessage) -> None: """ target_task = self.find_task(message.task_name) if target_task is None: - raise TaskiqError("Unknown task.") + raise UnknownTaskError(task_name=message.task_name) receiver_cb = self.receiver.callback(message=message.message) if self.await_inplace: diff --git a/taskiq/brokers/shared_broker.py b/taskiq/brokers/shared_broker.py index 6dc42139..d6574e60 100644 --- a/taskiq/brokers/shared_broker.py +++ b/taskiq/brokers/shared_broker.py @@ -4,7 +4,7 @@ from taskiq.abc.broker import AsyncBroker from taskiq.decor import AsyncTaskiqDecoratedTask -from taskiq.exceptions import TaskiqError +from taskiq.exceptions import SharedBrokerListenError, SharedBrokerSendTaskError from taskiq.kicker import AsyncKicker from taskiq.message import BrokerMessage @@ -56,10 +56,7 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to send. :raises TaskiqError: if called. """ - raise TaskiqError( - "You cannot use kiq directly on shared task " - "without setting the default_broker.", - ) + raise SharedBrokerSendTaskError async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore """ @@ -69,7 +66,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore :raises TaskiqError: if called. """ - raise TaskiqError("Shared broker cannot listen") + raise SharedBrokerListenError def _register_task( self, diff --git a/taskiq/exceptions.py b/taskiq/exceptions.py index be5e4cc9..671bd373 100644 --- a/taskiq/exceptions.py +++ b/taskiq/exceptions.py @@ -1,46 +1,103 @@ -class TaskiqError(Exception): +from typing import Optional + +from izulu import root + + +class TaskiqError(root.Error): """Base exception for all errors.""" + __template__ = "Exception occurred" + class TaskiqResultTimeoutError(TaskiqError): """Waiting for task results has timed out.""" + __template__ = "Waiting for task results has timed out, timeout={timeout}" + timeout: Optional[float] = None + class BrokerError(TaskiqError): """Base class for all broker errors.""" + __template__ = "Base exception for all broker errors" + + +class ListenError(TaskiqError): + """Error if the broker is unable to listen to the queue.""" + + +class SharedBrokerListenError(ListenError): + """Error when someone tries to listen to the queue with shared broker.""" + + __template__ = "Shared broker cannot listen" + class SendTaskError(BrokerError): """Error if the broker was unable to send the task to the queue.""" + __template__ = "Cannot send task to the queue" + + +class SharedBrokerSendTaskError(SendTaskError): + """Error when someone tries to send task with shared broker.""" + + __template__ = ( + "You cannot use kiq directly on shared task " + "without setting the default_broker." + ) + + +class UnknownTaskError(SendTaskError): + """Error if task is unknown.""" + + __template__ = "Cannot send unknown task to the queue, task name - {task_name}" + task_name: str + class ResultBackendError(TaskiqError): """Base class for all ResultBackend errors.""" + __template__ = "Base exception for all result backend errors" + class ResultGetError(ResultBackendError): """Error if ResultBackend was unable to get result.""" + __template__ = "Cannot get result for the task" + class ResultSetError(ResultBackendError): """Error if ResultBackend was unable to set result.""" + __template__ = "Cannot set result for the task" + class ResultIsReadyError(ResultBackendError): """Error if ResultBackend was unable to find out if the task is ready.""" + __template__ = "Cannot find out if the task is ready" + class SecurityError(TaskiqError): """Security related exception.""" + __template__ = "Security exception occurred: {description}" + description: str + class NoResultError(TaskiqError): """Error if user does not want to set result.""" + __template__ = "User doesn't want to set result" + class TaskRejectedError(TaskiqError): """Task was rejected.""" + __template__ = "Task was rejected" + class ScheduledTaskCancelledError(TaskiqError): """Scheduled task was cancelled and not sent to the queue.""" + + __template__ = "Cannot send scheduled task to the queue." diff --git a/taskiq/funcs.py b/taskiq/funcs.py index 09d32d97..4e49e811 100644 --- a/taskiq/funcs.py +++ b/taskiq/funcs.py @@ -47,7 +47,7 @@ async def check_task(task: AsyncTaskiqTask[Any]) -> None: while task_ids: if 0 < timeout < time() - start_time: - raise TaskiqResultTimeoutError("Timed out") + raise TaskiqResultTimeoutError(timeout=timeout) check_tasks = [] for task in tasks: check_tasks.append(loop.create_task(check_task(task))) diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index afa5d4c9..41e687a6 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -353,7 +353,9 @@ async def prefetcher( """ fetched_tasks: int = 0 iterator = self.broker.listen() - current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task( + current_message: asyncio.Task[ + Union[bytes, AckableMessage] + ] = asyncio.create_task( iterator.__anext__(), # type: ignore ) diff --git a/taskiq/serialization.py b/taskiq/serialization.py index 1aecfed9..cb927c9a 100644 --- a/taskiq/serialization.py +++ b/taskiq/serialization.py @@ -378,7 +378,10 @@ def exception_to_python( if not isinstance(cls, type) or not issubclass(cls, BaseException): fake_exc_type = exc_type if exc_module is None else f"{exc_module}.{exc_type}" raise taskiq.exceptions.SecurityError( - f"Expected an exception class, got {fake_exc_type} with payload {exc_msg}", + description=( + f"Expected an exception class, " + f"got {fake_exc_type} with payload {exc_msg}" + ), ) # XXX: Without verifying `cls` is actually an exception class, diff --git a/taskiq/task.py b/taskiq/task.py index b4d2be61..a66b2d8e 100644 --- a/taskiq/task.py +++ b/taskiq/task.py @@ -151,7 +151,7 @@ async def wait_result( while not await self.is_ready(): await asyncio.sleep(check_interval) if 0 < timeout < time() - start_time: - raise TaskiqResultTimeoutError + raise TaskiqResultTimeoutError(timeout=timeout) return await self.get_result(with_logs=with_logs) async def get_progress(self) -> "Optional[TaskProgress[Any]]":