Fix Redis keys #201

Merged: 9 commits, Oct 8, 2024
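For context: this PR replaces Redis keys derived from get_stage_key() with keys built explicitly via make_key(PREFIX, "qds", ...), keeping temporary cleanup code around until the old, wrong keys have drained. Below is a minimal sketch of the resulting key layout, assuming make_key() colon-joins its non-None parts; the make_key here is a stand-in for illustration, not the servicelayer implementation, and the assertions mirror the expectations in tests/test_dataset.py further down.

# Stand-in make_key for illustration only; servicelayer provides the real one.
def make_key(*parts):
    return ":".join(str(p) for p in parts if p is not None)

PREFIX = "tq"
name, stage = "1", "ingest"  # dataset name and stage, as in the tests

assert make_key(PREFIX, "qds", name, stage) == "tq:qds:1:ingest"
assert make_key(PREFIX, "qds", name, stage, "pending") == "tq:qds:1:ingest:pending"
assert make_key(PREFIX, "qdj", name, "finished") == "tq:qdj:1:finished"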
119 changes: 77 additions & 42 deletions servicelayer/taskqueue.py
@@ -96,12 +96,17 @@ def flush_status(self, pipe):

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
# TODO - delete this block after all wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

pipe.delete(make_key(PREFIX, "qds", self.name, stage))
pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))

# delete information about tasks per dataset
pipe.delete(self.pending_key)
pipe.delete(self.running_key)
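A note on the block above: it is safe to delete both the legacy stage keys and the corrected qds keys unconditionally, because DEL on a key that does not exist is a no-op. A quick sketch with the fakeredis helper the new tests use:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
pipe = conn.pipeline()
pipe.delete("tq:qds:1:ingest:pending")  # fine even if this key was never written
pipe.delete("tq:qds:1:ingest:running")
deleted_counts = pipe.execute()  # [0, 0] when the keys were absent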
@@ -126,19 +131,42 @@ def get_status(self):

for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)

# TODO - uncomment this after all wrong keys are gone
# num_pending = unpack_int(
# self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
# )
# num_running = unpack_int(
# self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
# )
# num_finished = unpack_int(
# self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
# )

# TODO - temporary hack: sum the counts from both the legacy and the
# corrected keys until the legacy keys have been cleaned up
num_pending = unpack_int(
self.conn.scard(make_key(stage_key, "pending"))
) + unpack_int(
self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
)
num_running = unpack_int(
self.conn.scard(make_key(stage_key, "running"))
) + unpack_int(
self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
)
num_finished = unpack_int(
self.conn.get(make_key(stage_key, "finished"))
) + unpack_int(
self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
)

status["stages"].append(
{
"job_id": "",
"stage": stage,
"pending": max(
0, unpack_int(self.conn.scard(make_key(stage_key, "pending")))
),
"running": max(
0, unpack_int(self.conn.scard(make_key(stage_key, "running")))
),
"finished": max(
0, unpack_int(self.conn.get(make_key(stage_key, "finished")))
),
"pending": max(0, num_pending),
"running": max(0, num_running),
"finished": max(0, num_finished),
}
)
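The temporary hack above reads each count twice, once from the legacy key and once from the corrected one, and it relies on pending/running being Redis sets (counted with SCARD) while finished is a plain counter (written with INCR, read with GET). A rough sketch of those semantics, not part of the PR itself:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
conn.sadd("tq:qds:1:ingest:pending", "t1", "t2")  # set of task ids
conn.incr("tq:qds:1:ingest:finished")             # scalar counter

assert conn.scard("tq:qds:1:ingest:pending") == 2
assert int(conn.get("tq:qds:1:ingest:finished")) == 1
assert conn.scard("tq:qds:1:ingest:running") == 0  # missing set counts as empty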

@@ -207,14 +235,10 @@ def add_task(self, task_id, stage):
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# add the stage to the list of active stages per dataset
# update status of stages per dataset
pipe.sadd(self.active_stages_key, stage)

# add the task to the set of tasks per stage
# and the set of pending tasks per stage
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.sadd(make_key(stage_key, "pending"), task_id)
pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)

# add the task to the set of pending tasks per dataset
pipe.sadd(self.pending_key, task_id)
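Because pending tasks are tracked as set members, re-queueing the same task id is idempotent, so the SCARD-based pending count in get_status() cannot double-count. A quick illustration:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
conn.sadd("tq:qds:1:ingest:pending", "task-1")
conn.sadd("tq:qds:1:ingest:pending", "task-1")  # duplicate add is a no-op
assert conn.scard("tq:qds:1:ingest:pending") == 1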
@@ -232,10 +256,13 @@ def remove_task(self, task_id, stage):
# remove the task from the set of pending tasks per dataset
pipe.srem(self.pending_key, task_id)

# remove the task from the set of tasks per stage
# and the set of pending tasks per stage
pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)

# TODO - remove this after there are no wrong keys left
# It's safe to run this unconditionally: Redis treats a nonexistent
# key as an empty set, so SREM is simply a no-op.
stage_key = self.get_stage_key(stage)
pipe.srem(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)

# delete the retry key for this task
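The claim in the TODO above is standard Redis behavior: SREM against a key that does not exist treats it as an empty set and returns 0, so the legacy-key cleanup can run unconditionally. For instance:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
assert conn.srem("tq:qds:1:ingest:pending", "no-such-task") == 0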
@@ -255,15 +282,14 @@ def checkout_task(self, task_id, stage):
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# add the stage to the list of active stages per dataset
# update status of stages per dataset
pipe.sadd(self.active_stages_key, stage)
pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)

# add the task to the set of tasks per stage
# and the set of running tasks per stage
# TODO - remove after all the wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.sadd(make_key(stage_key, "running"), task_id)
# remove the task from the set of pending tasks per stage
pipe.srem(make_key(stage_key, "pending"), task_id)

# add the task to the set of running tasks per dataset
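Stepping back from the diff for a moment: the checkout above moves a task from the pending set to the running set within a single pipeline, and redis-py pipelines execute as a MULTI/EXEC transaction by default, so readers should not observe the intermediate state. A minimal sketch of that transition, outside the Dataset class:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
conn.sadd("tq:qds:1:ingest:pending", "task-1")

pipe = conn.pipeline()  # transaction=True by default in redis-py
pipe.srem("tq:qds:1:ingest:pending", "task-1")
pipe.sadd("tq:qds:1:ingest:running", "task-1")
pipe.execute()

assert not conn.sismember("tq:qds:1:ingest:pending", "task-1")
assert conn.sismember("tq:qds:1:ingest:running", "task-1")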
@@ -291,14 +317,17 @@ def mark_done(self, task: Task):
# delete the retry key for the task
pipe.delete(task.retry_key)

# remove the task from the set of tasks per stage
# and the pending and running tasks per stage
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "pending"), task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)
# increase the number of finished tasks per stage
pipe.incr(make_key(stage_key, "finished"))
stage = task.operation
task_id = task.task_id
pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished"))

# TODO - remove this after all the wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.srem(make_key(stage_key, "pending"), task_id)
pipe.srem(make_key(stage_key, "running"), task_id)

# update dataset timestamps
pipe.set(self.last_update_key, pack_now())
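One detail worth calling out in mark_done(): the finished counter never needs initialization, since INCR on a missing key starts it from 0. For example:

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
assert conn.incr("tq:qds:1:ingest:finished") == 1  # key created at 0, then incremented
assert conn.incr("tq:qds:1:ingest:finished") == 2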
@@ -312,19 +341,24 @@ def mark_done(self, task: Task):

def mark_for_retry(self, task):
pipe = self.conn.pipeline()

log.info(
f"Marking task {task.task_id} (stage {task.operation})"
f" for retry after NACK"
)

stage = task.operation
task_id = task.task_id

# remove the task from the pending and running sets of tasks per dataset
pipe.srem(self.pending_key, task.task_id)
pipe.srem(self.running_key, task.task_id)
pipe.srem(self.pending_key, task_id)
pipe.srem(self.running_key, task_id)

pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)

# remove the task from the set of tasks per stage
# and the set of running tasks per stage
# TODO - remove when there are no wrong keys left
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)

# delete the retry key for the task
@@ -339,6 +373,7 @@ def is_done(self):
def __str__(self):
return self.name

# TODO - remove this after all the wrong keys are gone
def get_stage_key(self, stage):
return make_key(PREFIX, "qds", self.name, stage)

@@ -349,8 +384,6 @@ def is_task_tracked(self, task: Task):
task_id = task.task_id
stage = task.operation

stage_key = self.get_stage_key(stage)

# A task is considered tracked if
# the dataset is in the list of active datasets
if dataset not in self.conn.smembers(self.key):
Expand All @@ -359,7 +392,9 @@ def is_task_tracked(self, task: Task):
elif stage not in self.conn.smembers(self.active_stages_key):
tracked = False
# and the task_id is in the list of task_ids per stage
elif task_id not in self.conn.smembers(stage_key):
elif task_id not in self.conn.smembers(
make_key(PREFIX, "qds", self.name, stage)
):
tracked = False

return tracked
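Reading is_task_tracked() as three nested membership checks, here is a rough equivalent using SISMEMBER (the method itself uses SMEMBERS; this sketch is illustrative only, with key names taken from the tests below):

from servicelayer.cache import get_fakeredis

conn = get_fakeredis()
conn.sadd("tq:qdatasets", "1")                 # active datasets
conn.sadd("tq:qds:1:active_stages", "ingest")  # active stages for dataset 1
conn.sadd("tq:qds:1:ingest", "task-1")         # task ids for the stage

tracked = bool(
    conn.sismember("tq:qdatasets", "1")
    and conn.sismember("tq:qds:1:active_stages", "ingest")
    and conn.sismember("tq:qds:1:ingest", "task-1")
)
assert tracked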
45 changes: 45 additions & 0 deletions tests/test_dataset.py
@@ -0,0 +1,45 @@
from unittest import TestCase


from servicelayer.cache import get_fakeredis
from servicelayer.taskqueue import (
Dataset,
dataset_from_collection_id,
)


class TestDataset(TestCase):
def setUp(self):
self.connection = get_fakeredis()
self.connection.flushdb()
self.collection_id = 1

self.dataset = Dataset(
conn=self.connection, name=dataset_from_collection_id(self.collection_id)
)

def test_get_stage_key(self):
stage = "ingest"
assert (
self.dataset.get_stage_key(stage) == f"tq:qds:{self.collection_id}:{stage}"
)

def test_get_active_datasets_key(self):
assert self.dataset.key == "tq:qdatasets"

def test_get_active_stages_key(self):
assert (
self.dataset.active_stages_key
== f"tq:qds:{self.collection_id}:active_stages"
)

def test_get_timestamp_keys(self):
assert self.dataset.start_key == f"tq:qdj:{self.collection_id}:start"
assert (
self.dataset.last_update_key == f"tq:qdj:{self.collection_id}:last_update"
)

def test_tasks_per_collection_keys(self):
assert self.dataset.finished_key == f"tq:qdj:{self.collection_id}:finished"
assert self.dataset.running_key == f"tq:qdj:{self.collection_id}:running"
assert self.dataset.pending_key == f"tq:qdj:{self.collection_id}:pending"
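A possible follow-up test, sketched here rather than taken from the PR: it would exercise the corrected keys end to end, assuming add_task() and remove_task() execute their Redis pipelines before returning (the elided parts of the diff suggest they do).

    def test_add_and_remove_task_keys(self):
        # Hypothetical extension, not part of this PR.
        self.dataset.add_task("task-1", "ingest")
        pending_key = f"tq:qds:{self.collection_id}:ingest:pending"
        assert self.connection.sismember(pending_key, "task-1")

        self.dataset.remove_task("task-1", "ingest")
        assert not self.connection.sismember(pending_key, "task-1")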