Skip to content

Commit

Permalink
Use a timer for periodic functions
Browse files Browse the repository at this point in the history
  • Loading branch information
stchris committed May 21, 2024
1 parent 431fb72 commit bc6acac
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,11 @@ def __init__(
self.version = version
self.prefetch_count = prefetch_count
self.local_queue = Queue()
self.periodic_timer = threading.Timer(interval=30, function=self.periodic)

def on_signal(self, signal, _):
log.warning(f"Shutting down worker (signal {signal})")
self.periodic_timer.cancel()
# Exit eagerly without waiting for current task to finish running
sys.exit(int(signal))

Expand All @@ -413,6 +415,8 @@ def on_message(self, channel, method, properties, body, args):

def process_blocking(self):
"""Blocking worker thread - executes tasks from a queue and periodic tasks"""
self.periodic_timer.start()
log.info("Starting periodic timer")
while True:
try:
(task, channel, connection) = self.local_queue.get(timeout=TIMEOUT)
Expand All @@ -424,7 +428,7 @@ def process_blocking(self):
pass
finally:
clear_contextvars()
self.periodic()
self.periodic_timer.cancel()

def process_nonblocking(self):
"""Non-blocking worker is used for tests only."""
Expand Down

0 comments on commit bc6acac

Please sign in to comment.