diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index ba51476..43d938b 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -62,6 +62,7 @@ class WorkflowStatusInternal(TypedDict): name: str class_name: Optional[str] config_name: Optional[str] + input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle) output: Optional[str] # JSON (jsonpickle) error: Optional[str] # JSON (jsonpickle) executor_id: Optional[str] diff --git a/dbos/_workflow_commands.py b/dbos/_workflow_commands.py index de1ef9a..435b3d3 100644 --- a/dbos/_workflow_commands.py +++ b/dbos/_workflow_commands.py @@ -5,7 +5,7 @@ from dbos import DBOS -from . import load_config +from . import _serialization, load_config from ._dbos_config import ConfigFile, _is_valid_app_name from ._sys_db import ( GetWorkflowsInput, @@ -17,6 +17,26 @@ ) +class WorkflowInformation: + workflowUUID: str + status: WorkflowStatuses + name: str + class_name: Optional[str] + config_name: Optional[str] + input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle) + output: Optional[str] # JSON (jsonpickle) + error: Optional[str] # JSON (jsonpickle) + executor_id: Optional[str] + app_version: Optional[str] + app_id: Optional[str] + request: Optional[str] # JSON (jsonpickle) + recovery_attempts: Optional[int] + authenticated_user: Optional[str] + assumed_role: Optional[str] + authenticated_roles: Optional[str] # JSON list of roles. + queue_name: Optional[str] + + def _list_workflows( config: ConfigFile, li: int, @@ -26,10 +46,9 @@ def _list_workflows( status: Optional[str], request: bool, appversion: Optional[str], -) -> List[WorkflowStatusInternal]: +) -> List[WorkflowInformation]: sys_db = None - try: sys_db = SystemDatabase(config) except Exception as e: @@ -50,7 +69,7 @@ def _list_workflows( output: GetWorkflowsOutput = sys_db.get_workflows(input) - infos: List[WorkflowStatusInternal] = [] + infos: List[WorkflowInformation] = [] if output.workflow_uuids is None: typer.echo("No workflows found") @@ -68,8 +87,7 @@ def _list_workflows( def _get_workflow( config: ConfigFile, uuid: str, request: bool -) -> Optional[WorkflowStatusInternal]: - print(f"Getting workflow info for {uuid}") +) -> Optional[WorkflowInformation]: sys_db = None try: @@ -109,28 +127,43 @@ def _reattempt_workflow(uuid: str, startNewWorkflow: bool) -> None: def _get_workflow_info( sys_db: SystemDatabase, workflowUUID: str, getRequest: bool -) -> Optional[WorkflowStatusInternal]: +) -> Optional[WorkflowInformation]: info = sys_db.get_workflow_status(workflowUUID) if info is None: return None - info["workflow_uuid"] = workflowUUID + winfo = WorkflowInformation() + print("creating winfo") + + winfo.workflowUUID = workflowUUID + winfo.status = info["status"] + winfo.name = info["name"] + winfo.class_name = info["class_name"] + winfo.config_name = info["config_name"] + winfo.executor_id = info["executor_id"] + winfo.app_version = info["app_version"] + winfo.app_id = info["app_id"] + winfo.recovery_attempts = info["recovery_attempts"] + winfo.authenticated_user = info["authenticated_user"] + winfo.assumed_role = info["assumed_role"] + winfo.authenticated_roles = info["authenticated_roles"] + winfo.queue_name = info["queue_name"] # no input field input_data = sys_db.get_workflow_inputs(workflowUUID) if input_data is not None: - info["input"] = input_data + winfo.input = input_data if info.get("status") == "SUCCESS": result = sys_db.await_workflow_result(workflowUUID) - info["output"] = result + winfo.output = result elif info.get("status") == "ERROR": try: sys_db.await_workflow_result(workflowUUID) except Exception as e: - info["error"] = str(e) + winfo.error = str(e) if not getRequest: - info["request"] = None + winfo.request = None - return info + return winfo diff --git a/dbos/cli.py b/dbos/cli.py index bda7bb1..bc39b8a 100644 --- a/dbos/cli.py +++ b/dbos/cli.py @@ -17,7 +17,7 @@ from dbos._schemas.system_database import SystemSchema -from . import load_config +from . import _serialization, load_config from ._app_db import ApplicationDatabase from ._dbos_config import _is_valid_app_name from ._sys_db import SystemDatabase @@ -397,7 +397,7 @@ def list( workflows = _list_workflows( config, limit, user, starttime, endtime, status, request, appversion ) - print(workflows) + print(_serialization.serialize(workflows)) @workflow.command(help="Retrieve the status of a workflow") @@ -413,7 +413,7 @@ def get( ] = True, ) -> None: config = load_config() - print(_get_workflow(config, uuid, request)) + print(_serialization.serialize(_get_workflow(config, uuid, request))) @workflow.command( diff --git a/tests/test_workflow_cmds.py b/tests/test_workflow_cmds.py index b7bcd51..322c1ec 100644 --- a/tests/test_workflow_cmds.py +++ b/tests/test_workflow_cmds.py @@ -37,7 +37,7 @@ def simple_workflow() -> None: assert len(output) == 1, f"Expected list length to be 1, but got {len(output)}" assert output[0] != None, "Expected output to be not None" if output[0] != None: - assert output[0]["workflow_uuid"].strip(), "field_name is an empty string" + assert output[0].workflowUUID.strip(), "field_name is an empty string" def test_list_workflow_limit(dbos: DBOS, config: ConfigFile) -> None: @@ -178,13 +178,13 @@ def simple_workflow() -> None: assert output[0] != None, "Expected output to be not None" - wfUuid = output[0]["workflow_uuid"] + wfUuid = output[0].workflowUUID info = _workflow_commands._get_workflow(config, wfUuid, True) assert info is not None, "Expected output to be not None" if info is not None: - assert info["workflow_uuid"] == wfUuid, f"Expected workflow_uuid to be {wfUuid}" + assert info.workflowUUID == wfUuid, f"Expected workflow_uuid to be {wfUuid}" print(info) @@ -207,7 +207,7 @@ def simple_workflow() -> None: print(output[0]) assert output[0] != None, "Expected output to be not None" - wfUuid = output[0]["workflow_uuid"] + wfUuid = output[0].workflowUUID _workflow_commands._cancel_workflow(config, wfUuid) @@ -215,4 +215,4 @@ def simple_workflow() -> None: print(info) assert info is not None, "Expected info to be not None" if info is not None: - assert info["status"] == "CANCELLED", f"Expected status to be CANCELLED" + assert info.status == "CANCELLED", f"Expected status to be CANCELLED"