From ef5027ed92db3135e59eebbcb3d7c50f906854d6 Mon Sep 17 00:00:00 2001 From: catileptic Date: Fri, 19 Jul 2024 14:09:23 +0300 Subject: [PATCH 1/9] Refactor Redis keys in order to ensure status cleanup (#202) --- servicelayer/taskqueue.py | 147 ++++++++++++++++++++------------------ tests/test_taskqueue.py | 8 +-- 2 files changed, 82 insertions(+), 73 deletions(-) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index b3d972f..df3011e 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -87,25 +87,36 @@ def __init__(self, conn, name): self.last_update_key = make_key(PREFIX, "qdj", name, "last_update") self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages") - def cancel(self): - """Cancel processing of all tasks belonging to a dataset""" - pipe = self.conn.pipeline() + def flush_status(self, pipe): # remove the dataset from active datasets pipe.srem(self.key, self.name) - # clean up tasks and task counts - pipe.delete(self.finished_key) - pipe.delete(self.running_key) - pipe.delete(self.pending_key) + + # reset timestamps pipe.delete(self.start_key) - pipe.delete(self.end_key) pipe.delete(self.last_update_key) + + # delete information about running stages for stage in self.conn.smembers(self.active_stages_key): stage_key = self.get_stage_key(stage) pipe.delete(stage_key) pipe.delete(make_key(stage_key, "pending")) pipe.delete(make_key(stage_key, "running")) pipe.delete(make_key(stage_key, "finished")) + + # delete information about tasks per dataset + pipe.delete(self.pending_key) + pipe.delete(self.running_key) + pipe.delete(self.finished_key) + + # delete stages key pipe.delete(self.active_stages_key) + + def cancel(self): + """Cancel processing of all tasks belonging to a dataset""" + pipe = self.conn.pipeline() + self.flush_status(pipe) + # What should happen to the end_key in this case? + pipe.delete(self.end_key) pipe.execute() def get_status(self): @@ -167,24 +178,9 @@ def cleanup_dataset_status(cls, conn): datasets_key = make_key(PREFIX, "qdatasets") for name in conn.smembers(datasets_key): dataset = cls(conn, name) - status = dataset.get_status() - if status["running"] == 0 and status["pending"] == 0: + if dataset.is_done(): pipe = conn.pipeline() - # remove the dataset from active datasets - pipe.srem(dataset.key, dataset.name) - # reset finished task count - pipe.delete(dataset.finished_key) - # delete information about running stages - for stage in dataset.conn.smembers(dataset.active_stages_key): - stage_key = dataset.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - # delete stages key - pipe.delete(dataset.active_stages_key) - pipe.set(dataset.last_update_key, pack_now()) - + dataset.flush_status(pipe) pipe.execute() def should_execute(self, task_id): @@ -217,13 +213,19 @@ def add_task(self, task_id, stage): # add the dataset to active datasets pipe.sadd(self.key, self.name) - # update status of stages per dataset - stage_key = self.get_stage_key(stage) + # add the stage to the list of active stages per dataset pipe.sadd(self.active_stages_key, stage) + + # add the task to the set of tasks per stage + # and the set of pending tasks per stage + stage_key = self.get_stage_key(stage) pipe.sadd(stage_key, task_id) pipe.sadd(make_key(stage_key, "pending"), task_id) + # add the task to the set of pending tasks per dataset pipe.sadd(self.pending_key, task_id) + + # update dataset timestamps pipe.set(self.start_key, pack_now()) pipe.set(self.last_update_key, pack_now()) pipe.delete(self.end_key) @@ -233,32 +235,26 @@ def remove_task(self, task_id, stage): """Remove a task that's not going to be executed""" log.info(f"Removing task: {task_id}") pipe = self.conn.pipeline() + + # remove the task from the set of pending tasks per dataset pipe.srem(self.pending_key, task_id) + # remove the task from the set of tasks per stage + # and the set of pending tasks per stage stage_key = self.get_stage_key(stage) pipe.srem(stage_key, task_id) pipe.srem(make_key(stage_key, "pending"), task_id) + # delete the retry key for this task pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id)) - status = self.get_status() - if status["running"] == 0 and status["pending"] == 0: - # remove the dataset from active datasets - pipe.srem(self.key, self.name) - # reset finished task count - pipe.delete(self.finished_key) - # delete information about running stages - for stage in self.conn.smembers(self.active_stages_key): - stage_key = self.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - # delete stages key - pipe.delete(self.active_stages_key) - pipe.set(self.last_update_key, pack_now()) pipe.execute() + if self.is_done(): + pipe = self.conn.pipeline() + self.flush_status(pipe) + pipe.execute() + def checkout_task(self, task_id, stage): """Update state when a task is checked out for execution""" log.info(f"Checking out task: {task_id}") @@ -266,15 +262,23 @@ def checkout_task(self, task_id, stage): # add the dataset to active datasets pipe.sadd(self.key, self.name) - # update status of stages per dataset - stage_key = self.get_stage_key(stage) + # add the stage to the list of active stages per dataset pipe.sadd(self.active_stages_key, stage) + + # add the task to the set of tasks per stage + # and the set of running tasks per stage + stage_key = self.get_stage_key(stage) pipe.sadd(stage_key, task_id) - pipe.srem(make_key(stage_key, "pending"), task_id) pipe.sadd(make_key(stage_key, "running"), task_id) + # remove the task from the set of pending tasks per stage + pipe.srem(make_key(stage_key, "pending"), task_id) - pipe.srem(self.pending_key, task_id) + # add the task to the set of running tasks per dataset pipe.sadd(self.running_key, task_id) + # remove the task from the set of pending tasks per dataset + pipe.srem(self.pending_key, task_id) + + # update dataset timestamps pipe.set(self.start_key, pack_now()) pipe.set(self.last_update_key, pack_now()) pipe.delete(self.end_key) @@ -284,51 +288,55 @@ def mark_done(self, task: Task): """Update state when a task is finished executing""" log.info(f"Finished executing task: {task.task_id}") pipe = self.conn.pipeline() + + # remove the task from the pending and running sets of tasks per dataset pipe.srem(self.pending_key, task.task_id) pipe.srem(self.running_key, task.task_id) + + # increase the number of finished tasks per dataset pipe.incr(self.finished_key) + + # delete the retry key for the task pipe.delete(task.retry_key) + # remove the task from the set of tasks per stage + # and the pending and running tasks per stage stage_key = self.get_stage_key(task.operation) pipe.srem(stage_key, task.task_id) pipe.srem(make_key(stage_key, "pending"), task.task_id) pipe.srem(make_key(stage_key, "running"), task.task_id) + # increase the number of finished tasks per stage pipe.incr(make_key(stage_key, "finished")) + # update dataset timestamps pipe.set(self.end_key, pack_now()) pipe.set(self.last_update_key, pack_now()) pipe.execute() - status = self.get_status() - if status["running"] == 0 and status["pending"] == 0: - # remove the dataset from active datasets - pipe.srem(self.key, self.name) - # reset finished task count - pipe.delete(self.finished_key) - # delete information about running stages - for stage in self.conn.smembers(self.active_stages_key): - stage_key = self.get_stage_key(stage) - pipe.delete(stage_key) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - # delete stages key - pipe.delete(self.active_stages_key) - + if self.is_done(): + pipe = self.conn.pipeline() + self.flush_status(pipe) pipe.execute() def mark_for_retry(self, task): pipe = self.conn.pipeline() - stage_key = self.get_stage_key(task.operation) - log.info( f"Marking task {task.task_id} (stage {task.operation})" f" for retry after NACK" ) + # remove the task from the pending and running sets of tasks per dataset + pipe.srem(self.pending_key, task.task_id) + pipe.srem(self.running_key, task.task_id) + + # remove the task from the set of tasks per stage + # and the set of running tasks per stage + stage_key = self.get_stage_key(task.operation) + pipe.srem(stage_key, task.task_id) pipe.srem(make_key(stage_key, "running"), task.task_id) + + # delete the retry key for the task pipe.delete(task.retry_key) - pipe.srem(stage_key, task.task_id) pipe.set(self.last_update_key, pack_now()) @@ -345,11 +353,12 @@ def get_stage_key(self, stage): def is_task_tracked(self, task: Task): tracked = True - stage_key = self.get_stage_key(task.operation) - dataset = dataset = dataset_from_collection_id(task.collection_id) + dataset = dataset_from_collection_id(task.collection_id) task_id = task.task_id stage = task.operation + stage_key = self.get_stage_key(stage) + # A task is considered tracked if # the dataset is in the list of active datasets if dataset not in self.conn.smembers(self.key): @@ -662,9 +671,9 @@ def nack_message(self, task, channel, requeue=True): dataset = task.get_dataset(conn=self.conn) # Sync state to redis if requeue: + dataset.mark_for_retry(task) if not dataset.is_task_tracked(task): dataset.add_task(task.task_id, task.operation) - dataset.mark_for_retry(task) else: dataset.mark_done(task) if channel.is_open: diff --git a/tests/test_taskqueue.py b/tests/test_taskqueue.py index 5ed3877..20ac95d 100644 --- a/tests/test_taskqueue.py +++ b/tests/test_taskqueue.py @@ -121,10 +121,10 @@ def test_task_queue(self): assert status["finished"] == 0, status assert status["pending"] == 0, status assert status["running"] == 0, status - started = unpack_datetime(status["start_time"]) - last_updated = unpack_datetime(status["last_update"]) - end_time = unpack_datetime(status["end_time"]) - assert started < end_time < last_updated + # started = unpack_datetime(status["start_time"]) + # last_updated = unpack_datetime(status["last_update"]) + # end_time = unpack_datetime(status["end_time"]) + # assert started < end_time < last_updated @patch("servicelayer.taskqueue.Dataset.should_execute") def test_task_that_shouldnt_execute(self, mock_should_execute): From eedfb94de145ac5e90cd0f975ef2543334f5893c Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Fri, 19 Jul 2024 14:39:24 +0200 Subject: [PATCH 2/9] Fix dataset timestamps (#190) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reproduce incorrect dataset timestamps https://github.com/alephdata/aleph/issues/3787 * Remove "end_time" timestamp We currently do not retain information about inactive datasets (i.e. datasets that do not have any more pending or running tasks). For this reason, the "end_time" timestamp is currently of no use, as it would never be displayed to users anyway. While there are some plans around providing more detail about the results of processed tasks as well as completed jobs, it is unclear where this data will be stored and what the implementation will look like. As it is easy enough to add this information back (< 10 LOC), I’ve removed it for now. * Only set `start_time` timestamp if it hasn’t been set yet * Delete dataset timestamps when all tasks have been processed * Add tests covering dataset status when cancelling tasks * Extract flushing status data into separate method * Ensure time-machine is installed in GitHub Actions * Delete dead code * Remove redundant code `cleanup_dataset_status` iterates over all active datasets and removes status information if it is done (i.e. there are no more pending or running tasks). It is called by the Aleph worker periodically. We already do this for individual datasets whenever a task from the dataset is done or the dataset is cancelled as a whole. That means that `cleanup_dataset_status` is redundant. Also see: https://github.com/alephdata/servicelayer/pull/190#issuecomment-2233845128 * Use `is_done` helper method * Fix linter errors --------- Co-authored-by: catileptic --- requirements-dev.txt | 1 + servicelayer/taskqueue.py | 16 +-- setup.py | 1 + tests/test_taskqueue.py | 203 ++++++++++++++++++++++++++++++++++++-- 4 files changed, 202 insertions(+), 19 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 56d1d2d..9bd3094 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,3 +5,4 @@ pytest-env==1.1.3 pytest-cov==5.0.0 pytest-mock==3.14.0 wheel==0.43.0 +time-machine==2.14.1 diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index df3011e..9bf0610 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -83,7 +83,6 @@ def __init__(self, conn, name): self.running_key = make_key(PREFIX, "qdj", name, "running") self.pending_key = make_key(PREFIX, "qdj", name, "pending") self.start_key = make_key(PREFIX, "qdj", name, "start") - self.end_key = make_key(PREFIX, "qdj", name, "end") self.last_update_key = make_key(PREFIX, "qdj", name, "last_update") self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages") @@ -115,19 +114,14 @@ def cancel(self): """Cancel processing of all tasks belonging to a dataset""" pipe = self.conn.pipeline() self.flush_status(pipe) - # What should happen to the end_key in this case? - pipe.delete(self.end_key) pipe.execute() def get_status(self): """Status of a given dataset.""" status = {"finished": 0, "running": 0, "pending": 0, "stages": []} - start, end, last_update = self.conn.mget( - (self.start_key, self.end_key, self.last_update_key) - ) + start, last_update = self.conn.mget((self.start_key, self.last_update_key)) status["start_time"] = start - status["end_time"] = end status["last_update"] = last_update for stage in self.conn.smembers(self.active_stages_key): @@ -226,9 +220,8 @@ def add_task(self, task_id, stage): pipe.sadd(self.pending_key, task_id) # update dataset timestamps - pipe.set(self.start_key, pack_now()) + pipe.set(self.start_key, pack_now(), nx=True) pipe.set(self.last_update_key, pack_now()) - pipe.delete(self.end_key) pipe.execute() def remove_task(self, task_id, stage): @@ -279,9 +272,8 @@ def checkout_task(self, task_id, stage): pipe.srem(self.pending_key, task_id) # update dataset timestamps - pipe.set(self.start_key, pack_now()) + pipe.set(self.start_key, pack_now(), nx=True) pipe.set(self.last_update_key, pack_now()) - pipe.delete(self.end_key) pipe.execute() def mark_done(self, task: Task): @@ -309,8 +301,8 @@ def mark_done(self, task: Task): pipe.incr(make_key(stage_key, "finished")) # update dataset timestamps - pipe.set(self.end_key, pack_now()) pipe.set(self.last_update_key, pack_now()) + pipe.execute() if self.is_done(): diff --git a/setup.py b/setup.py index d394e01..786f954 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ "pytest >= 3.6", "coverage", "pytest-cov", + "time-machine>=2.14.1, <3.0.0", ], }, test_suite="tests", diff --git a/tests/test_taskqueue.py b/tests/test_taskqueue.py index 20ac95d..b493272 100644 --- a/tests/test_taskqueue.py +++ b/tests/test_taskqueue.py @@ -3,6 +3,7 @@ from unittest.mock import patch import json from random import randrange +import time_machine import pika from prometheus_client import REGISTRY @@ -73,7 +74,6 @@ def test_task_queue(self): assert status["finished"] == 0, status assert status["pending"] == 1, status assert status["running"] == 0, status - assert status["end_time"] is None started = unpack_datetime(status["start_time"]) last_updated = unpack_datetime(status["last_update"]) assert started < last_updated @@ -121,10 +121,8 @@ def test_task_queue(self): assert status["finished"] == 0, status assert status["pending"] == 0, status assert status["running"] == 0, status - # started = unpack_datetime(status["start_time"]) - # last_updated = unpack_datetime(status["last_update"]) - # end_time = unpack_datetime(status["end_time"]) - # assert started < end_time < last_updated + assert status["start_time"] is None + assert status["last_update"] is None @patch("servicelayer.taskqueue.Dataset.should_execute") def test_task_that_shouldnt_execute(self, mock_should_execute): @@ -192,6 +190,199 @@ def did_nack(): assert dataset.is_task_tracked(Task(**body)) +def test_dataset_get_status(): + conn = get_fakeredis() + conn.flushdb() + + dataset = Dataset(conn=conn, name="123") + status = dataset.get_status() + + assert status["pending"] == 0 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"] is None + assert status["last_update"] is None + + task_one = Task( + task_id="1", + job_id="abc", + delivery_tag="", + operation="ingest", + context={}, + payload={}, + priority=5, + collection_id="1", + ) + + task_two = Task( + task_id="2", + job_id="abc", + delivery_tag="", + operation="ingest", + context={}, + payload={}, + priority=5, + collection_id="1", + ) + + task_three = Task( + task_id="3", + job_id="abc", + delivery_tag="", + operation="index", + context={}, + payload={}, + priority=5, + collection_id="1", + ) + + # Adding a task updates `start_time` and `last_update` + with time_machine.travel("2024-01-01T00:00:00"): + dataset.add_task(task_one.task_id, task_one.operation) + + status = dataset.get_status() + assert status["pending"] == 1 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"].startswith("2024-01-01T00:00:00") + assert status["last_update"].startswith("2024-01-01T00:00:00") + + # Once a worker starts processing a task, only `last_update` is updated + with time_machine.travel("2024-01-02T00:00:00"): + dataset.checkout_task(task_one.task_id, task_one.operation) + + status = dataset.get_status() + assert status["pending"] == 0 + assert status["running"] == 1 + assert status["finished"] == 0 + assert status["start_time"].startswith("2024-01-01T00:00:00") + assert status["last_update"].startswith("2024-01-02T00:00:00") + + # When another task is added, only `last_update` is updated + with time_machine.travel("2024-01-03T00:00:00"): + dataset.add_task(task_two.task_id, task_two.operation) + + status = dataset.get_status() + assert status["pending"] == 1 + assert status["running"] == 1 + assert status["finished"] == 0 + assert status["start_time"].startswith("2024-01-01T00:00:00") + assert status["last_update"].startswith("2024-01-03T00:00:00") + + # When the first task has been processed, `last_update` is updated + with time_machine.travel("2024-01-04T00:00:00"): + dataset.mark_done(task_one) + + status = dataset.get_status() + assert status["pending"] == 1 + assert status["running"] == 0 + assert status["finished"] == 1 + assert status["start_time"].startswith("2024-01-01T00:00:00") + assert status["last_update"].startswith("2024-01-04T00:00:00") + + # When the worker starts processing the second task, only `last_update` is updated + with time_machine.travel("2024-01-05T00:00:00"): + dataset.checkout_task(task_two.task_id, task_two.operation) + + status = dataset.get_status() + assert status["pending"] == 0 + assert status["running"] == 1 + assert status["finished"] == 1 + assert status["start_time"].startswith("2024-01-01T00:00:00") + assert status["last_update"].startswith("2024-01-05T00:00:00") + + # Once all tasks have been processed, status data is flushed + with time_machine.travel("2024-01-06T00:00:00"): + dataset.mark_done(task_two) + + status = dataset.get_status() + assert status["pending"] == 0 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"] is None + assert status["last_update"] is None + + # Adding a new task to an inactive dataset sets `start_time` + with time_machine.travel("2024-01-07T00:00:00"): + dataset.add_task(task_three.task_id, task_three.operation) + + status = dataset.get_status() + assert status["pending"] == 1 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"].startswith("2024-01-07T00:00:00") + assert status["last_update"].startswith("2024-01-07T00:00:00") + + # Cancelling a dataset flushes status data + with time_machine.travel("2024-01-08T00:00:00"): + dataset.checkout_task(task_three.task_id, task_three.operation) + dataset.cancel() + + status = dataset.get_status() + assert status["pending"] == 0 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"] is None + assert status["last_update"] is None + + # Tasks that were already running when the dataset was cancelled + # have no effect + with time_machine.travel("2024-01-09T00:00:00"): + dataset.mark_done(task_three) + + assert status["pending"] == 0 + assert status["running"] == 0 + assert status["finished"] == 0 + assert status["start_time"] is None + assert status["last_update"] is None + + +def test_dataset_cancel(): + conn = get_fakeredis() + conn.flushdb() + + dataset = Dataset(conn=conn, name="abc") + assert conn.keys() == [] + + # Enqueueing tasks stores status data in Redis + dataset.add_task("1", "ingest") + dataset.add_task("2", "index") + dataset.checkout_task("1", "ingest") + assert conn.keys() != [] + + # Cancelling a dataset removes associated data from Redis + dataset.cancel() + assert conn.keys() == [] + + +def test_dataset_mark_done(): + conn = get_fakeredis() + conn.flushdb() + + dataset = Dataset(conn=conn, name="abc") + assert conn.keys() == [] + + task = Task( + task_id="1", + job_id="abc", + delivery_tag="", + operation="ingest", + context={}, + payload={}, + priority=5, + collection_id="abc", + ) + + # Enqueueing a task stores status data in Redis + dataset.add_task(task.task_id, task.operation) + dataset.checkout_task(task.task_id, task.operation) + assert conn.keys() != [] + + # Marking the last task as done cleans up status data in Redis + dataset.mark_done(task) + assert conn.keys() == [] + + @pytest.fixture def prom_registry(): # This relies on internal implementation details of the client to reset @@ -325,7 +516,6 @@ def test_get_priority_bucket(): } ], "start_time": "2024-06-25T10:58:49.779811", - "end_time": None, "last_update": "2024-06-25T10:58:49.779819", } }, @@ -354,7 +544,6 @@ def test_get_priority_bucket(): } ], "start_time": "2024-06-25T10:58:49.779811", - "end_time": None, "last_update": "2024-06-25T10:58:49.779819", } }, From 995743fcc73f2818411aa852d5a096807144a3e3 Mon Sep 17 00:00:00 2001 From: Alex Stefanescu Date: Fri, 19 Jul 2024 14:40:16 +0200 Subject: [PATCH 3/9] =?UTF-8?q?Bump=20version:=201.23.0-rc32=20=E2=86=92?= =?UTF-8?q?=201.23.0-rc33?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- servicelayer/__init__.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6d4ff99..48d3075 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.23.0-rc32 +current_version = 1.23.0-rc33 commit = True tag = True parse = (?P\d+)\.(?P\d+)\.(?P\d+)([-](?P(pre|rc))(?P\d+))? diff --git a/servicelayer/__init__.py b/servicelayer/__init__.py index 62e19d2..632fe6a 100644 --- a/servicelayer/__init__.py +++ b/servicelayer/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "1.23.0-rc32" +__version__ = "1.23.0-rc33" logging.getLogger("boto3").setLevel(logging.WARNING) logging.getLogger("botocore").setLevel(logging.WARNING) diff --git a/setup.py b/setup.py index 786f954..0ecca8c 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="servicelayer", - version="1.23.0-rc32", + version="1.23.0-rc33", description="Basic remote service functions for alephdata components", classifiers=[ "Development Status :: 3 - Alpha", From f97f0864e1a1abe41d645c63141a1103d0406a29 Mon Sep 17 00:00:00 2001 From: Christian Stefanescu Date: Tue, 3 Sep 2024 16:54:09 +0200 Subject: [PATCH 4/9] Don't show retry flag when a task succeeded --- servicelayer/taskqueue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 9bf0610..b87a0bc 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -513,7 +513,7 @@ def process_blocking(self): success, retry = self.handle(task, channel) log.debug( f"Task {task.task_id} finished with success={success}" - f" and retry={retry}" + f"{'' if success else ' and retry=' + str(retry)}" ) if success: cb = functools.partial(self.ack_message, task, channel) From 8a4f717e9334972f7e4a9eebe4e623f8456ed056 Mon Sep 17 00:00:00 2001 From: Christian Stefanescu Date: Tue, 3 Sep 2024 17:07:00 +0200 Subject: [PATCH 5/9] s/docker-compose/docker compose --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index d2626a7..5005383 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ all: clean install test .PHONY: build build-docker: - docker-compose build --no-rm --parallel + docker compose build --no-rm --parallel install: pip install -q -e . @@ -16,7 +16,7 @@ dev: python3 -m pip install -q -r requirements-dev.txt test: - docker-compose run --rm shell pytest --cov=servicelayer + docker compose run --rm shell pytest --cov=servicelayer @echo "⚠️ you might notice a warning about a fairy from SQLAlchemy" @echo "this is fixed in a newer release -- see https://github.com/sqlalchemy/sqlalchemy/issues/10414" @echo "we are ignoring this for now" @@ -31,7 +31,7 @@ format-check: black --check . shell: - docker-compose run --rm shell + docker compose run --rm shell build: python3 setup.py sdist bdist_wheel From 85d85448fece04880810cfdd225619cd65259597 Mon Sep 17 00:00:00 2001 From: Alex Stefanescu Date: Wed, 11 Sep 2024 15:24:06 +0200 Subject: [PATCH 6/9] Add _channel attr in test runs, too --- servicelayer/taskqueue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index b87a0bc..cc7f568 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -541,6 +541,7 @@ def process_nonblocking(self): else: queue_active[queue] = True task = get_task(body, method.delivery_tag) + task._channel = channel success, retry = self.handle(task, channel) if success: channel.basic_ack(task.delivery_tag) From 24c4b7fa8f3132249c55cb41e8578c3782d95848 Mon Sep 17 00:00:00 2001 From: Christian Stefanescu Date: Tue, 24 Sep 2024 14:06:08 +0200 Subject: [PATCH 7/9] Revert "Add _channel attr in test runs, too" This reverts commit 85d85448fece04880810cfdd225619cd65259597. --- servicelayer/taskqueue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index cc7f568..b87a0bc 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -541,7 +541,6 @@ def process_nonblocking(self): else: queue_active[queue] = True task = get_task(body, method.delivery_tag) - task._channel = channel success, retry = self.handle(task, channel) if success: channel.basic_ack(task.delivery_tag) From 273496bfb1a20bb1cfb023cde1fc26661c1aefcd Mon Sep 17 00:00:00 2001 From: Alex Stefanescu Date: Tue, 24 Sep 2024 15:53:30 +0200 Subject: [PATCH 8/9] Add _channel attr to task obj in tests --- servicelayer/taskqueue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index b87a0bc..cc7f568 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -541,6 +541,7 @@ def process_nonblocking(self): else: queue_active[queue] = True task = get_task(body, method.delivery_tag) + task._channel = channel success, retry = self.handle(task, channel) if success: channel.basic_ack(task.delivery_tag) From 83c62cb6ac3dc960cfd9371a482a2dbe7c72f0fb Mon Sep 17 00:00:00 2001 From: Alex Stefanescu Date: Tue, 24 Sep 2024 16:19:34 +0200 Subject: [PATCH 9/9] =?UTF-8?q?Bump=20version:=201.23.0-rc33=20=E2=86=92?= =?UTF-8?q?=201.23.0-rc34?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- servicelayer/__init__.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 48d3075..fc99921 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.23.0-rc33 +current_version = 1.23.0-rc34 commit = True tag = True parse = (?P\d+)\.(?P\d+)\.(?P\d+)([-](?P(pre|rc))(?P\d+))? diff --git a/servicelayer/__init__.py b/servicelayer/__init__.py index 632fe6a..5ab37d9 100644 --- a/servicelayer/__init__.py +++ b/servicelayer/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "1.23.0-rc33" +__version__ = "1.23.0-rc34" logging.getLogger("boto3").setLevel(logging.WARNING) logging.getLogger("botocore").setLevel(logging.WARNING) diff --git a/setup.py b/setup.py index 0ecca8c..be46f92 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="servicelayer", - version="1.23.0-rc33", + version="1.23.0-rc34", description="Basic remote service functions for alephdata components", classifiers=[ "Development Status :: 3 - Alpha",