Skip to content

Commit

Permalink
switch to WorkflowInformation
Browse files Browse the repository at this point in the history
  • Loading branch information
manojdbos committed Jan 20, 2025
1 parent b904404 commit ce75483
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 21 deletions.
1 change: 1 addition & 0 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class WorkflowStatusInternal(TypedDict):
name: str
class_name: Optional[str]
config_name: Optional[str]
input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle)
output: Optional[str] # JSON (jsonpickle)
error: Optional[str] # JSON (jsonpickle)
executor_id: Optional[str]
Expand Down
59 changes: 46 additions & 13 deletions dbos/_workflow_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from dbos import DBOS

from . import load_config
from . import _serialization, load_config
from ._dbos_config import ConfigFile, _is_valid_app_name
from ._sys_db import (
GetWorkflowsInput,
Expand All @@ -17,6 +17,26 @@
)


class WorkflowInformation:
workflowUUID: str
status: WorkflowStatuses
name: str
class_name: Optional[str]
config_name: Optional[str]
input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle)
output: Optional[str] # JSON (jsonpickle)
error: Optional[str] # JSON (jsonpickle)
executor_id: Optional[str]
app_version: Optional[str]
app_id: Optional[str]
request: Optional[str] # JSON (jsonpickle)
recovery_attempts: Optional[int]
authenticated_user: Optional[str]
assumed_role: Optional[str]
authenticated_roles: Optional[str] # JSON list of roles.
queue_name: Optional[str]


def _list_workflows(
config: ConfigFile,
li: int,
Expand All @@ -26,10 +46,9 @@ def _list_workflows(
status: Optional[str],
request: bool,
appversion: Optional[str],
) -> List[WorkflowStatusInternal]:
) -> List[WorkflowInformation]:

sys_db = None

try:
sys_db = SystemDatabase(config)
except Exception as e:
Expand All @@ -50,7 +69,7 @@ def _list_workflows(

output: GetWorkflowsOutput = sys_db.get_workflows(input)

infos: List[WorkflowStatusInternal] = []
infos: List[WorkflowInformation] = []

if output.workflow_uuids is None:
typer.echo("No workflows found")
Expand All @@ -68,8 +87,7 @@ def _list_workflows(

def _get_workflow(
config: ConfigFile, uuid: str, request: bool
) -> Optional[WorkflowStatusInternal]:
print(f"Getting workflow info for {uuid}")
) -> Optional[WorkflowInformation]:
sys_db = None

try:
Expand Down Expand Up @@ -109,28 +127,43 @@ def _reattempt_workflow(uuid: str, startNewWorkflow: bool) -> None:

def _get_workflow_info(
sys_db: SystemDatabase, workflowUUID: str, getRequest: bool
) -> Optional[WorkflowStatusInternal]:
) -> Optional[WorkflowInformation]:
info = sys_db.get_workflow_status(workflowUUID)
if info is None:
return None

info["workflow_uuid"] = workflowUUID
winfo = WorkflowInformation()
print("creating winfo")

winfo.workflowUUID = workflowUUID
winfo.status = info["status"]
winfo.name = info["name"]
winfo.class_name = info["class_name"]
winfo.config_name = info["config_name"]
winfo.executor_id = info["executor_id"]
winfo.app_version = info["app_version"]
winfo.app_id = info["app_id"]
winfo.recovery_attempts = info["recovery_attempts"]
winfo.authenticated_user = info["authenticated_user"]
winfo.assumed_role = info["assumed_role"]
winfo.authenticated_roles = info["authenticated_roles"]
winfo.queue_name = info["queue_name"]

# no input field
input_data = sys_db.get_workflow_inputs(workflowUUID)
if input_data is not None:
info["input"] = input_data
winfo.input = input_data

if info.get("status") == "SUCCESS":
result = sys_db.await_workflow_result(workflowUUID)
info["output"] = result
winfo.output = result
elif info.get("status") == "ERROR":
try:
sys_db.await_workflow_result(workflowUUID)
except Exception as e:
info["error"] = str(e)
winfo.error = str(e)

if not getRequest:
info["request"] = None
winfo.request = None

return info
return winfo
6 changes: 3 additions & 3 deletions dbos/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from dbos._schemas.system_database import SystemSchema

from . import load_config
from . import _serialization, load_config
from ._app_db import ApplicationDatabase
from ._dbos_config import _is_valid_app_name
from ._sys_db import SystemDatabase
Expand Down Expand Up @@ -397,7 +397,7 @@ def list(
workflows = _list_workflows(
config, limit, user, starttime, endtime, status, request, appversion
)
print(workflows)
print(_serialization.serialize(workflows))


@workflow.command(help="Retrieve the status of a workflow")
Expand All @@ -413,7 +413,7 @@ def get(
] = True,
) -> None:
config = load_config()
print(_get_workflow(config, uuid, request))
print(_serialization.serialize(_get_workflow(config, uuid, request)))


@workflow.command(
Expand Down
10 changes: 5 additions & 5 deletions tests/test_workflow_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def simple_workflow() -> None:
assert len(output) == 1, f"Expected list length to be 1, but got {len(output)}"
assert output[0] != None, "Expected output to be not None"
if output[0] != None:
assert output[0]["workflow_uuid"].strip(), "field_name is an empty string"
assert output[0].workflowUUID.strip(), "field_name is an empty string"


def test_list_workflow_limit(dbos: DBOS, config: ConfigFile) -> None:
Expand Down Expand Up @@ -178,13 +178,13 @@ def simple_workflow() -> None:

assert output[0] != None, "Expected output to be not None"

wfUuid = output[0]["workflow_uuid"]
wfUuid = output[0].workflowUUID

info = _workflow_commands._get_workflow(config, wfUuid, True)
assert info is not None, "Expected output to be not None"

if info is not None:
assert info["workflow_uuid"] == wfUuid, f"Expected workflow_uuid to be {wfUuid}"
assert info.workflowUUID == wfUuid, f"Expected workflow_uuid to be {wfUuid}"
print(info)


Expand All @@ -207,12 +207,12 @@ def simple_workflow() -> None:

print(output[0])
assert output[0] != None, "Expected output to be not None"
wfUuid = output[0]["workflow_uuid"]
wfUuid = output[0].workflowUUID

_workflow_commands._cancel_workflow(config, wfUuid)

info = _workflow_commands._get_workflow(config, wfUuid, True)
print(info)
assert info is not None, "Expected info to be not None"
if info is not None:
assert info["status"] == "CANCELLED", f"Expected status to be CANCELLED"
assert info.status == "CANCELLED", f"Expected status to be CANCELLED"

0 comments on commit ce75483

Please sign in to comment.