-
Currently the FastAPI example provided looks like this:
"""
Example demonstrating use with the FastAPI web framework.
Requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg fastapi uvicorn
To run: uvicorn asgi_fastapi:app
It should print a line on the console on a one-second interval while running a
basic web app at http://localhost:8000.
"""
from __future__ import annotations
from datetime import datetime
from fastapi import FastAPI
from fastapi.middleware import Middleware
from fastapi.requests import Request
from fastapi.responses import PlainTextResponse, Response
from sqlalchemy.ext.asyncio import create_async_engine
from starlette.types import ASGIApp, Receive, Scope, Send
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
def tick() -> None:
    """Print a greeting followed by the current local date and time.

    This is the callable that gets registered with the scheduler on a
    one-second interval.
    """
    print("Hello, the time is", datetime.now())
class SchedulerMiddleware:
    """ASGI middleware that runs the scheduler during the app's lifespan.

    For the ASGI ``lifespan`` scope the scheduler is entered as an async
    context manager, the ``tick`` schedule is added, and the scheduler is
    started in the background before the wrapped application is called.
    Every other scope type is forwarded to the wrapped application as is.
    """

    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        """Store the wrapped ASGI application and the scheduler.

        :param app: the ASGI application this middleware wraps
        :param scheduler: the scheduler to run during the lifespan scope
        """
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Dispatch one ASGI scope to the wrapped application.

        :param scope: the ASGI connection scope; only its ``"type"`` is read
        :param receive: the ASGI receive callable, passed through unchanged
        :param send: the ASGI send callable, passed through unchanged
        """
        if scope["type"] == "lifespan":
            # The wrapped app is awaited inside the ``async with`` block, so
            # the scheduler context is only exited after that call returns.
            async with self.scheduler:
                await self.scheduler.add_schedule(
                    tick, IntervalTrigger(seconds=1), id="tick"
                )
                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)
async def root(request: Request) -> Response:
    """Serve the index route with a fixed plain-text greeting.

    :param request: the incoming request (not used)
    :return: a plain-text response with the body ``Hello, world!``
    """
    return PlainTextResponse("Hello, world!")
# Async SQLAlchemy engine for the local "testdb" PostgreSQL database (asyncpg).
engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
# The data store and the event broker are both built on the same engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
# Pass the scheduler to SchedulerMiddleware as a keyword argument.
middleware = [Middleware(SchedulerMiddleware, scheduler=scheduler)]
app = FastAPI(middleware=middleware)
app.add_api_route("/", root)

There is probably a good reason why
from __future__ import annotations
from datetime import datetime
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import PlainTextResponse, Response
from sqlalchemy.ext.asyncio import create_async_engine
def tick() -> None:
    """Print a greeting followed by the current local date and time.

    This is the callable that gets registered with the scheduler on a
    one-second interval.
    """
    print("Hello, the time is", datetime.now())
async def root(request: Request) -> Response:
    """Serve the index route with a fixed plain-text greeting.

    :param request: the incoming request (not used)
    :return: a plain-text response with the body ``Hello, world!``
    """
    return PlainTextResponse("Hello, world!")
# Async SQLAlchemy engine for the local "testdb" PostgreSQL database (asyncpg).
engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
# The data store and the event broker are both built on the same engine.
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
# Plain FastAPI app with no middleware; the index route is added directly.
app = FastAPI()
app.add_api_route("/", root)
@app.on_event("startup")
async def startup_event() -> None:
    """Add the one-second ``tick`` schedule and start the scheduler."""
    # NOTE(review): the scheduler is never entered as an async context manager
    # here. The APScheduler documentation is cited as requiring that in order
    # to manage its life cycle and initialize the data store and event broker
    # -- confirm before relying on this variant.
    await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
    await scheduler.start_in_background()
@app.on_event("shutdown")
async def shutdown_event():
await scheduler.stop() Let me know if both solutions are valid and if one is inferior to the other. |
Beta Was this translation helpful? Give feedback.
Answered by
agronholm
Oct 10, 2023
Replies: 1 comment 4 replies
-
As the documentation states, the async scheduler must be used as a context manager to manage its life cycle and initialize the data store and event broker. |
Beta Was this translation helpful? Give feedback.
4 replies
Answer selected by
NixBiks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As the documentation states, the async scheduler must be used as a context manager to manage its life cycle and initialize the data store and event broker.