Skip to content

Commit

Permalink
fix: some minor issues (#342)
Browse files Browse the repository at this point in the history
Co-authored-by: Anton <myprojectorterrypratchett@gmail.com>
  • Loading branch information
Sobes76rus and Anton authored Jul 18, 2024
1 parent b83c927 commit 49c0408
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
25 changes: 15 additions & 10 deletions taskiq/depends/progress_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@
from taskiq.compat import IS_PYDANTIC2
from taskiq.context import Context

if IS_PYDANTIC2:
from pydantic import BaseModel as GenericModel
else:
from pydantic.generics import GenericModel # type: ignore[no-redef]


_ProgressType = TypeVar("_ProgressType")


Expand All @@ -25,15 +19,26 @@ class TaskState(str, enum.Enum):
RETRY = "RETRY"


class TaskProgress(GenericModel, Generic[_ProgressType]):
if IS_PYDANTIC2:
from pydantic import BaseModel, ConfigDict

class _TaskProgressConfig(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

else:
from pydantic.generics import GenericModel

class _TaskProgressConfig(GenericModel): # type: ignore[no-redef]
class Config:
arbitrary_types_allowed = True


class TaskProgress(_TaskProgressConfig, Generic[_ProgressType]):
"""Progress of task execution."""

state: Union[TaskState, str]
meta: Optional[_ProgressType]

class Config:
arbitrary_types_allowed = True


class ProgressTracker(Generic[_ProgressType]):
"""Task's dependency to set progress."""
Expand Down
6 changes: 4 additions & 2 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,10 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
self.sem_prefetch.release()
message = await queue.get()
if message is QUEUE_DONE:
logger.info("Waiting for running tasks to complete.")
await asyncio.wait(tasks, timeout=self.wait_tasks_timeout)
# asyncio.wait will throw an error if there is nothing to wait for
if tasks:
logger.info("Waiting for running tasks to complete.")
await asyncio.wait(tasks, timeout=self.wait_tasks_timeout)
break

task = asyncio.create_task(
Expand Down

0 comments on commit 49c0408

Please sign in to comment.