diff --git a/tests/classdefs.py b/tests/classdefs.py index 0b8d7ec8..83075048 100644 --- a/tests/classdefs.py +++ b/tests/classdefs.py @@ -3,9 +3,6 @@ # Public API from dbos import DBOS, DBOSConfiguredInstance -# Private API used because this is a test -from dbos._context import assert_current_dbos_context - @DBOS.dbos_class() class DBOSTestClass(DBOSConfiguredInstance): diff --git a/tests/more_classdefs.py b/tests/more_classdefs.py index 8f52f4ba..b1168aad 100644 --- a/tests/more_classdefs.py +++ b/tests/more_classdefs.py @@ -1,12 +1,7 @@ from typing import Optional -import sqlalchemy as sa - # Public API -from dbos import DBOS, DBOSConfiguredInstance - -# Private API used because this is a test -from dbos._context import assert_current_dbos_context +from dbos import DBOS @DBOS.dbos_class() diff --git a/tests/queuedworkflow.py b/tests/queuedworkflow.py new file mode 100644 index 00000000..b5f1201d --- /dev/null +++ b/tests/queuedworkflow.py @@ -0,0 +1,65 @@ +# Public API +import os + +from dbos import DBOS, ConfigFile, Queue, SetWorkflowID + + +def default_config() -> ConfigFile: + return { + "name": "test-app", + "language": "python", + "database": { + "hostname": "localhost", + "port": 5432, + "username": "postgres", + "password": os.environ["PGPASSWORD"], + "app_db_name": "dbostestpy", + }, + "runtimeConfig": { + "start": ["python3 main.py"], + }, + "telemetry": {}, + "env": {}, + } + + +q = Queue("testq", concurrency=1, limiter={"limit": 1, "period": 1}) + + +class WF: + @DBOS.workflow() + @staticmethod + def queued_task() -> int: + DBOS.sleep(0.1) + return 1 + + @DBOS.workflow() + @staticmethod + def enqueue_5_tasks() -> int: + for i in range(5): + print(f"Iteration {i + 1}") + wfh = DBOS.start_workflow(WF.queued_task) + wfh.get_result() + DBOS.sleep(0.9) + + if i == 3 and "DIE_ON_PURPOSE" in os.environ: + print("CRASH") + os._exit(1) + return 5 + + x = 5 + + +def main() -> None: + DBOS(config=default_config()) + DBOS.launch() + DBOS.recover_pending_workflows() + + with SetWorkflowID("testqueuedwfcrash"): + WF.enqueue_5_tasks() + + DBOS.destroy() + + +if __name__ == "__main__": + main() diff --git a/tests/test_queue.py b/tests/test_queue.py index 4ce59eb5..1453ca09 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,3 +1,5 @@ +import os +import subprocess import threading import time import uuid @@ -8,6 +10,7 @@ from dbos._dbos import WorkflowHandle from dbos._schemas.system_database import SystemSchema from dbos._sys_db import WorkflowStatusString +from tests.conftest import default_config def queue_entries_are_cleaned_up(dbos: DBOS) -> bool: @@ -320,3 +323,47 @@ def limited_workflow(var1: str, var2: str) -> float: # Verify all queue entries eventually get cleaned up. assert queue_entries_are_cleaned_up(dbos) + + +def test_queue_workflow_in_recovered_workflow(dbos: DBOS) -> None: + # We don't want to be taking queued jobs while subprocess runs + DBOS.destroy() + + # Set up environment variables to trigger the crash in subprocess + env = os.environ.copy() + env["DIE_ON_PURPOSE"] = "true" + + # Run the script as a subprocess to get a workflow stuck + process = subprocess.run( + ["python", "tests/queuedworkflow.py"], + cwd=os.getcwd(), + env=env, + capture_output=True, + text=True, + ) + # print ("Process Return: ") + # print (process.stdout) + # print (process.stderr) + assert process.returncode != 0 # Crashed + + # Run script again without crash + process = subprocess.run( + ["python", "tests/queuedworkflow.py"], + cwd=os.getcwd(), + env=os.environ, + capture_output=True, + text=True, + ) + # print ("Process Return: ") + # print (process.stdout) + # print (process.stderr) + assert process.returncode == 0 # Ran to completion + + # Launch DBOS to check answer + dbos = DBOS(config=default_config()) + DBOS.launch() + wfh: WorkflowHandle[int] = DBOS.retrieve_workflow("testqueuedwfcrash") + assert wfh.get_result() == 5 + assert wfh.get_status().status == "SUCCESS" + assert queue_entries_are_cleaned_up(dbos) + return