Skip to content

Commit

Permalink
task(zymtools): implement datalayer
Browse files Browse the repository at this point in the history
  • Loading branch information
svaponi committed Jul 20, 2024
1 parent e4b0d9c commit 9aafe9e
Show file tree
Hide file tree
Showing 43 changed files with 1,433 additions and 262 deletions.
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ python = "^3.12"
fastapi = "^0.104.1"
httpx = "^0.27.0"
asyncpg = "^0.29.0"
cachetools = "^5.4.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.1.1"
Expand Down
5 changes: 3 additions & 2 deletions src/app/api/v1.py → src/app/api/api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import fastapi

from app.api import users
from app.api import users, ratecards


def setup_api_v1(app: fastapi.FastAPI):
def setup_api(app: fastapi.FastAPI):
api_router = fastapi.APIRouter()
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(ratecards.router, prefix="/ratecards", tags=["ratecards"])
app.include_router(api_router, prefix="/api/v1")
52 changes: 52 additions & 0 deletions src/app/api/ratecards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from uuid import UUID

import fastapi

from app.datalayer.ratecard import (
RateCardRecord,
RateCardRepository,
RateCardPatchPayload,
RateCardCreatePayload,
)

router = fastapi.APIRouter()


@router.get("")
async def get_all(
data_service: RateCardRepository = fastapi.Depends(),
) -> list[RateCardRecord]:
return await data_service.get_all()


@router.post("")
async def create(
payload: RateCardCreatePayload,
data_service: RateCardRepository = fastapi.Depends(),
) -> UUID:
return await data_service.create(payload)


@router.get("/{pk}")
async def get_by_id(
pk: UUID,
data_service: RateCardRepository = fastapi.Depends(),
) -> None:
return await data_service.get_by_id(pk)


@router.patch("/{pk}")
async def patch_by_id(
pk: UUID,
payload: RateCardPatchPayload,
data_service: RateCardRepository = fastapi.Depends(),
) -> None:
await data_service.patch_by_id(pk, payload)


@router.delete("/{pk}")
async def delete_by_id(
pk: UUID,
data_service: RateCardRepository = fastapi.Depends(),
) -> None:
await data_service.delete_by_id(pk)
34 changes: 4 additions & 30 deletions src/app/api/users.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
import fastapi

from app.datalayer.model import UserAccount
from app.datalayer.users import UserService
from app.datalayer.user_account import UserAccountRepository, UserAccountRecord

router = fastapi.APIRouter()


@router.get("")
async def get_users(
data_service: UserService = fastapi.Depends(),
page: int = 0,
size: int = 10,
) -> list[UserAccount]:
# Read users
users = await data_service.read_users()

# Create users
if not users:
await data_service.create_user("Alice", "alice@example.com")
await data_service.create_user("Bob", "bob@example.com")

# Read users
users = await data_service.read_users()

# # Update user
# user_id = users[0].id
# await data_service.update_user(user_id, "John")
#
# # Read users again
# await data_service.read_users()
#
# # Delete user
# await data_service.delete_user(2)

# Read users again

return users
data_service: UserAccountRepository = fastapi.Depends(),
) -> list[UserAccountRecord]:
return await data_service.get_all()
22 changes: 14 additions & 8 deletions src/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import fastapi
from starlette.responses import RedirectResponse

from app.api.v1 import setup_api_v1
from app.api.api import setup_api
from app.core.cors import setup_cors
from app.core.datasource import setup_datasource
from app.core.datasource import create_connection_pool_manager
from app.core.error_handlers import setup_error_handlers
from app.core.logging import setup_logging
from app.core.migration import run_migrations
Expand All @@ -18,14 +18,20 @@
@contextlib.asynccontextmanager
async def _lifespan(app: "App"):
app.logger.info(f"Starting ...")
await app.datasource.connect()
await run_migrations(app.datasource)

await app.cpm.get_pool()
await run_migrations(app.cpm)
# ...other startup code

app.logger.info("✅ Started")
yield

yield # this is where the app is running

app.logger.info("Shutting down ...")
await app.datasource.disconnect()

await app.cpm.close()
# ...other shutdown code

app.logger.info("🛑 Shutdown")


Expand Down Expand Up @@ -53,10 +59,10 @@ def __init__(
setup_cors(self)

# Setup API routes
setup_api_v1(self)
setup_api(self)

# Setup DB data source
self.datasource = setup_datasource(self)
self.cpm = create_connection_pool_manager(self)

if hasattr(self, "docs_url") and self.docs_url:

Expand Down
36 changes: 3 additions & 33 deletions src/app/core/datasource.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,9 @@
import logging
import os

import asyncpg
import fastapi


# Simple class that takes care of setting up and tearing down the connection pool.
# Useful to decouple for the actual connection pool object.
# It's meant to be used in FastAPI lifespan (see https://fastapi.tiangolo.com/advanced/events/#lifespan).
class DataSource:
def __init__(self, app: fastapi.FastAPI):
self._pool: asyncpg.Pool | None = None
self.logger = logging.getLogger(__name__)

async def connect(self):
if not self._pool:
postgres_url = os.getenv("POSTGRES_URL")
assert postgres_url, "missing POSTGRES_URL"
self._pool = await asyncpg.create_pool(dsn=postgres_url)
self.logger.info("DataSource created")

async def disconnect(self):
if self._pool:
await self._pool.close()
self.logger.info("DataSource closed")

@property
def pool(self) -> asyncpg.Pool:
if not self._pool:
raise RuntimeError("DataSource not initialized, you need to call `await connect()` in the lifespan event, "
"see see https://fastapi.tiangolo.com/advanced/events/#lifespan.")
return self._pool
from app.libs.datalayer.connectionpool.manager import ConnectionPoolManager


def setup_datasource(app: fastapi.FastAPI) -> DataSource:
return DataSource(app)
def create_connection_pool_manager(app: fastapi.FastAPI) -> ConnectionPoolManager:
return ConnectionPoolManager()


34 changes: 34 additions & 0 deletions src/app/core/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import fastapi
from starlette import status


class BadRequestException(fastapi.HTTPException):
def __init__(self, message: str) -> None:
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message,
)


class UnauthorizedException(fastapi.HTTPException):
def __init__(self, message: str) -> None:
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=message,
)


class ForbiddenException(fastapi.HTTPException):
def __init__(self, message: str) -> None:
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail=message,
)


class NotFoundException(fastapi.HTTPException):
def __init__(self, message: str) -> None:
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=message,
)
26 changes: 9 additions & 17 deletions src/app/core/migration.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
import os.path
import os

from app.core.datasource import DataSource


async def run_migrations(datasource: DataSource):
async with datasource.pool.acquire(timeout=2) as connection:
await connection.execute("create table if not exists migration (name text primary key, executed_at timestamp)")
records = await connection.fetch("select * from migration")
print(records)
if len(records) > 0:
print("Migration table already exists")
else:
with open(os.path.join(os.path.dirname(__file__), "../../schema.sql")) as f:
content = f.read()
print(content)
await connection.execute(content)
await connection.execute("insert into migration (name, executed_at) values ('migration.sql', now())")
from app.dirs import resources_dir
from app.libs.datalayer.connectionpool.manager import ConnectionPoolManager
from app.libs.migrationtool import MigrationTool


async def run_migrations(provider: ConnectionPoolManager):
pool = await provider.get_pool()
migration_tool = MigrationTool(pool)
migrations_dir = os.path.join(resources_dir, "migrations")
await migration_tool.migrate(migrations_dir)
19 changes: 0 additions & 19 deletions src/app/datalayer/db.py

This file was deleted.

Loading

0 comments on commit 9aafe9e

Please sign in to comment.