diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index f442168..4f31ce0 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -9,6 +9,7 @@ from abc import ABC, abstractmethod import functools from queue import Queue, Empty +from random import randrange from structlog.contextvars import clear_contextvars, bind_contextvars import pika @@ -417,8 +418,13 @@ def process_blocking(self): (task, channel, connection) = self.local_queue.get(timeout=TIMEOUT) apply_task_context(task, v=self.version) self.handle(task) - cb = functools.partial(self.ack_message, task, channel) - connection.add_callback_threadsafe(cb) + does_it_get_acked = randrange(1, 10) + if does_it_get_acked < 5: + cb = functools.partial(self.reject_message, task, channel) + connection.add_callback_threadsafe(cb) + else: + cb = functools.partial(self.ack_message, task, channel) + connection.add_callback_threadsafe(cb) except Empty: pass finally: @@ -507,6 +513,13 @@ def ack_message(self, task, channel): channel.basic_ack(task.delivery_tag) clear_contextvars() + def reject_message(self, task, channel): + log.critical( + f"Rejecting message {task.delivery_tag} for task_id {task.task_id}" + ) + if channel.is_open: + channel.basic_reject(task.delivery_tag) + def run(self): """Run a blocking worker instance"""