Skip to content

Commit

Permalink
Remove wrong keys clean-up code
Browse files Browse the repository at this point in the history
  • Loading branch information
catileptic committed Oct 8, 2024
1 parent 364af11 commit bde2b38
Showing 1 changed file with 1 addition and 49 deletions.
50 changes: 1 addition & 49 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ def flush_status(self, pipe):

# delete information about running stages
for stage in self.conn.smembers(self.active_stages_key):
# TODO - delete this block after all wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.delete(make_key(stage_key, "pending"))
pipe.delete(make_key(stage_key, "running"))
pipe.delete(make_key(stage_key, "finished"))

pipe.delete(make_key(PREFIX, "qds", self.name, stage))
pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
Expand Down Expand Up @@ -132,31 +126,13 @@ def get_status(self):
for stage in self.conn.smembers(self.active_stages_key):
stage_key = self.get_stage_key(stage)

# TODO - uncomment this after all wrong keys are gone
# num_pending = unpack_int(
# self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
# )
# num_running = unpack_int(
# self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
# )
# num_finished = unpack_int(
# self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
# )

# TODO - temporary hack
num_pending = unpack_int(
self.conn.scard(make_key(stage_key, "pending"))
) + unpack_int(
self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "pending"))
)
num_running = unpack_int(
self.conn.scard(make_key(stage_key, "running"))
) + unpack_int(
self.conn.scard(make_key(PREFIX, "qds", self.name, stage, "running"))
)
num_finished = unpack_int(
self.conn.get(make_key(stage_key, "finished"))
) + unpack_int(
self.conn.get(make_key(PREFIX, "qds", self.name, stage, "finished"))
)

Expand Down Expand Up @@ -259,12 +235,6 @@ def remove_task(self, task_id, stage):
pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)

# TODO - remove this after there are no wrong keys left
# It's fine if this is executed, because an inexistent key is
# treated as an empty set and SREM works fine.
stage_key = self.get_stage_key(stage)
pipe.srem(make_key(stage_key, "pending"), task_id)

# delete the retry key for this task
pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))

Expand All @@ -288,10 +258,6 @@ def checkout_task(self, task_id, stage):
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)

# TODO - remove after all the wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.srem(make_key(stage_key, "pending"), task_id)

# add the task to the set of running tasks per dataset
pipe.sadd(self.running_key, task_id)
# remove the task from the set of pending tasks per dataset
Expand Down Expand Up @@ -324,11 +290,6 @@ def mark_done(self, task: Task):
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
pipe.incr(make_key(PREFIX, "qds", self.name, stage, "finished"))

# TODO - remove this after all the wrong keys are gone
stage_key = self.get_stage_key(stage)
pipe.srem(make_key(stage_key, "pending"), task_id)
pipe.srem(make_key(stage_key, "running"), task_id)

# update dataset timestamps
pipe.set(self.last_update_key, pack_now())

Expand Down Expand Up @@ -358,11 +319,6 @@ def mark_for_retry(self, task):
pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)

# TODO - remove when there are no wrong tasks left
stage_key = self.get_stage_key(task.operation)
pipe.srem(make_key(stage_key, "pending"), task.task_id)
pipe.srem(make_key(stage_key, "running"), task.task_id)

# delete the retry key for the task
pipe.delete(task.retry_key)

Expand All @@ -377,10 +333,6 @@ def is_done(self):
def __str__(self):
return self.name

# TODO - remove this after all the wrong keys are gone
def get_stage_key(self, stage):
return make_key(PREFIX, "qds", self.name, stage)

def is_task_tracked(self, task: Task):
tracked = True

Expand Down Expand Up @@ -726,7 +678,7 @@ def process():
return self.process(blocking=True)

if not self.num_threads:
# TODO - seems like we need at least one thread
# we need at least one thread
# consuming and processing require separate threads
self.num_threads = 1

Expand Down

0 comments on commit bde2b38

Please sign in to comment.