diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 417749d..89e95fb 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -32,9 +32,6 @@ OP_ANALYZE = "analyze" OP_INDEX = "index" -# ToDo: consider ALEPH_INGEST_PIPELINE setting -INGEST_OPS = (OP_INGEST, OP_ANALYZE) - TIMEOUT = 5 @@ -366,16 +363,6 @@ def apply_task_context(task: Task, **kwargs): ) -def get_routing_key(stage): - if stage in INGEST_OPS: - routing_key = settings.QUEUE_INGEST - elif stage == OP_INDEX: - routing_key = settings.QUEUE_INDEX - else: - routing_key = settings.QUEUE_ALEPH - return routing_key - - class MaxRetriesExceededError(Exception): pass @@ -383,7 +370,7 @@ class MaxRetriesExceededError(Exception): class Worker(ABC): def __init__( self, - queues, + stages, conn=None, num_threads=settings.WORKER_THREADS, version=None, @@ -391,7 +378,7 @@ def __init__( ): self.conn = conn or get_redis() self.num_threads = num_threads - self.queues = ensure_list(queues) + self.stages = ensure_list(stages) self.version = version self.prefetch_count = prefetch_count self.local_queue = Queue() @@ -540,16 +527,30 @@ def process(): channel = connection.channel() channel.basic_qos(prefetch_count=self.prefetch_count) on_message_callback = functools.partial(self.on_message, args=(connection,)) - for queue in self.queues: - channel.queue_declare( - queue=queue, + for stage in self.stages: + result = channel.queue_declare( + queue="", durable=True, + exclusive=True, + auto_delete=True, arguments={ "x-max-priority": settings.RABBITMQ_MAX_PRIORITY, "x-overflow": "reject-publish", }, ) - channel.basic_consume(queue=queue, on_message_callback=on_message_callback) + queue_name = result.method.queue + channel.queue_bind( + queue=queue_name, + exchange="amq.topic", + routing_key=stage, + ) + log.debug( + f"{platform.node()} bound queue {queue_name}" + f"to amq.topic exchange with routing key {stage}" + ) + channel.basic_consume( + queue=queue_name, on_message_callback=on_message_callback + ) channel.start_consuming() @@ -576,38 +577,6 @@ def get_rabbitmq_connection(): ) local.connection = connection - if local.connection and local.connection.is_open: - log.debug("Defining RabbitMQ queues on an open connection") - channel = local.connection.channel() - - channel.queue_declare( - queue=settings.QUEUE_ALEPH, - durable=True, - arguments={ - "x-max-priority": settings.RABBITMQ_MAX_PRIORITY, - "x-overflow": "reject-publish", - }, - ) - - channel.queue_declare( - queue=settings.QUEUE_INGEST, - durable=True, - arguments={ - "x-max-priority": settings.RABBITMQ_MAX_PRIORITY, - "x-overflow": "reject-publish", - }, - ) - - channel.queue_declare( - queue=settings.QUEUE_INDEX, - durable=True, - arguments={ - "x-max-priority": settings.RABBITMQ_MAX_PRIORITY, - "x-overflow": "reject-publish", - }, - ) - - channel.close() return local.connection except (