Commit
progress, working
guifry committed Feb 7, 2025
1 parent 41f9801 commit c58ca19
Showing 5 changed files with 92 additions and 71 deletions.
4 changes: 3 additions & 1 deletion apps/consent-api/app.py
@@ -6,14 +6,16 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from src.config import START_DELAY_SECONDS

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Application starting up...")
# Give Cloud SQL a moment to accept connections
await asyncio.sleep(5)
logger.info(f"Starting up server with delay {START_DELAY_SECONDS} seconds")
await asyncio.sleep(START_DELAY_SECONDS)
yield

app = FastAPI(lifespan=lifespan)
5 changes: 5 additions & 0 deletions apps/consent-api/src/config.py
@@ -16,6 +16,11 @@
BIGQUERY_DATASET_ID = os.getenv("BIGQUERY_DATASET_ID", f"consent_audit_logs_{ENV}")
BIGQUERY_TABLE_ID = os.getenv("BIGQUERY_TABLE_ID", "consent_events")

START_DELAY_SECONDS = {
"production": 5,
"staging": 2,
"development": 2}[ENV]

# Add other application-wide configuration settings here
# For example:
# DEBUG = os.getenv("DEBUG", "false").lower() == "true"
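
A note on the new START_DELAY_SECONDS lookup: indexing the dict with [ENV] raises a KeyError at import time for any environment name not in the mapping. A more forgiving variant is sketched below; it is not part of this commit, the os.getenv default for ENV is assumed (its real definition sits outside this hunk), and the 2-second fallback is an illustrative choice rather than a value from the codebase.

    import os

    ENV = os.getenv("ENV", "development")

    # Hypothetical fallback: any unknown environment reuses the development delay (2s).
    START_DELAY_SECONDS = {
        "production": 5,
        "staging": 2,
        "development": 2,
    }.get(ENV, 2)

The lifespan hook in app.py still just awaits asyncio.sleep(START_DELAY_SECONDS), so a fallback like this only changes how unknown environments are handled, not the startup path itself.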
100 changes: 59 additions & 41 deletions apps/consent-api/src/db_utils.py
@@ -3,7 +3,8 @@
from asyncpg.exceptions import TooManyConnectionsError
import logging
import math
from typing import AsyncGenerator
from typing import AsyncGenerator, Dict, TypedDict
from dataclasses import dataclass

from essentials.json import FriendlyEncoder
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
@@ -12,82 +13,99 @@

logger = logging.getLogger(__name__)

# Connection pool settings optimized for Cloud SQL
# Calculate pool size based on CPU cores (Cloud SQL typically provides 2 connections per vCPU)
CPU_COUNT = 8 # Assuming 8 vCPU instance, adjust based on your Cloud SQL tier
POOL_SIZE = max(CPU_COUNT * 2, 20) # Minimum of 20 connections
MAX_OVERFLOW = math.ceil(POOL_SIZE * 0.5) # Allow 50% overflow
@dataclass
class DBConfig:
"""Database connection configuration per environment"""
pool_size: int
max_overflow: int
pool_timeout: int = 10
pool_recycle: int = 1800
statement_timeout: int = 30000
idle_timeout: int = 30000

# Connection health monitoring
HEALTH_CHECK_INTERVAL = 30 # seconds
MAX_RETRIES = 5
# Environment-specific database configurations
DB_CONFIGS = {
# Production: 8 CPU cores, 16GB RAM
"production": DBConfig(
pool_size=50, # Base pool size for high load
max_overflow=25, # 50% overflow for spikes
),
# Staging/Dev: Small instance
"staging": DBConfig(
pool_size=10, # Smaller pool for reduced resources
max_overflow=5, # Limited overflow
),
"development": DBConfig(
pool_size=10, # Same as staging
max_overflow=5,
)
}

current_env = config.ENV.lower()
db_config = DB_CONFIGS.get(current_env, DB_CONFIGS["development"])

logger.info(f"Using database configuration for environment: {current_env}")
logger.info(f"Pool size: {db_config.pool_size}, Max overflow: {db_config.max_overflow}")

# Connection health settings
MAX_RETRIES = 3
BASE_RETRY_DELAY = 0.1 # seconds

engine = create_async_engine(
config.SQLALCHEMY_DATABASE_URI,
json_serializer=FriendlyEncoder().encode,
# Optimized connection pool settings
pool_size=POOL_SIZE,
max_overflow=MAX_OVERFLOW,
pool_timeout=5, # Reduced timeout for faster failure
pool_pre_ping=True, # Keep connection health check
pool_recycle=1800, # Recycle connections every 30 minutes
# Connection pool settings from environment config
pool_size=db_config.pool_size,
max_overflow=db_config.max_overflow,
pool_timeout=db_config.pool_timeout,
pool_pre_ping=True, # Verify connection health
pool_recycle=db_config.pool_recycle,
echo=config.ENV != "production",
# Optimized connection parameters
# Performance optimizations
connect_args={
"server_settings": {
"application_name": "consent-api",
"statement_timeout": "10000", # Reduced to 10s for startup
"idle_in_transaction_session_timeout": "10000",
"tcp_keepalives_idle": "60", # TCP Keepalive idle time
"tcp_keepalives_interval": "10", # TCP Keepalive interval
"tcp_keepalives_count": "3" # TCP Keepalive count
},
"command_timeout": 10 # 10 second timeout for startup
"application_name": f"consent-api-{current_env}",
"statement_timeout": str(db_config.statement_timeout),
"idle_in_transaction_session_timeout": str(db_config.idle_timeout),
"tcp_keepalives_idle": "60",
"tcp_keepalives_interval": "10",
"tcp_keepalives_count": "3"
}
}
)

# Session factory optimized for high throughput
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
expire_on_commit=False, # Don't expire objects after commit (better performance)
autoflush=False # Don't auto-flush (better performance)
)

async def exponential_backoff(attempt: int) -> float:
"""Calculate exponential backoff with jitter."""
delay = min(BASE_RETRY_DELAY * (2 ** attempt), 2.0) # Cap at 2 seconds
jitter = delay * 0.1 # 10% jitter
return delay + (asyncio.random() * jitter)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""
Get a database session with optimized connection handling and retry logic.
Implements connection retry with exponential backoff and jitter.
Get a database session from the connection pool.
Implements connection retry with backoff for high-load scenarios.
"""
for attempt in range(MAX_RETRIES):
try:
async with async_session() as session:
try:
yield session
except Exception as e:
logger.error(f"Database error: {str(e)}", exc_info=True)
logger.error(f"Database error: {str(e)}")
await session.rollback()
raise
finally:
await session.close()
return # Success, exit retry loop
except TooManyConnectionsError as e:
if attempt == MAX_RETRIES - 1:
logger.error(f"Failed to get database connection after {MAX_RETRIES} attempts", exc_info=True)
logger.error(f"Failed to get database connection after {MAX_RETRIES} attempts")
raise
delay = await exponential_backoff(attempt)
logger.warning(f"Connection pool exhausted (attempt {attempt + 1}/{MAX_RETRIES}), retrying in {delay:.2f}s")
delay = BASE_RETRY_DELAY * (2 ** attempt) # Exponential backoff
logger.warning(f"Connection pool exhausted (attempt {attempt + 1}/{MAX_RETRIES}), retrying in {delay}s")
await asyncio.sleep(delay)
except Exception as e:
logger.error(f"Unexpected database error: {str(e)}", exc_info=True)
raise

# Async context manager for database sessions
db_context = asynccontextmanager(get_session)
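
One observation on the retry path: the removed exponential_backoff helper called asyncio.random(), which is not an asyncio API, and the replacement drops jitter altogether in favour of a bare BASE_RETRY_DELAY * (2 ** attempt). If jitter is still wanted, a minimal working sketch using the standard-library random module (keeping the same 0.1s base, 2s cap, and 10% jitter as the removed code) could look like the following; it is an illustration, not part of the commit.

    import random

    BASE_RETRY_DELAY = 0.1  # seconds, matches the module constant above

    def backoff_with_jitter(attempt: int) -> float:
        """Exponential backoff capped at 2 seconds, plus up to 10% random jitter."""
        delay = min(BASE_RETRY_DELAY * (2 ** attempt), 2.0)
        return delay + random.uniform(0.0, delay * 0.1)

Note the helper no longer needs to be a coroutine, since it never awaits anything; get_session would call it directly and then await asyncio.sleep(delay).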
45 changes: 20 additions & 25 deletions apps/consent-api/src/health.py
@@ -11,29 +11,25 @@

router = APIRouter()

async def check_db_connection(retries=3, delay=1):
async def check_db_connection(retries=3, delay=2):
"""Check database connection with retries"""
for attempt in range(retries):
try:
async with db_context() as db:
await db.execute(text("SELECT 1").execution_options(
skip_transaction=True,
statement_timeout=5000 # 5s timeout for health check
))
# Simple SELECT 1 query without transaction
await db.execute(text("SELECT 1").execution_options(skip_transaction=True))
return True
except Exception as e:
if attempt == retries - 1:
logger.error(f"Database connection failed after {retries} attempts: {str(e)}")
return False
logger.warning(f"Database connection attempt {attempt + 1} failed, retrying in {delay}s")
await asyncio.sleep(delay)
return False

@router.get("/health")
async def health_check():
"""
Health check endpoint for Cloud Run.
More resilient during startup with retries.
"""
"""Health check endpoint for Cloud Run."""
health_status = {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
@@ -49,20 +45,19 @@ async def health_check():
"status": "healthy",
"message": "connected"
}
else:
health_status.update({
"status": "unhealthy",
"components": {
"database": {
"status": "unhealthy",
"message": "failed to connect"
}
}
})
return Response(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content=json.dumps(health_status),
media_type="application/json"
)
return health_status

return health_status
health_status.update({
"status": "unhealthy",
"components": {
"database": {
"status": "unhealthy",
"message": "failed to connect"
}
}
})
return Response(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content=json.dumps(health_status),
media_type="application/json"
)
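
The reshaped handler now returns the status dict directly on the healthy path and builds a 503 Response only when the database check fails. A quick test sketch of the unhealthy branch is shown below; it assumes the FastAPI app from apps/consent-api/app.py is importable as app with this router mounted, and that the health module is importable as src.health — both assumptions, since the wiring sits outside this diff (TestClient also needs httpx installed).

    from unittest.mock import AsyncMock, patch

    from fastapi.testclient import TestClient

    from app import app  # assumed import path for apps/consent-api/app.py

    def test_health_returns_503_when_db_is_down():
        # Force the database check to fail so the handler takes the unhealthy branch.
        with patch("src.health.check_db_connection", new=AsyncMock(return_value=False)):
            client = TestClient(app)
            response = client.get("/health")
        assert response.status_code == 503
        assert response.json()["components"]["database"]["status"] == "unhealthy"

Instantiating TestClient outside a with block skips the lifespan hook, so the test does not pay the startup sleep added in app.py.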
9 changes: 5 additions & 4 deletions infra/modules/consent-api/cloud_run.tf
@@ -20,8 +20,9 @@ resource "google_cloud_run_service" "this" {
"autoscaling.knative.dev/maxScale" = tostring(var.max_instances)
"autoscaling.knative.dev/minScale" = tostring(var.min_instances)
"run.googleapis.com/cloudsql-instances" = google_sql_database_instance.db_instance.connection_name
"run.googleapis.com/startup-probe-period" = "5" # Check every 5 seconds
"run.googleapis.com/startup-probe-delay" = "2" # Wait 2 seconds before first check
"run.googleapis.com/startup-probe-period" = "15" # Check every 10 seconds
"run.googleapis.com/startup-probe-delay" = "5" # Wait 5 seconds before first check
"run.googleapis.com/startup-probe-failure-threshold" = "3" # Allow 3 failures
}
}
spec {
@@ -70,8 +71,8 @@ resource "google_cloud_run_service" "this" {
http_get {
path = "/health" # You'll need to add this endpoint
}
initial_delay_seconds = 2
period_seconds = 5
initial_delay_seconds = 5
period_seconds = 10
failure_threshold = 3
timeout_seconds = 3
}
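
As a rough cross-check of the new probe timings against the application-side delay: the window Cloud Run allows before failing a container is approximately the initial delay plus period × failure threshold. The arithmetic below simply restates values from this diff (the startup_probe block, and the production delay from src/config.py); it is a sanity check, not code from the repository.

    # Startup budget implied by the probe block above.
    initial_delay_seconds = 5
    period_seconds = 10
    failure_threshold = 3
    probe_window = initial_delay_seconds + period_seconds * failure_threshold  # 35 seconds

    # Application-side startup delay, worst case (production) from src/config.py.
    start_delay_seconds = 5

    assert start_delay_seconds < probe_window  # the 5s sleep fits well inside the ~35s window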