From ee5569a58d01d016811a813b8d8eb2ca79483e44 Mon Sep 17 00:00:00 2001 From: Omer Murat Yildirim Date: Thu, 6 Jun 2024 14:14:39 +0300 Subject: [PATCH] add ability to set priority for workers --- kuyruk/__main__.py | 5 +++++ kuyruk/config.py | 3 +++ kuyruk/worker.py | 14 +++++++++++++- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/kuyruk/__main__.py b/kuyruk/__main__.py index aa5b01b1..7bf2bb37 100644 --- a/kuyruk/__main__.py +++ b/kuyruk/__main__.py @@ -46,6 +46,11 @@ def main() -> None: type=int, help='gracefully shutdown worker after this duration in seconds, ' 'set to 0 for running forever') + parser_worker.add_argument( + '--priority', + type=int, + help='sets priority for the worker' + 'Default is 0. Larger numbers indicate higher priority, and both positive and negative numbers can be used.') parser_worker.add_argument( '-l', '--logging-level', diff --git a/kuyruk/config.py b/kuyruk/config.py index f53443b9..b1008a6f 100644 --- a/kuyruk/config.py +++ b/kuyruk/config.py @@ -75,6 +75,9 @@ class attributes. Additional attributes may be added by extensions. WORKER_RECONNECT_INTERVAL = 5 """Number of seconds to wait after a connection error.""" + WORKER_PRIORITY = None + """Sets worker priority. Larger number means higher priority.""" + def from_object(self, obj: Union[str, Any]) -> None: """Load values from an object.""" if isinstance(obj, str): diff --git a/kuyruk/worker.py b/kuyruk/worker.py index 29eef507..48b1724b 100644 --- a/kuyruk/worker.py +++ b/kuyruk/worker.py @@ -72,6 +72,10 @@ def add_host(queue: str) -> str: if self._max_load == -1: self._max_load == multiprocessing.cpu_count() + self._priority = app.config.WORKER_PRIORITY + if args.priority is not None: + self._priority = args.priority + self._reconnect_interval = app.config.WORKER_RECONNECT_INTERVAL self._threads: List[threading.Thread] = [] @@ -183,7 +187,15 @@ def _consume_queues(self, ch: amqp.Channel) -> None: self.consuming = True for queue in self.queues: logger.debug("basic_consume: %s", queue) - ch.basic_consume(queue=queue, consumer_tag=self._consumer_tag(queue), callback=self._process_message) + + arguments = {} + if self._priority: + arguments['x-priority'] = self._priority + + ch.basic_consume(queue=queue, + consumer_tag=self._consumer_tag(queue), + callback=self._process_message, + arguments=arguments) def _cancel_queues(self, ch: amqp.Channel) -> None: self.consuming = False