Merge branch 'release/1.23.0' into chore/fix-redis-keys
catileptic committed Sep 25, 2024
2 parents 2670410 + 83c62cb commit 7b16152
Showing 7 changed files with 266 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.23.0-rc32
+current_version = 1.23.0-rc34
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(pre|rc))(?P<build>\d+))?
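The parse pattern on the last line is what lets bumpversion understand pre-release versions like 1.23.0-rc34. A quick sanity check of that regex in Python (the test harness is ours, not part of the commit):

    import re

    # The parse pattern from .bumpversion.cfg, verbatim.
    PATTERN = re.compile(
        r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
        r"([-](?P<release>(pre|rc))(?P<build>\d+))?"
    )

    match = PATTERN.match("1.23.0-rc34")
    assert match is not None
    # Named groups split the version into its bumpable parts.
    print(match.group("major", "minor", "patch", "release", "build"))
    # -> ('1', '23', '0', 'rc', '34')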
6 changes: 3 additions & 3 deletions Makefile
@@ -4,7 +4,7 @@ all: clean install test
 .PHONY: build
 
 build-docker:
-    docker-compose build --no-rm --parallel
+    docker compose build --no-rm --parallel
 
 install:
     pip install -q -e .
@@ -16,7 +16,7 @@ dev:
     python3 -m pip install -q -r requirements-dev.txt
 
 test:
-    docker-compose run --rm shell pytest --cov=servicelayer
+    docker compose run --rm shell pytest --cov=servicelayer
     @echo "⚠️ you might notice a warning about a fairy from SQLAlchemy"
    @echo "this is fixed in a newer release -- see https://github.com/sqlalchemy/sqlalchemy/issues/10414"
    @echo "we are ignoring this for now"
@@ -31,7 +31,7 @@ format-check:
     black --check .
 
 shell:
-    docker-compose run --rm shell
+    docker compose run --rm shell
 
 build:
     python3 setup.py sdist bdist_wheel
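All four Makefile edits are the same mechanical change: the standalone docker-compose binary (Compose v1, end-of-life since mid-2023) is replaced by the docker compose plugin syntax of Compose v2. A hedged sketch for probing which form a machine actually provides (ours, not part of the commit):

    import shutil
    import subprocess

    def compose_command():
        # Prefer the Compose v2 plugin; fall back to the legacy v1 binary.
        try:
            probe = subprocess.run(
                ["docker", "compose", "version"],
                capture_output=True,
                text=True,
            )
            if probe.returncode == 0:
                return ["docker", "compose"]
        except FileNotFoundError:
            pass
        if shutil.which("docker-compose"):
            return ["docker-compose"]
        raise RuntimeError("no usable Docker Compose found")

    print(compose_command())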
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -5,3 +5,4 @@ pytest-env==1.1.3
 pytest-cov==5.0.0
 pytest-mock==3.14.0
 wheel==0.43.0
+time-machine==2.14.1
2 changes: 1 addition & 1 deletion servicelayer/__init__.py
@@ -1,6 +1,6 @@
 import logging
 
-__version__ = "1.23.0-rc32"
+__version__ = "1.23.0-rc34"
 
 logging.getLogger("boto3").setLevel(logging.WARNING)
 logging.getLogger("botocore").setLevel(logging.WARNING)
162 changes: 62 additions & 100 deletions servicelayer/taskqueue.py
@@ -83,48 +83,50 @@ def __init__(self, conn, name):
         self.running_key = make_key(PREFIX, "qdj", name, "running")
         self.pending_key = make_key(PREFIX, "qdj", name, "pending")
         self.start_key = make_key(PREFIX, "qdj", name, "start")
-        self.end_key = make_key(PREFIX, "qdj", name, "end")
         self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")
         self.active_stages_key = make_key(PREFIX, "qds", name, "active_stages")
 
-    def cancel(self):
-        """Cancel processing of all tasks belonging to a dataset"""
-        pipe = self.conn.pipeline()
+    def flush_status(self, pipe):
         # remove the dataset from active datasets
         pipe.srem(self.key, self.name)
-        # clean up tasks and task counts
-        pipe.delete(self.finished_key)
-        pipe.delete(self.running_key)
-        pipe.delete(self.pending_key)
 
         # reset timestamps
         pipe.delete(self.start_key)
-        pipe.delete(self.end_key)
         pipe.delete(self.last_update_key)
 
         # delete information about running stages
         for stage in self.conn.smembers(self.active_stages_key):
             # TODO - delete this block after all wrong keys are gone
             stage_key = self.get_stage_key(stage)
             pipe.delete(stage_key)
             pipe.delete(make_key(stage_key, "pending"))
             pipe.delete(make_key(stage_key, "running"))
             pipe.delete(make_key(stage_key, "finished"))
 
             # TODO - correct
             pipe.delete(make_key(PREFIX, "qds", self.name, stage))
             pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
             pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
             pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))
 
+        # delete information about tasks per dataset
+        pipe.delete(self.pending_key)
+        pipe.delete(self.running_key)
+        pipe.delete(self.finished_key)
+
         # delete stages key
         pipe.delete(self.active_stages_key)
 
+    def cancel(self):
+        """Cancel processing of all tasks belonging to a dataset"""
+        pipe = self.conn.pipeline()
+        self.flush_status(pipe)
         pipe.execute()
 
     def get_status(self):
         """Status of a given dataset."""
         status = {"finished": 0, "running": 0, "pending": 0, "stages": []}
 
-        start, end, last_update = self.conn.mget(
-            (self.start_key, self.end_key, self.last_update_key)
-        )
+        start, last_update = self.conn.mget((self.start_key, self.last_update_key))
         status["start_time"] = start
-        status["end_time"] = end
         status["last_update"] = last_update
 
         for stage in self.conn.smembers(self.active_stages_key):
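The main refactoring in this hunk extracts all of the Redis cleanup into flush_status, which takes a caller-supplied pipeline: cancel (and the is_done() paths further down) queue the same set of deletes and send them in a single batch. A minimal sketch of the pattern with redis-py and placeholder key names (ours, not the library's):

    import redis

    r = redis.Redis()

    def flush_keys(pipe, keys):
        # Queue deletes on the caller's pipeline; nothing hits Redis yet.
        for key in keys:
            pipe.delete(key)

    pipe = r.pipeline()
    flush_keys(pipe, ["demo:pending", "demo:running", "demo:finished"])
    pipe.srem("demo:active_datasets", "demo")
    pipe.execute()  # all queued commands are applied in one round trip

Passing the pipeline in, rather than creating one inside flush_status, is what lets each caller decide what else to bundle into the same batch.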
@@ -198,33 +200,9 @@ def cleanup_dataset_status(cls, conn):
         datasets_key = make_key(PREFIX, "qdatasets")
         for name in conn.smembers(datasets_key):
             dataset = cls(conn, name)
-            status = dataset.get_status()
-            if status["running"] == 0 and status["pending"] == 0:
+            if dataset.is_done():
                 pipe = conn.pipeline()
-                # remove the dataset from active datasets
-                pipe.srem(dataset.key, dataset.name)
-                # reset finished task count
-                pipe.delete(dataset.finished_key)
-                # delete information about running stages
-                for stage in dataset.conn.smembers(dataset.active_stages_key):
-                    # TODO - delete this block after all wrong keys are gone
-                    stage_key = dataset.get_stage_key(stage)
-                    pipe.delete(stage_key)
-                    pipe.delete(make_key(stage_key, "pending"))
-                    pipe.delete(make_key(stage_key, "running"))
-                    pipe.delete(make_key(stage_key, "finished"))
-
-                    # TODO - correct
-                    pipe.delete(make_key(PREFIX, "qds", dataset.name, stage))
-                    pipe.delete(make_key(PREFIX, "qds", dataset.name, stage, "pending"))
-                    pipe.delete(make_key(PREFIX, "qds", dataset.name, stage, "running"))
-                    pipe.delete(
-                        make_key(PREFIX, "qds", dataset.name, stage, "finished")
-                    )
-                # delete stages key
-                pipe.delete(dataset.active_stages_key)
-                pipe.set(dataset.last_update_key, pack_now())
-
+                dataset.flush_status(pipe)
                 pipe.execute()
 
     def should_execute(self, task_id):
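The inline status["running"] == 0 and status["pending"] == 0 checks are replaced by dataset.is_done() throughout this file. The method itself is not shown in the diff; judging from the condition it replaces, it is presumably equivalent to something like:

    def is_done(self):
        # Hypothetical reconstruction -- is_done() is not shown in this commit.
        # A dataset is done when it has no running and no pending tasks.
        status = self.get_status()
        return status["running"] == 0 and status["pending"] == 0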
@@ -262,54 +240,41 @@ def add_task(self, task_id, stage):
         pipe.sadd(make_key(PREFIX, "qds", self.name, stage), task_id)
         pipe.sadd(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
+        # add the task to the set of pending tasks per dataset
         pipe.sadd(self.pending_key, task_id)
-        pipe.set(self.start_key, pack_now())
 
+        # update dataset timestamps
+        pipe.set(self.start_key, pack_now(), nx=True)
         pipe.set(self.last_update_key, pack_now())
-        pipe.delete(self.end_key)
         pipe.execute()
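The switch to pipe.set(self.start_key, pack_now(), nx=True) is a behavior change, not just cleanup: with nx=True (Redis SET ... NX), the timestamp is written only if the key does not already exist, so the original start time survives later add_task calls instead of being overwritten on every enqueue. A quick demonstration with redis-py (key name is ours):

    import redis

    r = redis.Redis(decode_responses=True)

    r.delete("demo:start")
    r.set("demo:start", "2024-09-25T10:00:00", nx=True)  # key absent -> written
    r.set("demo:start", "2024-09-25T11:30:00", nx=True)  # key present -> no-op
    print(r.get("demo:start"))  # -> 2024-09-25T10:00:00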

     def remove_task(self, task_id, stage):
         """Remove a task that's not going to be executed"""
         log.info(f"Removing task: {task_id}")
         pipe = self.conn.pipeline()
 
+        # remove the task from the set of pending tasks per dataset
         pipe.srem(self.pending_key, task_id)
 
         pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
         pipe.srem(make_key(PREFIX, "qds", self.name, stage, "pending"), task_id)
 
         # TODO - remove this after there are no wrong keys left
-        # It's fine if this si executed, because an inexistent key is
+        # It's fine if this is executed, because an inexistent key is
         # treated as an empty set and SREM works fine.
         stage_key = self.get_stage_key(stage)
         pipe.srem(make_key(stage_key, "pending"), task_id)
 
+        # delete the retry key for this task
         pipe.delete(make_key(PREFIX, "qdj", self.name, "taskretry", task_id))
 
-        status = self.get_status()
-        if status["running"] == 0 and status["pending"] == 0:
-            # remove the dataset from active datasets
-            pipe.srem(self.key, self.name)
-            # reset finished task count
-            pipe.delete(self.finished_key)
-            # delete information about running stages
-            for stage in self.conn.smembers(self.active_stages_key):
-                # TODO - delete this block after all wrong keys are gone
-                stage_key = self.get_stage_key(stage)
-                pipe.delete(stage_key)
-                pipe.delete(make_key(stage_key, "pending"))
-                pipe.delete(make_key(stage_key, "running"))
-                pipe.delete(make_key(stage_key, "finished"))
-
-                # TODO - correct
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))
-            # delete stages key
-            pipe.delete(self.active_stages_key)
         pipe.set(self.last_update_key, pack_now())
         pipe.execute()
 
+        if self.is_done():
+            pipe = self.conn.pipeline()
+            self.flush_status(pipe)
+            pipe.execute()
+
     def checkout_task(self, task_id, stage):
         """Update state when a task is checked out for execution"""
         log.info(f"Checking out task: {task_id}")
@@ -327,20 +292,29 @@ def checkout_task(self, task_id, stage):
         stage_key = self.get_stage_key(stage)
         pipe.srem(make_key(stage_key, "pending"), task_id)
 
-        pipe.srem(self.pending_key, task_id)
+        # add the task to the set of running tasks per dataset
         pipe.sadd(self.running_key, task_id)
-        pipe.set(self.start_key, pack_now())
+        # remove the task from the set of pending tasks per dataset
+        pipe.srem(self.pending_key, task_id)
 
+        # update dataset timestamps
+        pipe.set(self.start_key, pack_now(), nx=True)
         pipe.set(self.last_update_key, pack_now())
-        pipe.delete(self.end_key)
         pipe.execute()
 
     def mark_done(self, task: Task):
         """Update state when a task is finished executing"""
         log.info(f"Finished executing task: {task.task_id}")
         pipe = self.conn.pipeline()
 
+        # remove the task from the pending and running sets of tasks per dataset
         pipe.srem(self.pending_key, task.task_id)
         pipe.srem(self.running_key, task.task_id)
 
+        # increase the number of finished tasks per dataset
         pipe.incr(self.finished_key)
 
+        # delete the retry key for the task
         pipe.delete(task.retry_key)
 
         stage = task.operation
@@ -352,38 +326,18 @@ def mark_done(self, task: Task):
 
         # TODO - remove this after all the wrong keys are gone
         stage_key = self.get_stage_key(stage)
         pipe.srem(stage_key, task_id)
         pipe.srem(make_key(stage_key, "pending"), task_id)
         pipe.srem(make_key(stage_key, "running"), task_id)
         pipe.incr(make_key(stage_key, "finished"))
 
-        pipe.set(self.end_key, pack_now())
+        # update dataset timestamps
         pipe.set(self.last_update_key, pack_now())
         pipe.execute()
 
-        status = self.get_status()
-        if status["running"] == 0 and status["pending"] == 0:
-            # remove the dataset from active datasets
-            pipe.srem(self.key, self.name)
-            # reset finished task count
-            pipe.delete(self.finished_key)
-            # delete information about running stages
-            for stage in self.conn.smembers(self.active_stages_key):
-                # TODO - delete this block after all wrong keys are gone
-                stage_key = self.get_stage_key(stage)
-                pipe.delete(stage_key)
-                pipe.delete(make_key(stage_key, "pending"))
-                pipe.delete(make_key(stage_key, "running"))
-                pipe.delete(make_key(stage_key, "finished"))
-
-                # TODO - correct
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "pending"))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "running"))
-                pipe.delete(make_key(PREFIX, "qds", self.name, stage, "finished"))
-            # delete stages key
-            pipe.delete(self.active_stages_key)
-        pipe.execute()
+        if self.is_done():
+            pipe = self.conn.pipeline()
+            self.flush_status(pipe)
+            pipe.execute()
 
     def mark_for_retry(self, task):
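Note the asymmetry in mark_done: task ids are removed from the pending and running sets, but the finished tally is a plain INCR counter. Redis treats a missing counter key as zero, which is why flush_status can reset the tally by simply deleting finished_key; a small illustration (key name is ours):

    import redis

    r = redis.Redis()

    r.delete("demo:finished")
    r.incr("demo:finished")  # missing key counts as 0, becomes 1
    r.incr("demo:finished")  # -> 2
    print(int(r.get("demo:finished")))  # -> 2
    r.delete("demo:finished")  # resetting the counter is just deleting the key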
@@ -396,15 +350,20 @@ def mark_for_retry(self, task):
 
         stage = task.operation
         task_id = task.task_id
 
+        # remove the task from the pending and running sets of tasks per dataset
+        pipe.srem(self.pending_key, task_id)
+        pipe.srem(self.running_key, task_id)
+
         pipe.srem(make_key(PREFIX, "qds", self.name, stage, "running"), task_id)
         pipe.srem(make_key(PREFIX, "qds", self.name, stage), task_id)
-        pipe.srem(self.running_key, task_id)
-        pipe.delete(task.retry_key)
 
         # TODO - remove when there are no wrong tasks left
         stage_key = self.get_stage_key(task.operation)
         pipe.srem(make_key(stage_key, "running"), task.task_id)
         pipe.srem(stage_key, task.task_id)
 
+        # delete the retry key for the task
+        pipe.delete(task.retry_key)
+
         pipe.set(self.last_update_key, pack_now())
 
Expand All @@ -426,6 +385,8 @@ def is_task_tracked(self, task: Task):
task_id = task.task_id
stage = task.operation

stage_key = self.get_stage_key(stage)

# A task is considered tracked if
# the dataset is in the list of active datasets
if dataset not in self.conn.smembers(self.key):
@@ -590,7 +551,7 @@ def process_blocking(self):
                 success, retry = self.handle(task, channel)
                 log.debug(
                     f"Task {task.task_id} finished with success={success}"
-                    f" and retry={retry}"
+                    f"{'' if success else ' and retry=' + str(retry)}"
                 )
                 if success:
                     cb = functools.partial(self.ack_message, task, channel)
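The reworked debug line mentions retry only when a task failed, via a conditional expression inside the f-string. The expression can be checked in isolation:

    for success, retry in [(True, False), (False, True)]:
        print(
            f"Task 42 finished with success={success}"
            f"{'' if success else ' and retry=' + str(retry)}"
        )
    # -> Task 42 finished with success=True
    # -> Task 42 finished with success=False and retry=True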
@@ -618,6 +579,7 @@ def process_nonblocking(self):
             else:
                 queue_active[queue] = True
                 task = get_task(body, method.delivery_tag)
+                task._channel = channel
                 success, retry = self.handle(task, channel)
                 if success:
                     channel.basic_ack(task.delivery_tag)
@@ -740,9 +702,9 @@ def nack_message(self, task, channel, requeue=True):
         dataset = task.get_dataset(conn=self.conn)
         # Sync state to redis
         if requeue:
-            dataset.mark_for_retry(task)
             if not dataset.is_task_tracked(task):
                 dataset.add_task(task.task_id, task.operation)
+            dataset.mark_for_retry(task)
         else:
             dataset.mark_done(task)
         if channel.is_open:
3 changes: 2 additions & 1 deletion setup.py
@@ -6,7 +6,7 @@
 
 setup(
     name="servicelayer",
-    version="1.23.0-rc32",
+    version="1.23.0-rc34",
     description="Basic remote service functions for alephdata components",
     classifiers=[
         "Development Status :: 3 - Alpha",
@@ -51,6 +51,7 @@
             "pytest >= 3.6",
             "coverage",
             "pytest-cov",
+            "time-machine>=2.14.1, <3.0.0",
         ],
     },
     test_suite="tests",
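Both requirements-dev.txt and the test extras now pull in time-machine, a library for freezing or shifting the clock in tests, presumably in support of the new start/last_update timestamp behavior. A hedged sketch of the kind of test it enables (the test itself is ours, not from this commit):

    import datetime
    import time_machine

    # Freeze the clock so timestamp-dependent logic becomes deterministic.
    @time_machine.travel(
        datetime.datetime(2024, 9, 25, 12, 0, tzinfo=datetime.timezone.utc)
    )
    def test_timestamps_are_stable():
        now = datetime.datetime.now(datetime.timezone.utc)
        assert now.isoformat().startswith("2024-09-25T12:00")

    test_timestamps_are_stable()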