From 13bb2826d36d7b1e2edf1fe95be3f9db5d333d07 Mon Sep 17 00:00:00 2001
From: Christian Stefanescu
Date: Tue, 5 Dec 2023 18:39:38 +0100
Subject: [PATCH] Adds a last_updated timestamp to the dataset status (this
 works for the redis queue implementation)

---
 servicelayer/jobs.py | 16 +++++++++++++++-
 tests/test_jobs.py   | 16 ++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/servicelayer/jobs.py b/servicelayer/jobs.py
index 0499bd2..8ba33d1 100644
--- a/servicelayer/jobs.py
+++ b/servicelayer/jobs.py
@@ -87,6 +87,7 @@ def __init__(self, conn, dataset, job_id):  # noqa
         self.dataset = Dataset.ensure(conn, dataset)
         self.start_key = make_key(PREFIX, "qd", self.id, dataset, "start")
         self.end_key = make_key(PREFIX, "qd", self.id, dataset, "end")
+        self.last_update_key = make_key(PREFIX, "qd", self.id, dataset, "last_update")
         self.active_jobs_key = make_key(PREFIX, "qdja")
 
     def get_stage(self, name):
@@ -117,6 +118,7 @@ def _create(self, pipe):
         pipe.sadd(self.active_jobs_key, make_key(self.dataset.name, self.id))
         pipe.delete(self.end_key)
         pipe.setnx(self.start_key, pack_now())
+        self.touch_last_updated(pipe)
 
     def _remove(self, pipe):
         for stage in self.get_stages():
@@ -125,6 +127,7 @@ def _remove(self, pipe):
         pipe.srem(self.dataset.jobs_key, self.id)
         pipe.srem(self.active_jobs_key, make_key(self.dataset.name, self.id))
         pipe.delete(self.start_key)
+        pipe.delete(self.last_update_key)
         pipe.setnx(self.end_key, pack_now())
         pipe.expire(self.end_key, REDIS_EXPIRE)
 
@@ -144,9 +147,12 @@ def _get_active_keys(self):
     def get_status(self):
         """Aggregate status for all stages on the given job."""
         status = {"finished": 0, "running": 0, "pending": 0, "stages": []}
-        start, end = self.conn.mget((self.start_key, self.end_key))
+        start, end, last_update = self.conn.mget(
+            (self.start_key, self.end_key, self.last_update_key)
+        )
         status["start_time"] = start
         status["end_time"] = end
+        status["last_update"] = last_update
         for stage in self.get_stages():
             progress = stage.get_status()
             status["stages"].append(progress)
@@ -155,6 +161,10 @@ def get_status(self):
             status["pending"] += progress["pending"]
         return status
 
+    def touch_last_updated(self, pipe):
+        """Update the last_updated key for this job."""
+        pipe.set(self.last_update_key, pack_now())
+
     @classmethod
     def random_id(cls):
         return uuid.uuid4().hex
@@ -194,6 +204,7 @@ def _check_out(self, count=1):
         self._create(pipe)
         pipe.decr(self.pending_key, amount=count)
         pipe.incr(self.running_key, amount=count)
+        self.job.touch_last_updated(pipe)
         pipe.execute()
 
     def mark_done(self, count=1):
@@ -202,6 +213,7 @@
         self._create(pipe)
         pipe.decr(self.running_key, amount=count)
         pipe.incr(self.finished_key, amount=count)
+        self.job.touch_last_updated(pipe)
         pipe.execute()
 
     def report_finished(self, count=1):
@@ -209,6 +221,7 @@
         pipe = self.conn.pipeline()
         self._create(pipe)
         pipe.incr(self.finished_key, amount=count)
+        self.job.touch_last_updated(pipe)
         pipe.execute()
 
     def queue(self, payload={}, context={}):
@@ -218,6 +231,7 @@
         self._create(pipe)
         pipe.rpush(self.queue_key, data)
         pipe.incr(self.pending_key)
+        self.job.touch_last_updated(pipe)
         pipe.execute()
         return task
 
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 6b04b98..e4c8db2 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1,7 +1,10 @@
+import datetime
+
 from unittest import TestCase
 
 from servicelayer.cache import get_fakeredis
 from servicelayer.jobs import Job, Stage, Task, Dataset
+from servicelayer.util import unpack_datetime
 
 
 class ProcessTest(TestCase):
@@ -11,6 +14,7 @@ def setUp(self):
         self.dataset = "test_1"
 
     def test_job_queue(self):
+        ds = Dataset(self.conn, self.dataset)
         job = Job.create(self.conn, self.dataset)
         stage = job.get_stage("ingest")
         status = stage.get_status()
@@ -19,6 +23,7 @@ def test_job_queue(self):
         assert job.is_done()
         stage.queue({"test": "foo"}, {})
         status = job.get_status()
+        last_updated_pending = ds.get_status()["jobs"][0]["last_update"]
         assert status["pending"] == 1
         assert status["finished"] == 0
         assert status["running"] == 0
@@ -31,12 +36,16 @@ def test_job_queue(self):
         assert status["running"] == 1
         assert status["finished"] == 0
         assert not job.is_done()
+        last_updated_running = ds.get_status()["jobs"][0]["last_update"]
+        assert last_updated_running > last_updated_pending
         task.done()
         status = job.get_status()
         assert status["pending"] == 0
         assert status["running"] == 0
         assert status["finished"] == 1
         assert job.is_done()
+        last_updated_finished = ds.get_status()["jobs"][0]["last_update"]
+        assert last_updated_finished > last_updated_running
 
     def test_queue_clear(self):
         job = Job.create(self.conn, self.dataset)
@@ -91,6 +100,13 @@ def test_active_dataset_status(self):
         assert len(status["datasets"]) == 1
         assert status["total"] == 1
         assert status["datasets"]["test_1"]["pending"] == 2
+        started = status["datasets"]["test_1"]["jobs"][0]["start_time"]
+        assert started
+        last_updated = status["datasets"]["test_1"]["jobs"][0]["last_update"]
+        assert last_updated
+        assert abs(
+            unpack_datetime(started) - unpack_datetime(last_updated)
+        ) < datetime.timedelta(seconds=1)
         job.dataset.cancel()
         status = Dataset.get_active_dataset_status(self.conn)
         assert status["datasets"] == {}