Skip to content

Commit

Permalink
Schema migration CLI (#21)
Browse files Browse the repository at this point in the history
- Add a migrate command to the CLI
- Update the template to use alembic and sqlalchemy.
  • Loading branch information
qianl15 authored Aug 1, 2024
1 parent fc11d16 commit 6c2892b
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 1 deletion.
51 changes: 51 additions & 0 deletions dbos_transact/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import typer

from dbos_transact.application_database import ApplicationDatabase
from dbos_transact.dbos_config import load_config
from dbos_transact.system_database import SystemDatabase

app = typer.Typer()

Expand All @@ -25,5 +27,54 @@ def create() -> None:
pass


@app.command()
def migrate() -> None:
config = load_config()
if not config["database"]["password"]:
typer.echo(
"DBOS configuration does not contain database password, please check your config file and retry!"
)
raise typer.Exit(code=1)
app_db_name = config["database"]["app_db_name"]

typer.echo(f"Starting schema migration for database {app_db_name}")

# First, run DBOS migrations on the system database and the application database
app_db = None
sys_db = None
try:
sys_db = SystemDatabase(config)
app_db = ApplicationDatabase(config)
except Exception as e:
typer.echo(f"DBOS system schema migration failed: {e}")
finally:
if sys_db:
sys_db.destroy()
if app_db:
app_db.destroy()

# Next, run any custom migration commands specified in the configuration
try:
migrate_commands = (
config["database"]["migrate"]
if "migrate" in config["database"] and config["database"]["migrate"]
else []
)
for command in migrate_commands:
typer.echo(f"Executing migration command: {command}")
result = subprocess.run(command, shell=True, text=True)
if result.returncode != 0:
typer.echo(f"Migration command failed: {command}")
typer.echo(result.stderr)
raise typer.Exit(1)
if result.stdout:
typer.echo(result.stdout.rstrip())
except Exception as e:
typer.echo(f"An error occurred during schema migration: {e}")
raise typer.Exit(code=1)

typer.echo(f"Completed schema migration for database {app_db_name}")


if __name__ == "__main__":
app()
47 changes: 47 additions & 0 deletions templates/hello/alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[alembic]
script_location = alembic

# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

version_path_separator = os # Use os.pathsep.

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
93 changes: 93 additions & 0 deletions templates/hello/alembic/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import os
from logging.config import fileConfig

from alembic import context
from sqlalchemy import URL, engine_from_config, pool

from dbos_transact.dbos_config import load_config

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)

# Load DBOS Config and parse the database URL
dbos_config_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "dbos-config.yaml"
)

dbos_config = load_config(dbos_config_path)
db_url = URL.create(
"postgresql",
username=dbos_config["database"]["username"],
password=dbos_config["database"]["password"],
host=dbos_config["database"]["hostname"],
port=dbos_config["database"]["port"],
database=dbos_config["database"]["app_db_name"],
)
config.set_main_option("sqlalchemy.url", db_url.render_as_string(hide_password=False))

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)

with context.begin_transaction():
context.run_migrations()


def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
26 changes: 26 additions & 0 deletions templates/hello/alembic/script.py.mako
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}
36 changes: 36 additions & 0 deletions templates/hello/alembic/versions/2024_07_31_180642_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""init
Revision ID: c6b516e182b2
Revises:
Create Date: 2024-07-31 18:06:42.500040
"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "c6b516e182b2"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.execute(
sa.text(
"CREATE TABLE dbos_hello (name TEXT PRIMARY KEY, greet_count INT DEFAULT 0)"
)
)
op.execute(
sa.text(
"CREATE TABLE dbos_greetings (greeting_name TEXT, greeting_note_content TEXT)"
)
)


def downgrade() -> None:
op.execute(sa.text("DROP TABLE dbos_hello"))
op.execute(sa.text("DROP TABLE dbos_greetings"))
2 changes: 2 additions & 0 deletions templates/hello/dbos-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ database:
username: postgres
password: ${PGPASSWORD}
app_db_name: hello
migrate:
- alembic upgrade head
telemetry:
logs:
logLevel: INFO
6 changes: 5 additions & 1 deletion templates/hello/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ def example_workflow(ctx: WorkflowContext, var: str) -> str:

@dbos.transaction()
def example_transaction(ctx: TransactionContext, var: str) -> str:
rows = ctx.session.execute(sa.text("SELECT 1")).fetchall()
rows = ctx.session.execute(
sa.text(
"INSERT INTO dbos_hello (name, greet_count) VALUES ('dbos', 1) ON CONFLICT (name) DO UPDATE SET greet_count = dbos_hello.greet_count + 1 RETURNING greet_count;"
)
).all()
return var + str(rows[0][0])


Expand Down
8 changes: 8 additions & 0 deletions templates/hello/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
alembic==1.13.2
annotated-types==0.7.0
anyio==4.4.0
attrs==23.2.0
certifi==2024.7.4
click==8.1.7
dnspython==2.6.1
Expand All @@ -14,16 +16,22 @@ httptools==0.6.1
httpx==0.27.0
idna==3.7
Jinja2==3.1.4
jsonschema==4.23.0
jsonschema-specifications==2023.12.1
Mako==1.3.5
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
psycopg2-binary==2.9.9
pydantic==2.8.2
pydantic_core==2.20.1
Pygments==2.18.0
python-dotenv==1.0.1
python-multipart==0.0.9
PyYAML==6.0.1
referencing==0.35.1
rich==13.7.1
rpds-py==0.19.1
shellingham==1.5.4
sniffio==1.3.1
SQLAlchemy==2.0.31
Expand Down
3 changes: 3 additions & 0 deletions tests/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def test_package(build_wheel: str, postgres_db_engine: sa.Engine) -> None:
# Install the dbos_transact package into the virtual environment
subprocess.check_call(["pip", "install", build_wheel], env=venv)

# Run schema migration
subprocess.check_call(["dbos", "migrate"], cwd=template_path, env=venv)

# Launch the application in the virtual environment as a background process
process = subprocess.Popen(["dbos", "start"], cwd=template_path, env=venv)

Expand Down

0 comments on commit 6c2892b

Please sign in to comment.