Skip to content

Commit

Permalink
Wait for Postgres queue tasks to finish cancelling when disconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
vchan committed Sep 3, 2024
1 parent d5be27c commit 096b6a2
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions saq/queue/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(
self.connection_lock = asyncio.Lock()
self.released: list[str] = []
self.has_sweep_lock = False
self.tasks: list[asyncio.Task] = []

async def connect(self) -> None:
if self.connection:
Expand All @@ -128,11 +129,11 @@ async def connect(self) -> None:
# Reserve a connection for dequeue and advisory locks
self.connection = await self.pool.getconn()

self.wait_for_job_task = asyncio.create_task(self.wait_for_job())
self.listen_for_enqueues_task = asyncio.create_task(self.listen_for_enqueues())
self.tasks.append(asyncio.create_task(self.wait_for_job()))
self.tasks.append(asyncio.create_task(self.listen_for_enqueues()))
if self.poll_interval > 0:
self.dequeue_timer_task = asyncio.create_task(
self.dequeue_timer(self.poll_interval)
self.tasks.append(
asyncio.create_task(self.dequeue_timer(self.poll_interval))
)

def job_id(self, job_key: str) -> str:
Expand All @@ -151,10 +152,12 @@ async def disconnect(self) -> None:
await self.pool.putconn(self.connection)
self.connection = None
await self.pool.close()
self.wait_for_job_task.cancel()
self.listen_for_enqueues_task.cancel()
if hasattr(self, "dequeue_timer_task"):
self.dequeue_timer_task.cancel()
for task in self.tasks:
task.cancel()
try:
await asyncio.gather(*self.tasks, return_exceptions=True)
except asyncio.exceptions.CancelledError:
pass
self.has_sweep_lock = False

async def info(
Expand Down

0 comments on commit 096b6a2

Please sign in to comment.