Skip to content

Commit

Permalink
Make workers bind queues to topic exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
stchris committed May 17, 2024
1 parent 431fb72 commit 05ecc2d
Showing 1 changed file with 20 additions and 51 deletions.
71 changes: 20 additions & 51 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
OP_ANALYZE = "analyze"
OP_INDEX = "index"

# ToDo: consider ALEPH_INGEST_PIPELINE setting
INGEST_OPS = (OP_INGEST, OP_ANALYZE)

TIMEOUT = 5


Expand Down Expand Up @@ -366,32 +363,22 @@ def apply_task_context(task: Task, **kwargs):
)


def get_routing_key(stage):
if stage in INGEST_OPS:
routing_key = settings.QUEUE_INGEST
elif stage == OP_INDEX:
routing_key = settings.QUEUE_INDEX
else:
routing_key = settings.QUEUE_ALEPH
return routing_key


class MaxRetriesExceededError(Exception):
pass


class Worker(ABC):
def __init__(
self,
queues,
stages,
conn=None,
num_threads=settings.WORKER_THREADS,
version=None,
prefetch_count=100,
):
self.conn = conn or get_redis()
self.num_threads = num_threads
self.queues = ensure_list(queues)
self.stages = ensure_list(stages)
self.version = version
self.prefetch_count = prefetch_count
self.local_queue = Queue()
Expand Down Expand Up @@ -540,16 +527,30 @@ def process():
channel = connection.channel()
channel.basic_qos(prefetch_count=self.prefetch_count)
on_message_callback = functools.partial(self.on_message, args=(connection,))
for queue in self.queues:
channel.queue_declare(
queue=queue,
for stage in self.stages:
result = channel.queue_declare(
queue="",
durable=True,
exclusive=True,
auto_delete=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)
channel.basic_consume(queue=queue, on_message_callback=on_message_callback)
queue_name = result.method.queue
channel.queue_bind(
queue=queue_name,
exchange="amq.topic",
routing_key=stage,
)
log.debug(
f"{platform.node()} bound queue {queue_name}"
f"to amq.topic exchange with routing key {routing_key}"
)
channel.basic_consume(
queue=queue_name, on_message_callback=on_message_callback
)
channel.start_consuming()


Expand All @@ -576,38 +577,6 @@ def get_rabbitmq_connection():
)
local.connection = connection

if local.connection and local.connection.is_open:
log.debug("Defining RabbitMQ queues on an open connection")
channel = local.connection.channel()

channel.queue_declare(
queue=settings.QUEUE_ALEPH,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)

channel.queue_declare(
queue=settings.QUEUE_INGEST,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)

channel.queue_declare(
queue=settings.QUEUE_INDEX,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)

channel.close()
return local.connection

except (
Expand Down

0 comments on commit 05ecc2d

Please sign in to comment.