Skip to content

Commit

Permalink
Log worker hostname when dispatching task
Browse files Browse the repository at this point in the history
  • Loading branch information
catileptic committed May 8, 2024
1 parent b7df50c commit 4de7e00
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from abc import ABC, abstractmethod
import functools
from queue import Queue, Empty
import platform

from structlog.contextvars import clear_contextvars, bind_contextvars
import pika
Expand Down Expand Up @@ -462,7 +463,7 @@ def handle(self, task: Task):
)
dataset.checkout_task(task.task_id, task.operation)
task.increment_retry_count(self.conn)
log.info(f"Dispatching task {task.task_id} from job {task.job_id}")
log.info(f"Dispatching task {task.task_id} from job {task.job_id} to worker {platform.node()}")
task = self.dispatch_task(task)
else:
log.warn(f"Discarding task: {task.task_id}")
Expand Down Expand Up @@ -605,7 +606,11 @@ def get_rabbitmq_connection():
channel.close()
return local.connection

except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPError):
except (
pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPError,
ConnectionResetError,
)
log.exception(f"RabbitMQ error. Attempt: {attempt}")
local.connection = None

Expand Down

0 comments on commit 4de7e00

Please sign in to comment.