Skip to content

Commit

Permalink
Rework connection creation
Browse files Browse the repository at this point in the history
  • Loading branch information
catileptic committed Jan 17, 2024
1 parent 69dc59b commit 67e868a
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,11 @@ def process():
def get_rabbitmq_connection():
for attempt in service_retries():
try:
if not hasattr(local, "connection") or not local.connection:
if (
not hasattr(local, "connection")
or not local.connection
or not local.connection.is_open
):
credentials = pika.PlainCredentials(
settings.RABBITMQ_USERNAME, settings.RABBITMQ_PASSWORD
)
Expand All @@ -464,7 +468,7 @@ def get_rabbitmq_connection():
)
local.connection = connection

if local.connection.is_open:
if local.connection and local.connection.is_open:
channel = local.connection.channel()

channel.queue_declare(
Expand All @@ -488,6 +492,44 @@ def get_rabbitmq_connection():
channel.close()
return local.connection

# if not hasattr(local, "connection") or not local.connection:
# credentials = pika.PlainCredentials(
# settings.RABBITMQ_USERNAME, settings.RABBITMQ_PASSWORD
# )
# connection = pika.BlockingConnection(
# pika.ConnectionParameters(
# host=settings.RABBITMQ_URL,
# credentials=credentials,
# heartbeat=settings.RABBITMQ_HEARTBEAT,
# blocked_connection_timeout=settings.RABBITMQ_BLOCKED_CONNECTION_TIMEOUT,
# )
# )
# local.connection = connection

# if local.connection.is_open:
# channel = local.connection.channel()

# channel.queue_declare(
# queue=settings.QUEUE_ALEPH,
# durable=True,
# arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
# )

# channel.queue_declare(
# queue=settings.QUEUE_INGEST,
# durable=True,
# arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
# )

# channel.queue_declare(
# queue=settings.QUEUE_INDEX,
# durable=True,
# arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
# )

# channel.close()
# return local.connection

except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPError):
log.exception("RabbitMQ error")
local.connection = None
Expand Down

0 comments on commit 67e868a

Please sign in to comment.