Skip to content

Commit

Permalink
we must execute garbage collection for completed, rate limited tasks …
Browse files Browse the repository at this point in the history
…-- can we split the logic instead of doing this try/catch?
  • Loading branch information
maxdml committed Jan 16, 2025
1 parent 75243c2 commit 0de574f
Showing 1 changed file with 124 additions and 113 deletions.
237 changes: 124 additions & 113 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,134 +1112,145 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
# Execute with snapshot isolation to ensure multiple workers respect limits
c.execute(sa.text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))

# If there is a limiter, compute how many functions have started in its period.
if queue.limiter is not None:
query = (
sa.select(sa.func.count())
.select_from(SystemSchema.workflow_queue)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(None)
ret_ids: list[str] = []
try:
# If there is a limiter, compute how many functions have started in its period.
if queue.limiter is not None:
query = (
sa.select(sa.func.count())
.select_from(SystemSchema.workflow_queue)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(
None
)
)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms
> start_time_ms - limiter_period_ms
)
)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms
> start_time_ms - limiter_period_ms
num_recent_queries = c.execute(query).fetchone()[0] # type: ignore
if num_recent_queries >= queue.limiter["limit"]:
return []

# Select not-yet-completed functions in the queue ordered by the
# time at which they were enqueued.
# If there is a concurrency limit N, select only the N oldest enqueued
# functions, else select all of them.
query = (
sa.select(
SystemSchema.workflow_queue.c.workflow_uuid,
SystemSchema.workflow_queue.c.started_at_epoch_ms,
SystemSchema.workflow_queue.c.executor_id,
)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms == None)
.order_by(SystemSchema.workflow_queue.c.created_at_epoch_ms.asc())
)
num_recent_queries = c.execute(query).fetchone()[0] # type: ignore
if num_recent_queries >= queue.limiter["limit"]:
if queue.concurrency is not None:
query = query.limit(queue.concurrency)

rows = c.execute(query).fetchall()
dbos_logger.debug(f"[{queue.name}] dequeued {len(rows)} task(s)")
if len(rows) == 0:
return []

# Select not-yet-completed functions in the queue ordered by the
# time at which they were enqueued.
# If there is a concurrency limit N, select only the N oldest enqueued
# functions, else select all of them.
query = (
sa.select(
SystemSchema.workflow_queue.c.workflow_uuid,
SystemSchema.workflow_queue.c.started_at_epoch_ms,
SystemSchema.workflow_queue.c.executor_id,
# First, count the functions that have already been started.
# We will use this count to calculate how many more functions this worker can start.
number_of_tasks_already_started: int = len(
[row[0] for row in rows if row[1] is not None]
)
dbos_logger.debug(
f"[{queue.name}] {number_of_tasks_already_started} task(s) already started"
)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms == None)
.order_by(SystemSchema.workflow_queue.c.created_at_epoch_ms.asc())
)
if queue.concurrency is not None:
query = query.limit(queue.concurrency)

rows = c.execute(query).fetchall()
dbos_logger.debug(f"[{queue.name}] dequeued {len(rows)} task(s)")
if len(rows) == 0:
return []

# First, count the functions that have already been started.
# We will use this count to calculate how many more functions this worker can start.
number_of_tasks_already_started: int = len(
[row[0] for row in rows if row[1] is not None]
)
dbos_logger.debug(
f"[{queue.name}] {number_of_tasks_already_started} task(s) already started"
)

# 0 <= number_of_tasks_already_started <= len(rows), and len(rows) <= queue.concurrency when a concurrency limit is set
number_of_eligible_tasks: int = len(rows) - number_of_tasks_already_started
dbos_logger.debug(
f"[{queue.name}] {number_of_eligible_tasks} task(s) eligible for dequeue"
)
if number_of_eligible_tasks == 0:
return []

tasks_this_worker_is_already_working_on: int = len(
[row[0] for row in rows if len(row) == 3 and row[2] == executor_id]
)

# This worker can dequeue up to whatever is smaller between the eligible tasks and its set concurrency
# Of course we must account for tasks this worker is already working on, dequeued during a previous pass of this function
max_tasks_this_worker_can_dequeue = (
min(
number_of_eligible_tasks,
(
queue.worker_concurrency
if queue.worker_concurrency is not None
else float("inf")
),
# 0 <= number_of_tasks_already_started <= len(rows), and len(rows) <= queue.concurrency when a concurrency limit is set
number_of_eligible_tasks: int = (
len(rows) - number_of_tasks_already_started
)
- tasks_this_worker_is_already_working_on
)
assert (
max_tasks_this_worker_can_dequeue >= 0
) # TODO: remove this assert after sufficient tests are implemented

# Now, get the workflow IDs of functions that have not yet been started
# Limit the list by the maximum concurrency for this worker
dequeued_ids: List[str] = [row[0] for row in rows if row[1] is None][
:max_tasks_this_worker_can_dequeue
]
dbos_logger.debug(f"[{queue.name}] dequeueing {len(dequeued_ids)} task(s)")
ret_ids: list[str] = []
for id in dequeued_ids:
dbos_logger.debug(
f"[{queue.name}] {number_of_eligible_tasks} task(s) eligible for dequeue"
)
if number_of_eligible_tasks == 0:
return []

# If we have a limiter, stop starting functions once the number
# of functions started this period reaches the limit.
if queue.limiter is not None:
if len(ret_ids) + num_recent_queries >= queue.limiter["limit"]:
break
tasks_this_worker_is_already_working_on: int = len(
[row[0] for row in rows if len(row) == 3 and row[2] == executor_id]
)

# To start a function, first set its status to PENDING and update its executor ID
c.execute(
SystemSchema.workflow_status.update()
.where(SystemSchema.workflow_status.c.workflow_uuid == id)
.where(
SystemSchema.workflow_status.c.status
== WorkflowStatusString.ENQUEUED.value
)
.values(
status=WorkflowStatusString.PENDING.value,
executor_id=executor_id,
# This worker can dequeue up to whatever is smaller between the eligible tasks and its set concurrency
# Of course we must account for tasks this worker is already working on, dequeued during a previous pass of this function
max_tasks_this_worker_can_dequeue = (
min(
number_of_eligible_tasks,
(
queue.worker_concurrency
if queue.worker_concurrency is not None
else float("inf")
),
)
- tasks_this_worker_is_already_working_on
)

# Then give it a start time and assign the executor ID
c.execute(
SystemSchema.workflow_queue.update()
.where(SystemSchema.workflow_queue.c.workflow_uuid == id)
.values(started_at_epoch_ms=start_time_ms, executor_id=executor_id)
assert (
max_tasks_this_worker_can_dequeue >= 0
) # TODO: remove this assert after sufficient tests are implemented

# Now, get the workflow IDs of functions that have not yet been started
# Limit the list by the maximum concurrency for this worker
dequeued_ids: List[str] = [row[0] for row in rows if row[1] is None][
:max_tasks_this_worker_can_dequeue
]
dbos_logger.debug(
f"[{queue.name}] dequeueing {len(dequeued_ids)} task(s)"
)
ret_ids.append(id)
for id in dequeued_ids:

# If we have a limiter, stop starting functions once the number
# of functions started this period reaches the limit.
if queue.limiter is not None:
if len(ret_ids) + num_recent_queries >= queue.limiter["limit"]:
break

# To start a function, first set its status to PENDING and update its executor ID
c.execute(
SystemSchema.workflow_status.update()
.where(SystemSchema.workflow_status.c.workflow_uuid == id)
.where(
SystemSchema.workflow_status.c.status
== WorkflowStatusString.ENQUEUED.value
)
.values(
status=WorkflowStatusString.PENDING.value,
executor_id=executor_id,
)
)

# If we have a limiter, garbage-collect all completed functions started
# before the period. If there's no limiter, there's no need--they were
# deleted on completion.
if queue.limiter is not None:
c.execute(
sa.delete(SystemSchema.workflow_queue)
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms != None)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms
< start_time_ms - limiter_period_ms
# Then give it a start time and assign the executor ID
c.execute(
SystemSchema.workflow_queue.update()
.where(SystemSchema.workflow_queue.c.workflow_uuid == id)
.values(
started_at_epoch_ms=start_time_ms, executor_id=executor_id
)
)
ret_ids.append(id)
finally:
# If we have a limiter, garbage-collect all completed functions started
# before the period. If there's no limiter, there's no need--they were
# deleted on completion.
if queue.limiter is not None:
c.execute(
sa.delete(SystemSchema.workflow_queue)
.where(
SystemSchema.workflow_queue.c.completed_at_epoch_ms != None
)
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
.where(
SystemSchema.workflow_queue.c.started_at_epoch_ms
< start_time_ms - limiter_period_ms
)
)
)

# Return the IDs of all functions we started
return ret_ids
Expand Down

0 comments on commit 0de574f

Please sign in to comment.