Temporary commit
catileptic committed May 21, 2024
1 parent 696a8c2 commit 0dd7b1a
Showing 3 changed files with 89 additions and 82 deletions.
28 changes: 14 additions & 14 deletions servicelayer/settings.py
@@ -43,21 +43,21 @@
)
RABBITMQ_MAX_PRIORITY = 10

QUEUE_ALEPH = "aleph_queue"
QUEUE_INGEST = "ingest_queue"
QUEUE_INDEX = "index_queue"
QUEUE_BULK_INDEX = "bulk_index_queue"
QUEUE_XREF = "xref_queue"
QUEUE_EXPORT = "export_queue"
# QUEUE_ALEPH = "aleph_queue"
# QUEUE_INGEST = "ingest_queue"
# QUEUE_INDEX = "index_queue"
# QUEUE_BULK_INDEX = "bulk_index_queue"
# QUEUE_XREF = "xref_queue"
# QUEUE_EXPORT = "export_queue"

RABBITMQ_QUEUES = [
QUEUE_ALEPH,
QUEUE_INGEST,
QUEUE_INDEX,
QUEUE_BULK_INDEX,
QUEUE_XREF,
QUEUE_EXPORT,
]
# RABBITMQ_QUEUES = [
# QUEUE_ALEPH,
# QUEUE_INGEST,
# QUEUE_INDEX,
# QUEUE_BULK_INDEX,
# QUEUE_XREF,
# QUEUE_EXPORT,
# ]

# Sentry
SENTRY_DSN = env.get("SENTRY_DSN")
126 changes: 67 additions & 59 deletions servicelayer/taskqueue.py
@@ -29,34 +29,34 @@

# TODO - there should be one source of truth for these constants,
# perhaps here, not in the aleph repo
OP_INGEST = "ingest"
OP_ANALYZE = "analyze"
OP_INDEX = "index"
OP_XREF = "xref"
OP_REINGEST = "reingest"
OP_REINDEX = "reindex"
OP_LOAD_MAPPING = "loadmapping"
OP_FLUSH_MAPPING = "flushmapping"
OP_EXPORT_SEARCH = "exportsearch"
OP_EXPORT_XREF = "exportxref"
OP_UPDATE_ENTITY = "updateentity"
OP_PRUNE_ENTITY = "pruneentity"
# OP_INGEST = "ingest"
# OP_ANALYZE = "analyze"
# OP_INDEX = "index"
# OP_XREF = "xref"
# OP_REINGEST = "reingest"
# OP_REINDEX = "reindex"
# OP_LOAD_MAPPING = "loadmapping"
# OP_FLUSH_MAPPING = "flushmapping"
# OP_EXPORT_SEARCH = "exportsearch"
# OP_EXPORT_XREF = "exportxref"
# OP_UPDATE_ENTITY = "updateentity"
# OP_PRUNE_ENTITY = "pruneentity"

# ToDo: consider ALEPH_INGEST_PIPELINE setting
INGEST_OPS = (OP_INGEST, OP_ANALYZE)
# INGEST_OPS = (OP_INGEST, OP_ANALYZE)

TIMEOUT = 5

# Default values. Each specific type of worker
# can override these with env vars.
QOS_MAPPING = {
settings.QUEUE_ALEPH: 1,
settings.QUEUE_INGEST: 1,
settings.QUEUE_INDEX: 100,
settings.QUEUE_BULK_INDEX: 1,
settings.QUEUE_XREF: 1,
settings.QUEUE_EXPORT: 1,
}
# QOS_MAPPING = {
# settings.QUEUE_ALEPH: 1,
# settings.QUEUE_INGEST: 1,
# settings.QUEUE_INDEX: 100,
# settings.QUEUE_BULK_INDEX: 1,
# settings.QUEUE_XREF: 1,
# settings.QUEUE_EXPORT: 1,
# }
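The comment above mentions per-worker overrides via env vars; a minimal sketch of what that could look like — the variable name WORKER_PREFETCH_COUNT is an assumption for illustration, not something defined in this commit:

import os

# Hypothetical override: a worker type reads its prefetch count from the
# environment, falling back to the default of 1 used elsewhere in this commit.
prefetch_count = int(os.environ.get("WORKER_PREFETCH_COUNT", "1"))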


@dataclass
@@ -387,26 +387,47 @@ def apply_task_context(task: Task, **kwargs):
)


def get_routing_key(stage):
if stage in INGEST_OPS:
routing_key = settings.QUEUE_INGEST
elif stage == OP_INDEX:
routing_key = settings.QUEUE_INDEX
elif stage == OP_XREF:
routing_key = settings.QUEUE_XREF
elif stage == OP_REINDEX:
routing_key = settings.QUEUE_BULK_INDEX
elif stage == OP_EXPORT_SEARCH or stage == OP_EXPORT_XREF:
routing_key = settings.QUEUE_EXPORT
else:
routing_key = settings.QUEUE_ALEPH
return routing_key
# def get_routing_key(stage):
# if stage in INGEST_OPS:
# routing_key = settings.QUEUE_INGEST
# elif stage == OP_INDEX:
# routing_key = settings.QUEUE_INDEX
# elif stage == OP_XREF:
# routing_key = settings.QUEUE_XREF
# elif stage == OP_REINDEX:
# routing_key = settings.QUEUE_BULK_INDEX
# elif stage == OP_EXPORT_SEARCH or stage == OP_EXPORT_XREF:
# routing_key = settings.QUEUE_EXPORT
# else:
# routing_key = settings.QUEUE_ALEPH
# return routing_key
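With the routing helper commented out, publishers appear to address queues directly by name on RabbitMQ's default exchange, which is what the updated test below does. A minimal sketch, assuming a local broker and an illustrative queue name:

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_publish(
    exchange="",  # the default exchange routes messages by queue name
    routing_key="sls-queue-ingest",  # illustrative queue name
    body=json.dumps({"task_id": "test-task"}),
    properties=pika.BasicProperties(priority=5),
)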


def declare_rabbitmq_queue(channel, queue, prefetch_count=1):
channel.basic_qos(global_qos=False, prefetch_count=prefetch_count)
channel.queue_declare(
queue=queue,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)
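A usage sketch for the new helper, assuming an open pika connection to a local broker; the queue name is illustrative:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# Declares a durable queue with priority support and reject-on-overflow,
# and caps unacknowledged deliveries per consumer at prefetch_count.
declare_rabbitmq_queue(channel, "sls-queue-ingest", prefetch_count=1)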


class MaxRetriesExceededError(Exception):
pass


# A custom dictionary object that will return 1 for any key
class QoSMappingDict(dict):
def __init__(self, *arg, **kw):
super(QoSMappingDict, self).__init__(*arg, **kw)

def __getitem__(self, key):
return 1
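A quick demonstration of the class's behavior. Note that only subscript access is overridden: dict.get() does not go through __getitem__, so it would still return None for a missing key — worth keeping in mind if call sites ever switch access styles:

mapping = QoSMappingDict()
assert mapping["any-queue-name"] == 1  # __getitem__ always returns 1
assert mapping.get("any-queue-name") is None  # .get() bypasses __getitem__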


class Worker(ABC):
def __init__(
self,
@@ -420,7 +441,7 @@ def __init__(
self.queues = ensure_list(queues)
self.version = version
self.local_queue = Queue()
self.prefetch_count_mapping = QOS_MAPPING
self.prefetch_count_mapping = QoSMappingDict()

def on_signal(self, signal, _):
log.warning(f"Shutting down worker (signal {signal})")
@@ -567,14 +588,8 @@ def process():
on_message_callback = functools.partial(self.on_message, args=(connection,))

for queue in self.queues:
channel.basic_qos(global_qos=False, prefetch_count=QOS_MAPPING[queue])
channel.queue_declare(
queue=queue,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
declare_rabbitmq_queue(
channel, queue, prefetch_count=self.prefetch_count_mapping[queue]
)
channel.basic_consume(queue=queue, on_message_callback=on_message_callback)
channel.start_consuming()
@@ -602,22 +617,15 @@ def get_rabbitmq_connection():
)
local.connection = connection

if local.connection and local.connection.is_open:
log.debug("Defining RabbitMQ queues on an open connection")
channel = local.connection.channel()

for queue in settings.RABBITMQ_QUEUES:
channel.queue_declare(
queue=queue,
durable=True,
arguments={
"x-max-priority": settings.RABBITMQ_MAX_PRIORITY,
"x-overflow": "reject-publish",
},
)
# Do we need to re-declare the queues?
# if local.connection and local.connection.is_open:
# log.debug("Defining RabbitMQ queues on an open connection")
# channel = local.connection.channel()

# declare_rabbitmq_queue()

channel.close()
return local.connection
# channel.close()
# return local.connection

except (
pika.exceptions.AMQPConnectionError,
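On the question left in the commented-out block above: RabbitMQ's queue_declare is idempotent as long as the arguments match, so re-declaring on every connection is safe but redundant once each consumer declares its own queues via declare_rabbitmq_queue. A sketch of what a lazy re-declaration would look like if it turns out to be needed (queue name illustrative):

channel = local.connection.channel()
# Safe to repeat: declaring an existing queue with identical arguments is a no-op.
declare_rabbitmq_queue(channel, "sls-queue-ingest")
channel.close()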
17 changes: 8 additions & 9 deletions tests/test_taskqueue.py
@@ -14,6 +14,7 @@
Task,
get_rabbitmq_connection,
dataset_from_collection_id,
declare_rabbitmq_queue,
)
from servicelayer.util import unpack_datetime

@@ -29,8 +30,7 @@ def dispatch_task(self, task):

class TaskQueueTest(TestCase):
def test_task_queue(self):
settings.QUEUE_INGEST = "sls-queue-ingest"
settings.RABBITMQ_QUEUES.append(settings.QUEUE_INGEST)
test_queue_name = "sls-queue-ingest"
conn = get_fakeredis()
collection_id = 2
task_id = "test-task"
@@ -46,11 +46,12 @@ def test_task_queue(self):
}
connection = get_rabbitmq_connection()
channel = connection.channel()
channel.queue_purge(settings.QUEUE_INGEST)
declare_rabbitmq_queue(channel, test_queue_name)
channel.queue_purge(test_queue_name)
channel.basic_publish(
properties=pika.BasicProperties(priority=priority),
exchange="",
routing_key=settings.QUEUE_INGEST,
routing_key=test_queue_name,
body=json.dumps(body),
)
dataset = Dataset(conn=conn, name=dataset_from_collection_id(collection_id))
@@ -67,9 +68,7 @@
assert started < last_updated
assert abs(started - last_updated) < datetime.timedelta(seconds=1)

worker = CountingWorker(
queues=[settings.QUEUE_INGEST], conn=conn, num_threads=1
)
worker = CountingWorker(queues=[test_queue_name], conn=conn, num_threads=1)
worker.process(blocking=False)

status = dataset.get_status()
@@ -82,11 +81,11 @@

with patch("servicelayer.settings.WORKER_RETRY", 0):
channel = connection.channel()
channel.queue_purge(settings.QUEUE_INGEST)
channel.queue_purge(test_queue_name)
channel.basic_publish(
properties=pika.BasicProperties(priority=priority),
exchange="",
routing_key=settings.QUEUE_INGEST,
routing_key=test_queue_name,
body=json.dumps(body),
)
dataset = Dataset(conn=conn, name=dataset_from_collection_id(collection_id))
Expand Down
