diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 67d0d54..11c2cbe 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -96,12 +96,6 @@ def flush_status(self, pipe): # delete information about running stages for stage in self.conn.smembers(self.active_stages_key): - # TODO - delete this block after all wrong keys are gone - stage_key = self.get_stage_key(stage) - pipe.delete(make_key(stage_key, "pending")) - pipe.delete(make_key(stage_key, "running")) - pipe.delete(make_key(stage_key, "finished")) - pipe.delete(make_key(PREFIX, "qds", self.name, stage)) pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending")) pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running")) @@ -132,31 +126,13 @@ def get_status(self): for stage in self.conn.smembers(self.active_stages_key): stage_key = self.get_stage_key(stage) - # TODO - uncomment this after all wrong keys are gone - # num_pending = unpack_int( - # self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending")) - # ) - # num_running = unpack_int( - # self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running")) - # ) - # num_finished = unpack_int( - # self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished")) - # ) - - # TODO - temporary hack num_pending = unpack_int( - self.conn.scard(make_key(stage_key, "pending")) - ) + unpack_int( self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending")) ) num_running = unpack_int( - self.conn.scard(make_key(stage_key, "running")) - ) + unpack_int( self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running")) ) num_finished = unpack_int( - self.conn.get(make_key(stage_key, "finished")) - ) + unpack_int( self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished")) ) @@ -259,12 +235,6 @@ def remove_task(self, task_id, stage): pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id) pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id) - # TODO - remove this after there are no wrong keys left - # It's fine if this is executed, because an inexistent key is - # treated as an empty set and SREM works fine. - stage_key = self.get_stage_key(stage) - pipe.srem(make_key(stage_key, "pending"), task_id) - # delete the retry key for this task pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id)) @@ -288,10 +258,6 @@ def checkout_task(self, task_id, stage): pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id) pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id) - # TODO - remove after all the wrong keys are gone - stage_key = self.get_stage_key(stage) - pipe.srem(make_key(stage_key, "pending"), task_id) - # add the task to the set of running tasks per dataset pipe.sadd(self.running_key, task_id) # remove the task from the set of pending tasks per dataset @@ -324,11 +290,6 @@ def mark_done(self, task: Task): pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id) pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished")) - # TODO - remove this after all the wrong keys are gone - stage_key = self.get_stage_key(stage) - pipe.srem(make_key(stage_key, "pending"), task_id) - pipe.srem(make_key(stage_key, "running"), task_id) - # update dataset timestamps pipe.set(self.last_update_key, pack_now()) @@ -358,11 +319,6 @@ def mark_for_retry(self, task): pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id) pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id) - # TODO - remove when there are no wrong tasks left - stage_key = self.get_stage_key(task.operation) - pipe.srem(make_key(stage_key, "pending"), task.task_id) - pipe.srem(make_key(stage_key, "running"), task.task_id) - # delete the retry key for the task pipe.delete(task.retry_key) @@ -377,10 +333,6 @@ def is_done(self): def __str__(self): return self.name - # TODO - remove this after all the wrong keys are gone - def get_stage_key(self, stage): - return make_key(PREFIX, "qds", self.name, stage) - def is_task_tracked(self, task: Task): tracked = True @@ -726,7 +678,7 @@ def process(): return self.process(blocking=True) if not self.num_threads: - # TODO - seems like we need at least one thread + # we need at least one thread # consuming and processing require separate threads self.num_threads = 1