Skip to content

Commit

Permalink
Workflow status optimization / queue fix (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuck-dbos authored Jan 21, 2025
1 parent 6434514 commit 4a4a80a
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 14 deletions.
14 changes: 11 additions & 3 deletions dbos/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,23 @@ def _init_workflow(
if class_name is not None:
inputs = {"args": inputs["args"][1:], "kwargs": inputs["kwargs"]}

wf_status = status["status"]
if temp_wf_type != "transaction" or queue is not None:
# Synchronously record the status and inputs for workflows and single-step workflows
# We also have to do this for single-step workflows because of the foreign key constraint on the operation outputs table
# TODO: Make this transactional (and with the queue step below)
dbos._sys_db.update_workflow_status(
wf_status = dbos._sys_db.update_workflow_status(
status, False, ctx.in_recovery, max_recovery_attempts=max_recovery_attempts
)
dbos._sys_db.update_workflow_inputs(wfid, _serialization.serialize_args(inputs))
else:
# Buffer the inputs for single-transaction workflows, but don't buffer the status
dbos._sys_db.buffer_workflow_inputs(wfid, _serialization.serialize_args(inputs))

if queue is not None:
if queue is not None and wf_status == WorkflowStatusString.ENQUEUED.value:
dbos._sys_db.enqueue(wfid, queue)

status["status"] = wf_status
return status


Expand Down Expand Up @@ -413,7 +415,13 @@ def start_workflow(
max_recovery_attempts=fi.max_recovery_attempts,
)

if not execute_workflow:
wf_status = status["status"]

if (
not execute_workflow
or wf_status == WorkflowStatusString.ERROR.value
or wf_status == WorkflowStatusString.SUCCESS.value
):
return WorkflowHandlePolling(new_wf_id, dbos)

if fself is not None:
Expand Down
10 changes: 8 additions & 2 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ def update_workflow_status(
*,
conn: Optional[sa.Connection] = None,
max_recovery_attempts: int = DEFAULT_MAX_RECOVERY_ATTEMPTS,
) -> None:
) -> WorkflowStatuses:
wf_status: WorkflowStatuses = status["status"]

cmd = pg.insert(SystemSchema.workflow_status).values(
workflow_uuid=status["workflow_uuid"],
status=status["status"],
Expand Down Expand Up @@ -287,17 +289,19 @@ def update_workflow_status(
)
else:
cmd = cmd.on_conflict_do_nothing()
cmd = cmd.returning(SystemSchema.workflow_status.c.recovery_attempts) # type: ignore
cmd = cmd.returning(SystemSchema.workflow_status.c.recovery_attempts, SystemSchema.workflow_status.c.status) # type: ignore

if conn is not None:
results = conn.execute(cmd)
else:
with self.engine.begin() as c:
results = c.execute(cmd)

if in_recovery:
row = results.fetchone()
if row is not None:
recovery_attempts: int = row[0]
wf_status = row[1]
if recovery_attempts > max_recovery_attempts:
with self.engine.begin() as c:
c.execute(
Expand Down Expand Up @@ -329,6 +333,8 @@ def update_workflow_status(
if status["workflow_uuid"] in self._temp_txn_wf_ids:
self._exported_temp_txn_wf_status.add(status["workflow_uuid"])

return wf_status

def set_workflow_status(
self,
workflow_uuid: str,
Expand Down
3 changes: 0 additions & 3 deletions tests/classdefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
# Public API
from dbos import DBOS, DBOSConfiguredInstance

# Private API used because this is a test
from dbos._context import assert_current_dbos_context


@DBOS.dbos_class()
class DBOSTestClass(DBOSConfiguredInstance):
Expand Down
7 changes: 1 addition & 6 deletions tests/more_classdefs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import Optional

import sqlalchemy as sa

# Public API
from dbos import DBOS, DBOSConfiguredInstance

# Private API used because this is a test
from dbos._context import assert_current_dbos_context
from dbos import DBOS


@DBOS.dbos_class()
Expand Down
66 changes: 66 additions & 0 deletions tests/queuedworkflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Public API
import os

from dbos import DBOS, ConfigFile, Queue, SetWorkflowID


def default_config() -> ConfigFile:
    """Build the minimal DBOS configuration used by the queued-workflow test.

    Points at a local Postgres instance; the password is read from the
    PGPASSWORD environment variable (a KeyError here means the test
    environment is not set up).
    """
    database_settings = {
        "hostname": "localhost",
        "port": 5432,
        "username": "postgres",
        "password": os.environ["PGPASSWORD"],
        "app_db_name": "dbostestpy",
    }
    return {
        "name": "test-app",
        "language": "python",
        "database": database_settings,
        "runtimeConfig": {
            "start": ["python3 main.py"],
        },
        "telemetry": {},
        "env": {},
    }


# Serialized test queue: concurrency=1 allows only one task in flight, and the
# limiter admits at most 1 task start per 1-second period.
q = Queue("testq", concurrency=1, limiter={"limit": 1, "period": 1})


class WF:
    """Workflow definitions for the queued-workflow crash/recovery scenario."""

    @staticmethod
    @DBOS.workflow()
    def queued_task() -> int:
        # Minimal unit of queued work: pause briefly, then report success.
        DBOS.sleep(0.1)
        return 1

    @staticmethod
    @DBOS.workflow()
    def enqueue_5_tasks() -> int:
        # Start five queued tasks one at a time, waiting on each result
        # before pausing and moving on to the next.
        for iteration in range(5):
            print(f"Iteration {iteration + 1}")
            handle = DBOS.start_workflow(WF.queued_task)
            handle.get_result()
            DBOS.sleep(0.9)

            # When DIE_ON_PURPOSE is set, kill the process partway through
            # so a later run can exercise the recovery path.
            if iteration == 3 and "DIE_ON_PURPOSE" in os.environ:
                print("CRASH")
                os._exit(1)
        return 5

    x = 5


def main() -> None:
    """Run the queued-workflow scenario end to end.

    Initializes DBOS, recovers any workflows left over from a previous
    (crashed) run, then executes the enqueueing workflow under a fixed
    workflow ID so a rerun resumes the same workflow instance.
    """
    DBOS(config=default_config())
    DBOS.launch()
    # Resume any workflow left PENDING by a prior deliberate crash.
    DBOS.recover_pending_workflows()

    # Fixed workflow ID: the recovery test retrieves this ID later.
    with SetWorkflowID("testqueuedwfcrash"):
        WF.enqueue_5_tasks()

    DBOS.destroy()
    # Hard-exit so no lingering background threads keep the process alive.
    os._exit(0)


# Script entry point: the recovery test runs this file via subprocess.
if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import subprocess
import threading
import time
import uuid
Expand All @@ -8,6 +10,7 @@
from dbos._dbos import WorkflowHandle
from dbos._schemas.system_database import SystemSchema
from dbos._sys_db import WorkflowStatusString
from tests.conftest import default_config


def queue_entries_are_cleaned_up(dbos: DBOS) -> bool:
Expand Down Expand Up @@ -320,3 +323,47 @@ def limited_workflow(var1: str, var2: str) -> float:

# Verify all queue entries eventually get cleaned up.
assert queue_entries_are_cleaned_up(dbos)


def test_queue_workflow_in_recovered_workflow(dbos: DBOS) -> None:
    """Verify a workflow that enqueues tasks completes correctly after a crash.

    Runs tests/queuedworkflow.py in a subprocess twice: first with
    DIE_ON_PURPOSE set so the script kills itself partway through its
    workflow, then again without it so recovery finishes the stuck workflow.
    Finally relaunches DBOS in this process to check the workflow's result
    and that its queue entries were cleaned up.
    """
    # Destroy the fixture's DBOS instance so this process does not compete
    # with the subprocess for queued jobs.
    DBOS.destroy()

    # Set up environment variables to trigger the crash in the subprocess.
    env = os.environ.copy()
    env["DIE_ON_PURPOSE"] = "true"

    # First run: the script is expected to crash mid-workflow.
    process = subprocess.run(
        ["python", "tests/queuedworkflow.py"],
        cwd=os.getcwd(),
        env=env,
        capture_output=True,
        text=True,
    )
    assert process.returncode != 0  # Crashed

    # Second run without the crash flag: recovery should complete the
    # workflow and the script should exit normally.
    process = subprocess.run(
        ["python", "tests/queuedworkflow.py"],
        cwd=os.getcwd(),
        env=os.environ,
        capture_output=True,
        text=True,
    )
    assert process.returncode == 0  # Ran to completion

    # Relaunch DBOS in this process to inspect the recovered workflow.
    dbos = DBOS(config=default_config())
    DBOS.launch()
    wfh: WorkflowHandle[int] = DBOS.retrieve_workflow("testqueuedwfcrash")
    assert wfh.get_result() == 5
    assert wfh.get_status().status == "SUCCESS"
    assert queue_entries_are_cleaned_up(dbos)

0 comments on commit 4a4a80a

Please sign in to comment.