Fix Redis keys #201

Merged 9 commits on Oct 8, 2024
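The change replaces the `Dataset.get_stage_key` helper in `servicelayer/taskqueue.py` with explicit `make_key(PREFIX, "qds", self.name, stage)` calls, so every stage-level Redis key is spelled out at its call site, and adds tests pinning down the key format. As a reading aid (not part of the diff): judging by the new tests, `make_key` joins its segments with `:` and `PREFIX` is `tq`, giving the layout sketched below; the dataset and stage names are illustrative.

```python
# Sketch of the Redis key layout after this PR, assuming make_key joins
# segments with ":" and PREFIX == "tq" (as the new tests suggest).
name = "1"        # dataset name derived from a collection id (illustrative)
stage = "ingest"  # illustrative stage name

# per-dataset keys ("qdj" namespace, unchanged by this PR)
pending_key = f"tq:qdj:{name}:pending"    # set of pending task ids
running_key = f"tq:qdj:{name}:running"    # set of running task ids
finished_key = f"tq:qdj:{name}:finished"  # finished-task bookkeeping (type not shown here)

# per-stage keys ("qds" namespace, now always built inline via make_key)
stage_tasks = f"tq:qds:{name}:{stage}"              # set of all task ids in the stage
stage_pending = f"tq:qds:{name}:{stage}:pending"    # set, emptied as tasks are checked out
stage_running = f"tq:qds:{name}:{stage}:running"    # set of in-flight task ids
stage_finished = f"tq:qds:{name}:{stage}:finished"  # counter (INCR on mark_done)
```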
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.23.0-rc34
+current_version = 1.23.0-rc36
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(pre|rc))(?P<build>\d+))?
2 changes: 1 addition & 1 deletion servicelayer/__init__.py
@@ -1,6 +1,6 @@
 import logging

-__version__ = "1.23.0-rc34"
+__version__ = "1.23.0-rc36"

 logging.getLogger("boto3").setLevel(logging.WARNING)
 logging.getLogger("botocore").setLevel(logging.WARNING)
105 changes: 47 additions & 58 deletions servicelayer/taskqueue.py
@@ -96,11 +96,10 @@ def flush_status(self, pipe):

         # delete information about running stages
         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
-            pipe.delete(stage_key)
-            pipe.delete(make_key(stage_key, "pending"))
-            pipe.delete(make_key(stage_key, "running"))
-            pipe.delete(make_key(stage_key, "finished"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
+            pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))

         # delete information about tasks per dataset
         pipe.delete(self.pending_key)
@@ -125,20 +124,23 @@ def get_status(self):
         status["last_update"] = last_update

         for stage in self.conn.smembers(self.active_stages_key):
-            stage_key = self.get_stage_key(stage)
+            num_pending = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
+            )
+            num_running = unpack_int(
+                self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
+            )
+            num_finished = unpack_int(
+                self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
+            )

             status["stages"].append(
                 {
                     "job_id": "",
                     "stage": stage,
-                    "pending": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "pending")))
-                    ),
-                    "running": max(
-                        0, unpack_int(self.conn.scard(make_key(stage_key, "running")))
-                    ),
-                    "finished": max(
-                        0, unpack_int(self.conn.get(make_key(stage_key, "finished")))
-                    ),
+                    "pending": max(0, num_pending),
+                    "running": max(0, num_running),
+                    "finished": max(0, num_finished),
                 }
             )
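After this refactor the per-stage counts come from the `num_*` temporaries rather than nested `make_key` calls; the payload shape is unchanged. As an illustration (not part of the diff), `get_status` returns roughly the following for one active stage; values are made up, and the top-level counters come from surrounding code not shown in this hunk:

```python
# Illustrative get_status() payload for a dataset with one active stage.
{
    "last_update": "...",  # timestamp string written via pack_now()
    "pending": 3,          # per-dataset counts, consumed by is_done() below
    "running": 1,
    "stages": [
        {
            "job_id": "",       # left empty by this code
            "stage": "ingest",  # illustrative stage name
            "pending": 3,       # SCARD of tq:qds:<name>:<stage>:pending
            "running": 1,       # SCARD of tq:qds:<name>:<stage>:running
            "finished": 12,     # GET of tq:qds:<name>:<stage>:finished
        },
    ],
}
```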

@@ -207,14 +209,10 @@ def add_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)

-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
         pipe.sadd(self.active_stages_key, stage)

-        # add the task to the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)

         # add the task to the set of pending tasks per dataset
         pipe.sadd(self.pending_key, task_id)
@@ -232,11 +230,8 @@ def remove_task(self, task_id, stage):
         # remove the task from the set of pending tasks per dataset
         pipe.srem(self.pending_key, task_id)

-        # remove the task from the set of tasks per stage
-        # and the set of pending tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.srem(stage_key, task_id)
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)

         # delete the retry key for this task
         pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))
@@ -255,16 +250,11 @@ def checkout_task(self, task_id, stage):
         # add the dataset to active datasets
         pipe.sadd(self.key, self.name)

-        # add the stage to the list of active stages per dataset
+        # update status of stages per dataset
        pipe.sadd(self.active_stages_key, stage)

-        # add the task to the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(stage)
-        pipe.sadd(stage_key, task_id)
-        pipe.sadd(make_key(stage_key, "running"), task_id)
-        # remove the task from the set of pending tasks per stage
-        pipe.srem(make_key(stage_key, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)

         # add the task to the set of running tasks per dataset
         pipe.sadd(self.running_key, task_id)
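The checkout above moves the task's per-stage membership from `pending` to `running` within one pipeline; in redis-py, `pipeline()` defaults to a MULTI/EXEC transaction, so readers never observe the task in both sets. The same transition against a raw connection, as a sketch only (key names follow the scheme above; the connection and names are illustrative):

```python
import redis

r = redis.Redis(decode_responses=True)  # illustrative local connection
name, stage, task_id = "1", "ingest", "task-1"
stage_key = f"tq:qds:{name}:{stage}"

pipe = r.pipeline()                         # MULTI/EXEC by default in redis-py
pipe.sadd(stage_key, task_id)               # ensure stage membership
pipe.srem(f"{stage_key}:pending", task_id)  # no longer pending...
pipe.sadd(f"{stage_key}:running", task_id)  # ...now running
pipe.execute()                              # all three commands apply atomically
```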
@@ -291,14 +281,12 @@ def mark_done(self, task: Task):
         # delete the retry key for the task
         pipe.delete(task.retry_key)

-        # remove the task from the set of tasks per stage
-        # and the pending and running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "pending"), task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
-        # increase the number of finished tasks per stage
-        pipe.incr(make_key(stage_key, "finished"))
+        stage = task.operation
+        task_id = task.task_id
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished"))

         # update dataset timestamps
         pipe.set(self.last_update_key, pack_now())
@@ -312,45 +300,44 @@ def mark_done(self, task: Task):

     def mark_for_retry(self, task):
         pipe = self.conn.pipeline()

+        log.info(
+            f"Marking task {task.task_id} (stage {task.operation})"
+            f" for retry after NACK"
+        )
+
+        stage = task.operation
+        task_id = task.task_id

         # remove the task from the pending and running sets of tasks per dataset
-        pipe.srem(self.pending_key, task.task_id)
-        pipe.srem(self.running_key, task.task_id)
+        pipe.srem(self.pending_key, task_id)
+        pipe.srem(self.running_key, task_id)

-        # remove the task from the set of tasks per stage
-        # and the set of running tasks per stage
-        stage_key = self.get_stage_key(task.operation)
-        pipe.srem(stage_key, task.task_id)
-        pipe.srem(make_key(stage_key, "running"), task.task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
+        pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)

         # delete the retry key for the task
         pipe.delete(task.retry_key)

         pipe.set(self.last_update_key, pack_now())

         pipe.execute()

     def is_done(self):
         status = self.get_status()
         return status["pending"] == 0 and status["running"] == 0

     def __str__(self):
         return self.name

-    def get_stage_key(self, stage):
-        return make_key(PREFIX, "qds", self.name, stage)
-
     def is_task_tracked(self, task: Task):
         tracked = True

         dataset = dataset_from_collection_id(task.collection_id)
         task_id = task.task_id
         stage = task.operation

-        stage_key = self.get_stage_key(stage)
-
         # A task is considered tracked if
         # the dataset is in the list of active datasets
         if dataset not in self.conn.smembers(self.key):
@@ -359,7 +346,9 @@ def is_task_tracked(self, task: Task):
         elif stage not in self.conn.smembers(self.active_stages_key):
             tracked = False
         # and the task_id is in the list of task_ids per stage
-        elif task_id not in self.conn.smembers(stage_key):
+        elif task_id not in self.conn.smembers(
+            make_key(PREFIX, "qds", self.name, stage)
+        ):
             tracked = False

         return tracked
@@ -687,7 +676,7 @@ def process():
             return self.process(blocking=True)

         if not self.num_threads:
-            # TODO - seems like we need at least one thread
+            # we need at least one thread
             # consuming and processing require separate threads
             self.num_threads = 1
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@

 setup(
     name="servicelayer",
-    version="1.23.0-rc34",
+    version="1.23.0-rc36",
     description="Basic remote service functions for alephdata components",
     classifiers=[
         "Development Status :: 3 - Alpha",
39 changes: 39 additions & 0 deletions tests/test_dataset.py
@@ -0,0 +1,39 @@
+from unittest import TestCase
+
+
+from servicelayer.cache import get_fakeredis
+from servicelayer.taskqueue import (
+    Dataset,
+    dataset_from_collection_id,
+)
+
+
+class TestDataset(TestCase):
+    def setUp(self):
+        self.connection = get_fakeredis()
+        self.connection.flushdb()
+        self.collection_id = 1
+
+        self.dataset = Dataset(
+            conn=self.connection, name=dataset_from_collection_id(self.collection_id)
+        )
+
+    def test_get_active_datasets_key(self):
+        assert self.dataset.key == "tq:qdatasets"
+
+    def test_get_active_stages_key(self):
+        assert (
+            self.dataset.active_stages_key
+            == f"tq:qds:{self.collection_id}:active_stages"
+        )
+
+    def test_get_timestamp_keys(self):
+        assert self.dataset.start_key == f"tq:qdj:{self.collection_id}:start"
+        assert (
+            self.dataset.last_update_key == f"tq:qdj:{self.collection_id}:last_update"
+        )
+
+    def test_tasks_per_collection_keys(self):
+        assert self.dataset.finished_key == f"tq:qdj:{self.collection_id}:finished"
+        assert self.dataset.running_key == f"tq:qdj:{self.collection_id}:running"
+        assert self.dataset.pending_key == f"tq:qdj:{self.collection_id}:pending"
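The new tests pin down the key names. As a complementary illustration (not part of the PR), here is a round-trip through the task lifecycle against fakeredis, using only methods whose signatures appear in the diff above; the stage name "ingest" is illustrative:

```python
# Hedged sketch: exercise the task lifecycle against fakeredis.
from servicelayer.cache import get_fakeredis
from servicelayer.taskqueue import Dataset, dataset_from_collection_id

conn = get_fakeredis()
dataset = Dataset(conn=conn, name=dataset_from_collection_id(1))

dataset.add_task("task-1", "ingest")       # task-1 is now pending
dataset.checkout_task("task-1", "ingest")  # pending -> running

status = dataset.get_status()
assert status["stages"][0]["running"] == 1  # counted via the new qds keys

dataset.remove_task("task-1", "ingest")    # drop all bookkeeping for task-1
```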