Skip to content

Commit

Permalink
Adds a last_updated timestamp to the dataset status
Browse files Browse the repository at this point in the history
(currently implemented only for the Redis queue backend)
  • Loading branch information
stchris committed Dec 5, 2023
1 parent bd68c29 commit 13bb282
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
16 changes: 15 additions & 1 deletion servicelayer/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(self, conn, dataset, job_id): # noqa
self.dataset = Dataset.ensure(conn, dataset)
self.start_key = make_key(PREFIX, "qd", self.id, dataset, "start")
self.end_key = make_key(PREFIX, "qd", self.id, dataset, "end")
self.last_update_key = make_key(PREFIX, "qd", self.id, dataset, "last_update")
self.active_jobs_key = make_key(PREFIX, "qdja")

def get_stage(self, name):
Expand Down Expand Up @@ -117,6 +118,7 @@ def _create(self, pipe):
pipe.sadd(self.active_jobs_key, make_key(self.dataset.name, self.id))
pipe.delete(self.end_key)
pipe.setnx(self.start_key, pack_now())
self.touch_last_updated(pipe)

def _remove(self, pipe):
for stage in self.get_stages():
Expand All @@ -125,6 +127,7 @@ def _remove(self, pipe):
pipe.srem(self.dataset.jobs_key, self.id)
pipe.srem(self.active_jobs_key, make_key(self.dataset.name, self.id))
pipe.delete(self.start_key)
pipe.delete(self.last_update_key)
pipe.setnx(self.end_key, pack_now())
pipe.expire(self.end_key, REDIS_EXPIRE)

Expand All @@ -144,9 +147,12 @@ def _get_active_keys(self):
def get_status(self):
"""Aggregate status for all stages on the given job."""
status = {"finished": 0, "running": 0, "pending": 0, "stages": []}
start, end = self.conn.mget((self.start_key, self.end_key))
start, end, last_update = self.conn.mget(
(self.start_key, self.end_key, self.last_update_key)
)
status["start_time"] = start
status["end_time"] = end
status["last_update"] = last_update
for stage in self.get_stages():
progress = stage.get_status()
status["stages"].append(progress)
Expand All @@ -155,6 +161,10 @@ def get_status(self):
status["pending"] += progress["pending"]
return status

def touch_last_updated(self, pipe):
    """Stamp this job's last_update key with the current time.

    The write is queued on *pipe* (a Redis pipeline) so callers can
    batch it with their own commands; nothing is executed here.
    """
    now = pack_now()
    pipe.set(self.last_update_key, now)

@classmethod
def random_id(cls):
    """Return a fresh random job identifier (32 lowercase hex chars)."""
    # str(uuid4()) is the dashed form; stripping dashes yields .hex
    return str(uuid.uuid4()).replace("-", "")
Expand Down Expand Up @@ -194,6 +204,7 @@ def _check_out(self, count=1):
self._create(pipe)
pipe.decr(self.pending_key, amount=count)
pipe.incr(self.running_key, amount=count)
self.job.touch_last_updated(pipe)
pipe.execute()

def mark_done(self, count=1):
Expand All @@ -202,13 +213,15 @@ def mark_done(self, count=1):
self._create(pipe)
pipe.decr(self.running_key, amount=count)
pipe.incr(self.finished_key, amount=count)
self.job.touch_last_updated(pipe)
pipe.execute()

def report_finished(self, count=1):
    """Inflate the number of finished tasks.

    Batches the stage bookkeeping, the finished-counter increment, and
    the job's last_update refresh on one Redis pipeline, then flushes
    the pipeline in a single round trip.
    """
    batch = self.conn.pipeline()
    self._create(batch)
    batch.incr(self.finished_key, amount=count)
    self.job.touch_last_updated(batch)
    batch.execute()

def queue(self, payload={}, context={}):
Expand All @@ -218,6 +231,7 @@ def queue(self, payload={}, context={}):
self._create(pipe)
pipe.rpush(self.queue_key, data)
pipe.incr(self.pending_key)
self.job.touch_last_updated(pipe)
pipe.execute()
return task

Expand Down
16 changes: 16 additions & 0 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import datetime

from unittest import TestCase

from servicelayer.cache import get_fakeredis
from servicelayer.jobs import Job, Stage, Task, Dataset
from servicelayer.util import unpack_datetime


class ProcessTest(TestCase):
Expand All @@ -11,6 +14,7 @@ def setUp(self):
self.dataset = "test_1"

def test_job_queue(self):
ds = Dataset(self.conn, self.dataset)
job = Job.create(self.conn, self.dataset)
stage = job.get_stage("ingest")
status = stage.get_status()
Expand All @@ -19,6 +23,7 @@ def test_job_queue(self):
assert job.is_done()
stage.queue({"test": "foo"}, {})
status = job.get_status()
last_updated_pending = ds.get_status()["jobs"][0]["last_update"]
assert status["pending"] == 1
assert status["finished"] == 0
assert status["running"] == 0
Expand All @@ -31,12 +36,16 @@ def test_job_queue(self):
assert status["running"] == 1
assert status["finished"] == 0
assert not job.is_done()
last_updated_running = ds.get_status()["jobs"][0]["last_update"]
assert last_updated_running > last_updated_pending
task.done()
status = job.get_status()
assert status["pending"] == 0
assert status["running"] == 0
assert status["finished"] == 1
assert job.is_done()
last_updated_finished = ds.get_status()["jobs"][0]["last_update"]
assert last_updated_finished > last_updated_running

def test_queue_clear(self):
job = Job.create(self.conn, self.dataset)
Expand Down Expand Up @@ -91,6 +100,13 @@ def test_active_dataset_status(self):
assert len(status["datasets"]) == 1
assert status["total"] == 1
assert status["datasets"]["test_1"]["pending"] == 2
started = status["datasets"]["test_1"]["jobs"][0]["start_time"]
assert started
last_updated = status["datasets"]["test_1"]["jobs"][0]["last_update"]
assert last_updated
assert abs(
unpack_datetime(started) - unpack_datetime(last_updated)
) < datetime.timedelta(seconds=1)
job.dataset.cancel()
status = Dataset.get_active_dataset_status(self.conn)
assert status["datasets"] == {}
Expand Down

0 comments on commit 13bb282

Please sign in to comment.