Skip to content

Commit

Permalink
Added new base class for exceptions, added templates (#393)
Browse files Browse the repository at this point in the history
* Added new base class for exceptions, added templates

Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
  • Loading branch information
chandr-andr authored Feb 19, 2025
1 parent 40f5579 commit 8e14595
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 15 deletions.
15 changes: 13 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pytz = "*"
orjson = { version = "^3", optional = true }
msgpack = { version = "^1.0.7", optional = true }
cbor2 = { version = "^5", optional = true }
izulu = "0.5.4"

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
Expand Down
4 changes: 2 additions & 2 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
from taskiq.depends.progress_tracker import TaskProgress
from taskiq.events import TaskiqEvents
from taskiq.exceptions import TaskiqError
from taskiq.exceptions import UnknownTaskError
from taskiq.message import BrokerMessage
from taskiq.receiver import Receiver
from taskiq.utils import maybe_awaitable
Expand Down Expand Up @@ -156,7 +156,7 @@ async def kick(self, message: BrokerMessage) -> None:
"""
target_task = self.find_task(message.task_name)
if target_task is None:
raise TaskiqError("Unknown task.")
raise UnknownTaskError(task_name=message.task_name)

receiver_cb = self.receiver.callback(message=message.message)
if self.await_inplace:
Expand Down
9 changes: 3 additions & 6 deletions taskiq/brokers/shared_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from taskiq.abc.broker import AsyncBroker
from taskiq.decor import AsyncTaskiqDecoratedTask
from taskiq.exceptions import TaskiqError
from taskiq.exceptions import SharedBrokerListenError, SharedBrokerSendTaskError
from taskiq.kicker import AsyncKicker
from taskiq.message import BrokerMessage

Expand Down Expand Up @@ -56,10 +56,7 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to send.
:raises TaskiqError: if called.
"""
raise TaskiqError(
"You cannot use kiq directly on shared task "
"without setting the default_broker.",
)
raise SharedBrokerSendTaskError

async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
"""
Expand All @@ -69,7 +66,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
:raises TaskiqError: if called.
"""
raise TaskiqError("Shared broker cannot listen")
raise SharedBrokerListenError

def _register_task(
self,
Expand Down
59 changes: 58 additions & 1 deletion taskiq/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,103 @@
class TaskiqError(Exception):
from typing import Optional

from izulu import root


class TaskiqError(root.Error):
"""Base exception for all errors."""

__template__ = "Exception occurred"


class TaskiqResultTimeoutError(TaskiqError):
"""Waiting for task results has timed out."""

__template__ = "Waiting for task results has timed out, timeout={timeout}"
timeout: Optional[float] = None


class BrokerError(TaskiqError):
"""Base class for all broker errors."""

__template__ = "Base exception for all broker errors"


class ListenError(TaskiqError):
"""Error if the broker is unable to listen to the queue."""


class SharedBrokerListenError(ListenError):
"""Error when someone tries to listen to the queue with shared broker."""

__template__ = "Shared broker cannot listen"


class SendTaskError(BrokerError):
"""Error if the broker was unable to send the task to the queue."""

__template__ = "Cannot send task to the queue"


class SharedBrokerSendTaskError(SendTaskError):
"""Error when someone tries to send task with shared broker."""

__template__ = (
"You cannot use kiq directly on shared task "
"without setting the default_broker."
)


class UnknownTaskError(SendTaskError):
"""Error if task is unknown."""

__template__ = "Cannot send unknown task to the queue, task name - {task_name}"
task_name: str


class ResultBackendError(TaskiqError):
"""Base class for all ResultBackend errors."""

__template__ = "Base exception for all result backend errors"


class ResultGetError(ResultBackendError):
"""Error if ResultBackend was unable to get result."""

__template__ = "Cannot get result for the task"


class ResultSetError(ResultBackendError):
"""Error if ResultBackend was unable to set result."""

__template__ = "Cannot set result for the task"


class ResultIsReadyError(ResultBackendError):
"""Error if ResultBackend was unable to find out if the task is ready."""

__template__ = "Cannot find out if the task is ready"


class SecurityError(TaskiqError):
"""Security related exception."""

__template__ = "Security exception occurred: {description}"
description: str


class NoResultError(TaskiqError):
"""Error if user does not want to set result."""

__template__ = "User doesn't want to set result"


class TaskRejectedError(TaskiqError):
"""Task was rejected."""

__template__ = "Task was rejected"


class ScheduledTaskCancelledError(TaskiqError):
"""Scheduled task was cancelled and not sent to the queue."""

__template__ = "Cannot send scheduled task to the queue."
2 changes: 1 addition & 1 deletion taskiq/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def check_task(task: AsyncTaskiqTask[Any]) -> None:

while task_ids:
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError("Timed out")
raise TaskiqResultTimeoutError(timeout=timeout)
check_tasks = []
for task in tasks:
check_tasks.append(loop.create_task(check_task(task)))
Expand Down
4 changes: 3 additions & 1 deletion taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ async def prefetcher(
"""
fetched_tasks: int = 0
iterator = self.broker.listen()
current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task(
current_message: asyncio.Task[
Union[bytes, AckableMessage]
] = asyncio.create_task(
iterator.__anext__(), # type: ignore
)

Expand Down
5 changes: 4 additions & 1 deletion taskiq/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,10 @@ def exception_to_python(
if not isinstance(cls, type) or not issubclass(cls, BaseException):
fake_exc_type = exc_type if exc_module is None else f"{exc_module}.{exc_type}"
raise taskiq.exceptions.SecurityError(
f"Expected an exception class, got {fake_exc_type} with payload {exc_msg}",
description=(
f"Expected an exception class, "
f"got {fake_exc_type} with payload {exc_msg}"
),
)

# XXX: Without verifying `cls` is actually an exception class,
Expand Down
2 changes: 1 addition & 1 deletion taskiq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def wait_result(
while not await self.is_ready():
await asyncio.sleep(check_interval)
if 0 < timeout < time() - start_time:
raise TaskiqResultTimeoutError
raise TaskiqResultTimeoutError(timeout=timeout)
return await self.get_result(with_logs=with_logs)

async def get_progress(self) -> "Optional[TaskProgress[Any]]":
Expand Down

0 comments on commit 8e14595

Please sign in to comment.