Skip to content

Commit

Permalink
Add Redis keys for tracking task stages
Browse files Browse the repository at this point in the history
  • Loading branch information
catileptic committed Feb 6, 2024
1 parent 5229f3f commit f6756bf
Showing 1 changed file with 100 additions and 5 deletions.
105 changes: 100 additions & 5 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(self, conn, name):
self.start_key = make_key(PREFIX, "qdj", name, "start")
self.end_key = make_key(PREFIX, "qdj", name, "end")
self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
Expand All @@ -95,11 +96,18 @@ def cancel(self):
pipe.delete(self.start_key)
pipe.delete(self.end_key)
pipe.delete(self.last_update_key)
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
pipe.delete(self.active_stages_key)
pipe.execute()

def get_status(self):
"""Status of a given dataset."""
status = {"finished": 0, "running": 0, "pending": 0}
status = {"finished": 0, "running": 0, "pending": 0, "jobs": []}
finished = self.conn.get(self.finished_key)
running = self.conn.scard(self.running_key)
pending = self.conn.scard(self.pending_key)
Expand All @@ -112,6 +120,27 @@ def get_status(self):
status["start_time"] = start
status["end_time"] = end
status["last_update"] = last_update

jobs_info = {
"finished": self.conn.get(self.finished_key),
"running": self.conn.scard(self.running_key),
"pending": self.conn.scard(self.pending_key),
"stages": [],
}

for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
jobs_info["stages"].append(
{
"job_id": None,
"stage": stage,
"pending": self.conn.scard(make_key(stage_key, "pending")),
"running": self.conn.scard(make_key(stage_key, "running")),
"finished": self.conn.get(make_key(stage_key, "finished")),
}
)

status["jobs"].append(jobs_info)
return status

@classmethod
Expand Down Expand Up @@ -139,6 +168,17 @@ def cleanup_dataset_status(cls, conn):
pipe.srem(dataset.key, dataset.name)
# reset finished task count
pipe.delete(dataset.finished_key)
# delete information about running stages
for stage in dataset.conn.smembers(dataset.active_stages_key):
stage_key = dataset.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(dataset.active_stages_key)
pipe.set(dataset.last_update_key, pack_now())

pipe.execute()

def should_execute(self, task_id):
Expand All @@ -164,40 +204,74 @@ def should_execute(self, task_id):
continue
return _should_execute

def add_task(self, task_id):
def add_task(self, task_id, stage):
"""Update state when a new task is added to the task queue"""
log.info(f"Adding task: {task_id}")
pipe = self.conn.pipeline()
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
pipe.sadd(self.active_stages_key, stage)
pipe.sadd(stage_key, task_id)
pipe.sadd(make_key(stage_key, "pending"), task_id)

pipe.sadd(self.pending_key, task_id)
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
pipe.execute()

def remove_task(self, task_id):
def remove_task(self, task_id, stage):
"""Remove a task that's not going to be executed"""
log.info(f"Removing task: {task_id}")
pipe = self.conn.pipeline()
pipe.srem(self.pending_key, task_id)

stage_key = self.get_stage_key(stage)
pipe.srem(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)

pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))

status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
pipe.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)
pipe.set(self.last_update_key, pack_now())
pipe.execute()

def checkout_task(self, task_id):
def checkout_task(self, task_id, stage):
"""Update state when a task is checked out for execution"""
log.info(f"Checking out task: {task_id}")
pipe = self.conn.pipeline()
# add the dataset to active datasets
pipe.sadd(self.key, self.name)

# update status of stages per dataset
stage_key = self.get_stage_key(stage)
pipe.sadd(self.active_stages_key, stage)
pipe.sadd(stage_key, task_id)
pipe.srem(make_key(stage_key, "pending"), task_id)
pipe.sadd(make_key(stage_key, "running"), task_id)

pipe.srem(self.pending_key, task_id)
pipe.sadd(self.running_key, task_id)
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
pipe.execute()

def mark_done(self, task: Task):
Expand All @@ -208,13 +282,31 @@ def mark_done(self, task: Task):
pipe.srem(self.running_key, task.task_id)
pipe.incr(self.finished_key)
pipe.delete(task.retry_key)

stage_key = self.get_stage_key(task.operation)
pipe.srem(stage_key, task.task_id)
pipe.srem(make_key(stage_key, "pending"), task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)
pipe.incr(make_key(stage_key, "finished"), task.task_id)

pipe.set(self.end_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.execute()
status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
self.conn.srem(self.key, self.name)
# reset finished task count
pipe.delete(self.finished_key)
# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)
pipe.delete(stage_key)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))
# delete stages key
pipe.delete(self.active_stages_key)

def is_done(self):
status = self.get_status()
Expand All @@ -223,6 +315,9 @@ def is_done(self):
def __str__(self):
return self.name

def get_stage_key(self, stage):
return make_key(PREFIX, "qds", self.name, stage)


def get_task(body, delivery_tag) -> Task:
body = json.loads(body)
Expand Down Expand Up @@ -362,7 +457,7 @@ def handle(self, task: Task):
raise MaxRetriesExceededError(
f"Max retries reached for task {task.task_id}. Aborting."
)
dataset.checkout_task(task.task_id)
dataset.checkout_task(task.task_id, task.operation)
task.increment_retry_count(self.conn)
task = self.dispatch_task(task)
else:
Expand Down

0 comments on commit f6756bf

Please sign in to comment.