main
FastAPI server entrypoint.
"""FastAPI server entrypoint.

Builds the ``app`` instance, wires the startup/shutdown lifespan handler,
registers every API router, and starts uvicorn when run as a script.
"""

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from importlib.metadata import version

from fastapi import APIRouter, Depends, FastAPI
from internal.auth.security import log_request
from internal.block.management import block_management
from internal.database.manager import database_manager
from internal.logger.logger import logger
from internal.settings.config import badges_config
from internal.settings.env import host_settings
from routers.admins import router as admin_router
from routers.allergens import router as allergens_router
from routers.badges import router as badges_router
from routers.bundles import router as bundle_router
from routers.categories import router as categories_router
from routers.consumers import router as consumers_router
from routers.leaderboard import router as leaderboard_router
from routers.reports import router as reports_router
from routers.sellers import router as sellers_router
from routers.sessions import router as sessions_router
from routers.users import router as users_router
from uvicorn import run


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Manage application startup and shutdown.

    Startup initialises the badges configuration, the database engine and
    block storage, in that order, then yields while the application runs.
    The database engine is cleaned up on the way out, including when block
    storage initialisation fails or an exception/cancellation is raised at
    the ``yield``.

    Args:
        _: The application instance; unused.

    Yields:
        None, once startup has completed.
    """
    badges_config.initialise()
    logger.info("Initialising database engine")
    await database_manager.initialise()
    logger.info("Database ready")
    # The database engine has been initialised at this point, so every exit
    # path below (normal shutdown, startup failure, cancellation) must run
    # the cleanup.
    try:
        logger.info("Initialising block storage")
        block_management.initialise()
        logger.info("Block storage ready")

        yield
    finally:
        logger.info("Cleaning database engine")
        await database_manager.cleanup()


app = FastAPI(
    title=host_settings.name,
    version=version("rescue-marketplace"),
    root_path="/api",
    lifespan=lifespan,
)


def register_routers(app: FastAPI) -> None:
    """Register the API routers with the app.

    Every router is included with a ``log_request`` dependency.

    Args:
        app: The application to attach the routers to.
    """
    routers: list[APIRouter] = [
        consumers_router,
        sellers_router,
        bundle_router,
        sessions_router,
        users_router,
        allergens_router,
        categories_router,
        badges_router,
        admin_router,
        reports_router,
        leaderboard_router,
    ]
    for router in routers:
        app.include_router(router, dependencies=[Depends(log_request)])


# Routers are attached at import time so that ``app`` is fully configured
# whether it is served by the ``__main__`` guard below or imported elsewhere.
register_routers(app)

if __name__ == "__main__":
    run(
        app,
        host=host_settings.host,
        forwarded_allow_ips=host_settings.forward_from,
        port=host_settings.port,
        log_level="info",
        loop="uvloop",
    )
@asynccontextmanager
async def
lifespan(_: fastapi.applications.FastAPI) -> AsyncIterator[None]:
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Manage application startup and shutdown.

    Startup initialises the badges configuration, the database engine and
    block storage, in that order, then yields while the application runs.
    The database engine is cleaned up on the way out, including when block
    storage initialisation fails or an exception/cancellation is raised at
    the ``yield``.

    Args:
        _: The application instance; unused.

    Yields:
        None, once startup has completed.
    """
    badges_config.initialise()
    logger.info("Initialising database engine")
    await database_manager.initialise()
    logger.info("Database ready")
    # The database engine has been initialised at this point, so every exit
    # path below (normal shutdown, startup failure, cancellation) must run
    # the cleanup.
    try:
        logger.info("Initialising block storage")
        block_management.initialise()
        logger.info("Block storage ready")

        yield
    finally:
        logger.info("Cleaning database engine")
        await database_manager.cleanup()
Manages startup and shutdown.
app =
<fastapi.applications.FastAPI object>
def
register_routers(app: fastapi.applications.FastAPI) -> None:
def register_routers(app: FastAPI) -> None:
    """Attach every API router to ``app``.

    Routers are included in the order listed below, each with its own
    ``log_request`` dependency.

    Args:
        app: The application that receives the routers.
    """
    api_routers: tuple[APIRouter, ...] = (
        consumers_router,
        sellers_router,
        bundle_router,
        sessions_router,
        users_router,
        allergens_router,
        categories_router,
        badges_router,
        admin_router,
        reports_router,
        leaderboard_router,
    )
    for api_router in api_routers:
        # A fresh Depends per router, matching one dependency list per include.
        request_logging = Depends(log_request)
        app.include_router(api_router, dependencies=[request_logging])
Registers API routers with the app.