Skip to content

Commit

Permalink
TEMPORARY. Implement rejection
Browse files Browse the repository at this point in the history
  • Loading branch information
catileptic committed Mar 19, 2024
1 parent cf2ccf1 commit f3228c3
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from abc import ABC, abstractmethod
import functools
from queue import Queue, Empty
from random import randrange

from structlog.contextvars import clear_contextvars, bind_contextvars
import pika
Expand Down Expand Up @@ -417,8 +418,13 @@ def process_blocking(self):
(task, channel, connection) = self.local_queue.get(timeout=TIMEOUT)
apply_task_context(task, v=self.version)
self.handle(task)
cb = functools.partial(self.ack_message, task, channel)
connection.add_callback_threadsafe(cb)
does_it_get_acked = randrange(1, 10)
if does_it_get_acked < 5:
cb = functools.partial(self.reject_message, task, channel)
connection.add_callback_threadsafe(cb)
else:
cb = functools.partial(self.ack_message, task, channel)
connection.add_callback_threadsafe(cb)
except Empty:
pass
finally:
Expand Down Expand Up @@ -507,6 +513,13 @@ def ack_message(self, task, channel):
channel.basic_ack(task.delivery_tag)
clear_contextvars()

def reject_message(self, task, channel):
log.critical(
f"Rejecting message {task.delivery_tag} for task_id {task.task_id}"
)
if channel.is_open:
channel.basic_reject(task.delivery_tag)

def run(self):
"""Run a blocking worker instance"""

Expand Down

0 comments on commit f3228c3

Please sign in to comment.