Skip to content

Commit

Permalink
Finish test for queue fix
Browse files Browse the repository at this point in the history
  • Loading branch information
chuck-dbos committed Jan 21, 2025
1 parent 0ee325d commit 7fa49bb
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 9 deletions.
3 changes: 0 additions & 3 deletions tests/classdefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
# Public API
from dbos import DBOS, DBOSConfiguredInstance

# Private API used because this is a test
from dbos._context import assert_current_dbos_context


@DBOS.dbos_class()
class DBOSTestClass(DBOSConfiguredInstance):
Expand Down
7 changes: 1 addition & 6 deletions tests/more_classdefs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import Optional

import sqlalchemy as sa

# Public API
from dbos import DBOS, DBOSConfiguredInstance

# Private API used because this is a test
from dbos._context import assert_current_dbos_context
from dbos import DBOS


@DBOS.dbos_class()
Expand Down
65 changes: 65 additions & 0 deletions tests/queuedworkflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Public API
import os

from dbos import DBOS, ConfigFile, Queue, SetWorkflowID


def default_config() -> ConfigFile:
return {
"name": "test-app",
"language": "python",
"database": {
"hostname": "localhost",
"port": 5432,
"username": "postgres",
"password": os.environ["PGPASSWORD"],
"app_db_name": "dbostestpy",
},
"runtimeConfig": {
"start": ["python3 main.py"],
},
"telemetry": {},
"env": {},
}


q = Queue("testq", concurrency=1, limiter={"limit": 1, "period": 1})


class WF:
@DBOS.workflow()
@staticmethod
def queued_task() -> int:
DBOS.sleep(0.1)
return 1

@DBOS.workflow()
@staticmethod
def enqueue_5_tasks() -> int:
for i in range(5):
print(f"Iteration {i + 1}")
wfh = DBOS.start_workflow(WF.queued_task)
wfh.get_result()
DBOS.sleep(0.9)

if i == 3 and "DIE_ON_PURPOSE" in os.environ:
print("CRASH")
os._exit(1)
return 5

x = 5


def main() -> None:
DBOS(config=default_config())
DBOS.launch()
DBOS.recover_pending_workflows()

with SetWorkflowID("testqueuedwfcrash"):
WF.enqueue_5_tasks()

DBOS.destroy()


if __name__ == "__main__":
main()
47 changes: 47 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import subprocess
import threading
import time
import uuid
Expand All @@ -8,6 +10,7 @@
from dbos._dbos import WorkflowHandle
from dbos._schemas.system_database import SystemSchema
from dbos._sys_db import WorkflowStatusString
from tests.conftest import default_config


def queue_entries_are_cleaned_up(dbos: DBOS) -> bool:
Expand Down Expand Up @@ -320,3 +323,47 @@ def limited_workflow(var1: str, var2: str) -> float:

# Verify all queue entries eventually get cleaned up.
assert queue_entries_are_cleaned_up(dbos)


def test_queue_workflow_in_recovered_workflow(dbos: DBOS) -> None:
# We don't want to be taking queued jobs while subprocess runs
DBOS.destroy()

# Set up environment variables to trigger the crash in subprocess
env = os.environ.copy()
env["DIE_ON_PURPOSE"] = "true"

# Run the script as a subprocess to get a workflow stuck
process = subprocess.run(
["python", "tests/queuedworkflow.py"],
cwd=os.getcwd(),
env=env,
capture_output=True,
text=True,
)
# print ("Process Return: ")
# print (process.stdout)
# print (process.stderr)
assert process.returncode != 0 # Crashed

# Run script again without crash
process = subprocess.run(
["python", "tests/queuedworkflow.py"],
cwd=os.getcwd(),
env=os.environ,
capture_output=True,
text=True,
)
# print ("Process Return: ")
# print (process.stdout)
# print (process.stderr)
assert process.returncode == 0 # Ran to completion

# Launch DBOS to check answer
dbos = DBOS(config=default_config())
DBOS.launch()
wfh: WorkflowHandle[int] = DBOS.retrieve_workflow("testqueuedwfcrash")
assert wfh.get_result() == 5
assert wfh.get_status().status == "SUCCESS"
assert queue_entries_are_cleaned_up(dbos)
return

0 comments on commit 7fa49bb

Please sign in to comment.