Skip to content

Commit

Permalink
Add test for the warning
Browse files Browse the repository at this point in the history
  • Loading branch information
qianl15 committed Jan 23, 2025
1 parent 00b6a0b commit f9cb835
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import subprocess
import threading
Expand Down Expand Up @@ -478,7 +479,7 @@ def test_worker_concurrency_with_n_dbos_instances(dbos: DBOS) -> None:


# Test error cases where we have duplicated workflows starting with the same workflow ID.
def test_duplicate_workflow_id(dbos: DBOS) -> None:
def test_duplicate_workflow_id(dbos: DBOS, caplog: pytest.LogCaptureFixture) -> None:
wfid = str(uuid.uuid4())

@DBOS.workflow()
Expand Down Expand Up @@ -510,11 +511,17 @@ def test_workflow(self, var1: str) -> str:
DBOS.sleep(0.1)
return self.config_name + ":" + var1

original_propagate = logging.getLogger("dbos").propagate
caplog.set_level(logging.WARNING, "dbos")
logging.getLogger("dbos").propagate = True

with SetWorkflowID(wfid):
origHandle = DBOS.start_workflow(test_workflow, "abc")
# The second one will generate a warning message but no error.
test_dup_workflow()

assert "Multiple workflow started within the SetWorkflowID block." in caplog.text

# It's okay to call the same workflow with the same ID again.
with SetWorkflowID(wfid):
same_handle = DBOS.start_workflow(test_workflow, "abc")
Expand Down Expand Up @@ -559,13 +566,18 @@ def test_workflow(self, var1: str) -> str:
with SetWorkflowID(wfid):
handle = queue.enqueue(test_workflow, "abc")
assert handle.get_result() == "abc"
assert "Workflow already exists in queue" in caplog.text

# Call with a different input would generate a warning, but still use the recorded input.
with SetWorkflowID(wfid):
res = test_workflow("def")
# We want to see the warning message, but the result is non-deterministic
# TODO: in the future, we may want to always use the recorded inputs.
assert res == "abc" or res == "def"
assert f"Workflow inputs for {wfid} changed since the first call" in caplog.text

assert origHandle.get_result() == "abc"
assert same_handle.get_result() == "abc"

# Reset logging
logging.getLogger("dbos").propagate = original_propagate

0 comments on commit f9cb835

Please sign in to comment.