Commit

Refactor Redis keys in order to ensure status cleanup
catileptic committed Jul 18, 2024
1 parent 3c1c690 commit f7b195e
Showing 2 changed files with 82 additions and 73 deletions.
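
Only part of the Dataset class appears in the hunks below, so most of the Redis keys being cleaned up are defined off-screen. The sketch below of the per-dataset key layout is inferred from how the keys are used in this diff; apart from last_update_key and active_stages_key, which are shown verbatim in the first hunk, the exact key names are assumptions rather than the committed code.

    # Assumed key layout for Dataset.__init__ (only the last two assignments
    # are visible in the diff; the rest is inferred from usage further down).
    def __init__(self, conn, name):
        self.conn = conn
        self.name = name
        # set of all datasets that currently have tasks (assumed name)
        self.key = make_key(PREFIX, "qdatasets")
        # per-dataset task sets and finished-task counter (assumed key shapes)
        self.pending_key = make_key(PREFIX, "qdj", name, "pending")
        self.running_key = make_key(PREFIX, "qdj", name, "running")
        self.finished_key = make_key(PREFIX, "qdj", name, "finished")
        # per-dataset timestamps (assumed key shapes)
        self.start_key = make_key(PREFIX, "qdj", name, "start")
        self.end_key = make_key(PREFIX, "qdj", name, "end")
        # shown verbatim in the diff
        self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
        self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")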
147 changes: 78 additions & 69 deletions servicelayer/taskqueue.py
@@ -87,25 +87,36 @@ def __init__(self, conn, name):
self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
def flush_status(self, pipe):
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# clean up tasks and task counts
pipe.delete(self.finished_key)
pipe.delete(self.running_key)
pipe.delete(self.pending_key)

# reset timestamps
pipe.delete(self.start_key)
pipe.delete(self.end_key)
pipe.delete(self.last_update_key)

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

# delete information about tasks per dataset
pipe.delete(self.pending_key)
pipe.delete(self.running_key)
pipe.delete(self.finished_key)

# delete stages key
pipe.delete(self.active_stages_key)

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
pipe = self.conn.pipeline()
self.flush_status(pipe)
# What should happen to the end_key in this case?
pipe.delete(self.end_key)
pipe.execute()

def get_status(self):
@@ -167,24 +178,9 @@ def cleanup_dataset_status(cls, conn):
datasets_key = make_key(PREFIX, "qdatasets")
for name in conn.smembers(datasets_key):
dataset = cls(conn, name)
status = dataset.get_status()
if status["running"] == 0 and status["pending"] == 0:
if dataset.is_done():
pipe = conn.pipeline()
# remove the dataset from active datasets
pipe.srem(dataset.key, dataset.name)
# reset finished task count
pipe.delete(dataset.finished_key)
# delete information about running stages
for stage in dataset.conn.smembers(dataset.active_stages_key):
stage_key = dataset.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(dataset.active_stages_key)
pipe.set(dataset.last_update_key, pack_now())

dataset.flush_status(pipe)
pipe.execute()

def should_execute(self, task_id):
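
cleanup_dataset_status, and several methods further down, now gate the flush on dataset.is_done(), whose definition is not part of this diff. A minimal sketch of what it presumably checks, mirroring the status["running"] == 0 and status["pending"] == 0 condition it replaces (an assumption, not the committed code):

    # Assumed implementation of is_done(); not shown in this commit.
    def is_done(self):
        status = self.get_status()
        return status["pending"] < 1 and status["running"] < 1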
Expand Down Expand Up @@ -217,13 +213,19 @@ def add_task(self, task_id, stage):
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
# add the stage to the list of active stages per dataset
pipe.sadd(self.active_stages_key, stage)

# add the task to the set of tasks per stage
# and the set of pending tasks per stage
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.sadd(make_key(stage_key, "pending"), task_id)

# add the task to the set of pending tasks per dataset
pipe.sadd(self.pending_key, task_id)

# update dataset timestamps
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
@@ -233,48 +235,50 @@ def remove_task(self, task_id, stage):
"""Remove a task that's not going to be executed"""
log.info(f"Removing task: {task_id}")
pipe = self.conn.pipeline()

# remove the task from the set of pending tasks per dataset
pipe.srem(self.pending_key, task_id)

# remove the task from the set of tasks per stage
# and the set of pending tasks per stage
stage_key = self.get_stage_key(stage)
pipe.srem(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)

# delete the retry key for this task
pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)
pipe.set(self.last_update_key, pack_now())
pipe.execute()

if self.is_done():
pipe = self.conn.pipeline()
self.flush_status(pipe)
pipe.execute()

def checkout_task(self, task_id, stage):
"""Update state when a task is checked out for execution"""
log.info(f"Checking out task: {task_id}")
pipe = self.conn.pipeline()
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
# add the stage to the list of active stages per dataset
pipe.sadd(self.active_stages_key, stage)

# add the task to the set of tasks per stage
# and the set of running tasks per stage
stage_key = self.get_stage_key(stage)
pipe.sadd(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)
pipe.sadd(make_key(stage_key, "running"), task_id)
# remove the task from the set of pending tasks per stage
pipe.srem(make_key(stage_key, "pending"), task_id)

pipe.srem(self.pending_key, task_id)
# add the task to the set of running tasks per dataset
pipe.sadd(self.running_key, task_id)
# remove the task from the set of pending tasks per dataset
pipe.srem(self.pending_key, task_id)

# update dataset timestamps
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
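
Taken together, these methods move a task through a fixed sequence of Redis sets. A purely illustrative sequence of calls, assuming a Redis connection conn and a Task object task whose task_id is "task-1" and whose operation is "ingest":

    dataset = Dataset(conn, "my_dataset")
    dataset.add_task("task-1", "ingest")        # dataset and stage become active, task is pending
    dataset.checkout_task("task-1", "ingest")   # pending -> running
    # ... the worker executes the task ...
    dataset.mark_done(task)                     # running -> finished; once is_done()
                                                # is True, flush_status() wipes the keys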
@@ -284,51 +288,55 @@ def mark_done(self, task: Task):
"""Update state when a task is finished executing"""
log.info(f"Finished executing task: {task.task_id}")
pipe = self.conn.pipeline()

# remove the task from the pending and running sets of tasks per dataset
pipe.srem(self.pending_key, task.task_id)
pipe.srem(self.running_key, task.task_id)

# increase the number of finished tasks per dataset
pipe.incr(self.finished_key)

# delete the retry key for the task
pipe.delete(task.retry_key)

# remove the task from the set of tasks per stage
# and the pending and running tasks per stage
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "pending"), task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)
# increase the number of finished tasks per stage
pipe.incr(make_key(stage_key, "finished"))

# update dataset timestamps
pipe.set(self.end_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.execute()

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)

if self.is_done():
pipe = self.conn.pipeline()
self.flush_status(pipe)
pipe.execute()

def mark_for_retry(self, task):
pipe = self.conn.pipeline()
stage_key = self.get_stage_key(task.operation)

log.info(
f"Marking task {task.task_id} (stage {task.operation})"
f" for retry after NACK"
)

# remove the task from the pending and running sets of tasks per dataset
pipe.srem(self.pending_key, task.task_id)
pipe.srem(self.running_key, task.task_id)

# remove the task from the set of tasks per stage
# and the set of running tasks per stage
stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)

# delete the retry key for the task
pipe.delete(task.retry_key)
pipe.srem(stage_key, task.task_id)

pipe.set(self.last_update_key, pack_now())

@@ -345,11 +353,12 @@ def get_stage_key(self, stage):
def is_task_tracked(self, task: Task):
tracked = True

stage_key = self.get_stage_key(task.operation)
dataset = dataset = dataset_from_collection_id(task.collection_id)
dataset = dataset_from_collection_id(task.collection_id)
task_id = task.task_id
stage = task.operation

stage_key = self.get_stage_key(stage)

# A task is considered tracked if
# the dataset is in the list of active datasets
if dataset not in self.conn.smembers(self.key):
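
The remainder of is_task_tracked is collapsed in this view. Judging from the comment and the first check, the full method presumably reads along these lines; this is a sketch of the assumed shape, not the verbatim code, and it relies on dataset_from_collection_id being available in the same module:

    # Assumed shape of the full tracking check (the diff only shows the first branch).
    def is_task_tracked(self, task: Task):
        tracked = True
        dataset = dataset_from_collection_id(task.collection_id)
        task_id = task.task_id
        stage = task.operation
        stage_key = self.get_stage_key(stage)
        # the dataset is in the set of active datasets
        if dataset not in self.conn.smembers(self.key):
            tracked = False
        # the stage is in the set of active stages for the dataset
        elif stage not in self.conn.smembers(self.active_stages_key):
            tracked = False
        # the task is in the set of tasks for that stage
        elif task_id not in self.conn.smembers(stage_key):
            tracked = False
        return tracked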
@@ -662,9 +671,9 @@ def nack_message(self, task, channel, requeue=True):
dataset = task.get_dataset(conn=self.conn)
# Sync state to redis
if requeue:
dataset.mark_for_retry(task)
if not dataset.is_task_tracked(task):
dataset.add_task(task.task_id, task.operation)
dataset.mark_for_retry(task)
else:
dataset.mark_done(task)
if channel.is_open:
8 changes: 4 additions & 4 deletions tests/test_taskqueue.py
@@ -121,10 +121,10 @@ def test_task_queue(self):
assert status["finished"] == 0, status
assert status["pending"] == 0, status
assert status["running"] == 0, status
started = unpack_datetime(status["start_time"])
last_updated = unpack_datetime(status["last_update"])
end_time = unpack_datetime(status["end_time"])
assert started < end_time < last_updated
# started = unpack_datetime(status["start_time"])
# last_updated = unpack_datetime(status["last_update"])
# end_time = unpack_datetime(status["end_time"])
# assert started < end_time < last_updated

@patch("servicelayer.taskqueue.Dataset.should_execute")
def test_task_that_shouldnt_execute(self, mock_should_execute):
