Skip to content

Commit

Permalink
Workflow cmds (#180)
Browse files Browse the repository at this point in the history
dbos workflow list
dbos workflow get
dbos workflow cancel
  • Loading branch information
manojdbos authored Jan 22, 2025
1 parent dc5c4c6 commit a529b4c
Show file tree
Hide file tree
Showing 4 changed files with 491 additions and 4 deletions.
6 changes: 3 additions & 3 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def set_workflow_status(
stmt = (
sa.update(SystemSchema.workflow_status)
.where(
SystemSchema.workflow_inputs.c.workflow_uuid == workflow_uuid
SystemSchema.workflow_status.c.workflow_uuid == workflow_uuid
)
.values(recovery_attempts=reset_recovery_attempts)
)
Expand Down Expand Up @@ -582,12 +582,12 @@ def get_workflows(self, input: GetWorkflowsInput) -> GetWorkflowsOutput:
if input.start_time:
query = query.where(
SystemSchema.workflow_status.c.created_at
>= datetime.datetime.fromisoformat(input.start_time).timestamp()
>= datetime.datetime.fromisoformat(input.start_time).timestamp() * 1000
)
if input.end_time:
query = query.where(
SystemSchema.workflow_status.c.created_at
<= datetime.datetime.fromisoformat(input.end_time).timestamp()
<= datetime.datetime.fromisoformat(input.end_time).timestamp() * 1000
)
if input.status:
query = query.where(SystemSchema.workflow_status.c.status == input.status)
Expand Down
172 changes: 172 additions & 0 deletions dbos/_workflow_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from typing import Any, List, Optional, cast

import typer
from rich import print

from dbos import DBOS

from . import _serialization, load_config
from ._dbos_config import ConfigFile, _is_valid_app_name
from ._sys_db import (
GetWorkflowsInput,
GetWorkflowsOutput,
SystemDatabase,
WorkflowStatuses,
WorkflowStatusInternal,
WorkflowStatusString,
)


class WorkflowInformation:
workflowUUID: str
status: WorkflowStatuses
workflowName: str
workflowClassName: Optional[str]
workflowConfigName: Optional[str]
input: Optional[_serialization.WorkflowInputs] # JSON (jsonpickle)
output: Optional[str] # JSON (jsonpickle)
error: Optional[str] # JSON (jsonpickle)
executor_id: Optional[str]
app_version: Optional[str]
app_id: Optional[str]
request: Optional[str] # JSON (jsonpickle)
recovery_attempts: Optional[int]
authenticated_user: Optional[str]
assumed_role: Optional[str]
authenticated_roles: Optional[str] # JSON list of roles.
queue_name: Optional[str]


def _list_workflows(
config: ConfigFile,
li: int,
user: Optional[str],
starttime: Optional[str],
endtime: Optional[str],
status: Optional[str],
request: bool,
appversion: Optional[str],
) -> List[WorkflowInformation]:

sys_db = None

try:
sys_db = SystemDatabase(config)

input = GetWorkflowsInput()
input.authenticated_user = user
input.start_time = starttime
input.end_time = endtime
if status is not None:
input.status = cast(WorkflowStatuses, status)
input.application_version = appversion
input.limit = li

output: GetWorkflowsOutput = sys_db.get_workflows(input)

infos: List[WorkflowInformation] = []

if output.workflow_uuids is None:
typer.echo("No workflows found")
return {}

for workflow_id in output.workflow_uuids:
info = _get_workflow_info(
sys_db, workflow_id, request
) # Call the method for each ID

if info is not None:
infos.append(info)

return infos
except Exception as e:
typer.echo(f"Error listing workflows: {e}")
return []
finally:
if sys_db:
sys_db.destroy()


def _get_workflow(
config: ConfigFile, uuid: str, request: bool
) -> Optional[WorkflowInformation]:
sys_db = None

try:
sys_db = SystemDatabase(config)

info = _get_workflow_info(sys_db, uuid, request)
return info

except Exception as e:
typer.echo(f"Error getting workflow: {e}")
return None
finally:
if sys_db:
sys_db.destroy()


def _cancel_workflow(config: ConfigFile, uuid: str) -> None:
# config = load_config()
sys_db = None

try:
sys_db = SystemDatabase(config)
sys_db.set_workflow_status(uuid, WorkflowStatusString.CANCELLED, False)
return

except Exception as e:
typer.echo(f"Failed to connect to DBOS system database: {e}")
return None
finally:
if sys_db:
sys_db.destroy()


def _reattempt_workflow(uuid: str, startNewWorkflow: bool) -> None:
print(f"Reattempt workflow info for {uuid} not implemented")
return


def _get_workflow_info(
sys_db: SystemDatabase, workflowUUID: str, getRequest: bool
) -> Optional[WorkflowInformation]:

info = sys_db.get_workflow_status(workflowUUID)
if info is None:
return None

winfo = WorkflowInformation()

winfo.workflowUUID = workflowUUID
winfo.status = info["status"]
winfo.workflowName = info["name"]
winfo.workflowClassName = info["class_name"]
winfo.workflowConfigName = info["config_name"]
winfo.executor_id = info["executor_id"]
winfo.app_version = info["app_version"]
winfo.app_id = info["app_id"]
winfo.recovery_attempts = info["recovery_attempts"]
winfo.authenticated_user = info["authenticated_user"]
winfo.assumed_role = info["assumed_role"]
winfo.authenticated_roles = info["authenticated_roles"]
winfo.queue_name = info["queue_name"]

# no input field
input_data = sys_db.get_workflow_inputs(workflowUUID)
if input_data is not None:
winfo.input = input_data

if info.get("status") == "SUCCESS":
result = sys_db.await_workflow_result(workflowUUID)
winfo.output = result
elif info.get("status") == "ERROR":
try:
sys_db.await_workflow_result(workflowUUID)
except Exception as e:
winfo.error = str(e)

if not getRequest:
winfo.request = None

return winfo
101 changes: 100 additions & 1 deletion dbos/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from os import path
from typing import Any

import jsonpickle # type: ignore
import sqlalchemy as sa
import tomlkit
import typer
Expand All @@ -17,12 +18,21 @@

from dbos._schemas.system_database import SystemSchema

from . import load_config
from . import _serialization, load_config
from ._app_db import ApplicationDatabase
from ._dbos_config import _is_valid_app_name
from ._sys_db import SystemDatabase
from ._workflow_commands import (
_cancel_workflow,
_get_workflow,
_list_workflows,
_reattempt_workflow,
)

app = typer.Typer()
workflow = typer.Typer()

app.add_typer(workflow, name="workflow", help="Manage DBOS workflows")


def _on_windows() -> bool:
Expand Down Expand Up @@ -333,5 +343,94 @@ def reset(
sys_db.destroy()


@workflow.command(help="List workflows for your application")
def list(
limit: Annotated[
int,
typer.Option("--limit", "-l", help="Limit the results returned"),
] = 10,
user: Annotated[
typing.Optional[str],
typer.Option("--user", "-u", help="Retrieve workflows run by this user"),
] = None,
starttime: Annotated[
typing.Optional[str],
typer.Option(
"--start-time",
"-s",
help="Retrieve workflows starting after this timestamp (ISO 8601 format)",
),
] = None,
endtime: Annotated[
typing.Optional[str],
typer.Option(
"--end-time",
"-e",
help="Retrieve workflows starting before this timestamp (ISO 8601 format)",
),
] = None,
status: Annotated[
typing.Optional[str],
typer.Option(
"--status",
"-S",
help="Retrieve workflows with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, ENQUEUED, or CANCELLED)",
),
] = None,
appversion: Annotated[
typing.Optional[str],
typer.Option(
"--application-version",
"-v",
help="Retrieve workflows with this application version",
),
] = None,
request: Annotated[
bool,
typer.Option("--request", help="Retrieve workflow request information"),
] = True,
appdir: Annotated[
typing.Optional[str],
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
] = None,
) -> None:
config = load_config()
workflows = _list_workflows(
config, limit, user, starttime, endtime, status, request, appversion
)
print(jsonpickle.encode(workflows, unpicklable=False))


@workflow.command(help="Retrieve the status of a workflow")
def get(
uuid: Annotated[str, typer.Argument()],
appdir: Annotated[
typing.Optional[str],
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
] = None,
request: Annotated[
bool,
typer.Option("--request", help="Retrieve workflow request information"),
] = True,
) -> None:
config = load_config()
print(jsonpickle.encode(_get_workflow(config, uuid, request), unpicklable=False))


@workflow.command(
help="Cancel a workflow so it is no longer automatically retried or restarted"
)
def cancel(
uuid: Annotated[str, typer.Argument()],
appdir: Annotated[
typing.Optional[str],
typer.Option("--app-dir", "-d", help="Specify the application root directory"),
] = None,
) -> None:
config = load_config()
_cancel_workflow(config, uuid)
print(f"Workflow {uuid} has been cancelled")


if __name__ == "__main__":
app()
Loading

0 comments on commit a529b4c

Please sign in to comment.