diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index cd46570..07203f1 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -9,6 +9,7 @@ from abc import ABC, abstractmethod import functools from queue import Queue, Empty +import platform from structlog.contextvars import clear_contextvars, bind_contextvars import pika @@ -462,7 +463,7 @@ def handle(self, task: Task): ) dataset.checkout_task(task.task_id, task.operation) task.increment_retry_count(self.conn) - log.info(f"Dispatching task {task.task_id} from job {task.job_id}") + log.info(f"Dispatching task {task.task_id} from job {task.job_id} to worker {platform.node()}") task = self.dispatch_task(task) else: log.warn(f"Discarding task: {task.task_id}") @@ -605,7 +606,11 @@ def get_rabbitmq_connection(): channel.close() return local.connection - except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPError): + except ( + pika.exceptions.AMQPConnectionError, + pika.exceptions.AMQPError, + ConnectionResetError, + ) log.exception(f"RabbitMQ error. Attempt: {attempt}") local.connection = None