main

FastAPI server entrypoint.

 1"""Fastapi server entrypoint."""
 2
 3from collections.abc import AsyncIterator
 4from contextlib import asynccontextmanager
 5from importlib.metadata import version
 6
 7from fastapi import APIRouter, Depends, FastAPI
 8from internal.auth.security import log_request
 9from internal.block.management import block_management
10from internal.database.manager import database_manager
11from internal.logger.logger import logger
12from internal.settings.config import badges_config
13from internal.settings.env import host_settings
14from routers.admins import router as admin_router
15from routers.allergens import router as allergens_router
16from routers.badges import router as badges_router
17from routers.bundles import router as bundle_router
18from routers.categories import router as categories_router
19from routers.consumers import router as consumers_router
20from routers.leaderboard import router as leaderboard_router
21from routers.reports import router as reports_router
22from routers.sellers import router as sellers_router
23from routers.sessions import router as sessions_router
24from routers.users import router as users_router
25from uvicorn import run
26
27
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Manage application startup and shutdown.

    Startup initialises the badges configuration, then the database engine,
    then block storage. Shutdown cleans up the database engine.

    Args:
        _: The application instance handed to the lifespan handler; unused.

    Yields:
        None, once startup has completed. Shutdown code runs when the
        context manager is exited.
    """
    badges_config.initialise()
    logger.info("Initialising database engine")
    await database_manager.initialise()
    try:
        logger.info("Database ready")
        logger.info("Initialising block storage")
        block_management.initialise()
        logger.info("Block storage ready")

        yield
    finally:
        # Reached on normal shutdown, when block-storage initialisation
        # fails, and when an exception is thrown in at the yield, so the
        # initialised database engine is always cleaned up.
        logger.info("Cleaning database engine")
        await database_manager.cleanup()
43
44
# Application instance: titled from the host settings, versioned from the
# installed "rescue-marketplace" distribution metadata, rooted at "/api".
app = FastAPI(
    title=host_settings.name,
    version=version("rescue-marketplace"),
    lifespan=lifespan,
    root_path="/api",
)
51
52
def register_routers(app: FastAPI) -> None:
    """Include every API router in ``app``.

    Each router is included with ``log_request`` attached as a dependency.

    Args:
        app: The application to register the routers on.
    """
    api_routers: tuple[APIRouter, ...] = (
        consumers_router,
        sellers_router,
        bundle_router,
        sessions_router,
        users_router,
        allergens_router,
        categories_router,
        badges_router,
        admin_router,
        reports_router,
        leaderboard_router,
    )
    for api_router in api_routers:
        # A fresh dependency list is built for every router.
        request_logging = [Depends(log_request)]
        app.include_router(api_router, dependencies=request_logging)
70
71
# Routers are registered at module level, ahead of the script guard, so `app`
# is fully configured whether or not this file is executed directly.
register_routers(app)

if __name__ == "__main__":
    # Direct execution: serve `app` with uvicorn using the host settings.
    run(
        app,
        host=host_settings.host,
        forwarded_allow_ips=host_settings.forward_from,
        port=host_settings.port,
        loop="uvloop",
        log_level="info",
    )
@asynccontextmanager
async def lifespan(_: fastapi.applications.FastAPI) -> AsyncIterator[None]:
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Manage application startup and shutdown.

    Startup initialises the badges configuration, then the database engine,
    then block storage. Shutdown cleans up the database engine.

    Args:
        _: The application instance handed to the lifespan handler; unused.

    Yields:
        None, once startup has completed. Shutdown code runs when the
        context manager is exited.
    """
    badges_config.initialise()
    logger.info("Initialising database engine")
    await database_manager.initialise()
    try:
        logger.info("Database ready")
        logger.info("Initialising block storage")
        block_management.initialise()
        logger.info("Block storage ready")

        yield
    finally:
        # Reached on normal shutdown, when block-storage initialisation
        # fails, and when an exception is thrown in at the yield, so the
        # initialised database engine is always cleaned up.
        logger.info("Cleaning database engine")
        await database_manager.cleanup()

Manages startup and shutdown.

app = <fastapi.applications.FastAPI object>
def register_routers(app: fastapi.applications.FastAPI) -> None:
def register_routers(app: FastAPI) -> None:
    """Include every API router in ``app``.

    Each router is included with ``log_request`` attached as a dependency.

    Args:
        app: The application to register the routers on.
    """
    api_routers: tuple[APIRouter, ...] = (
        consumers_router,
        sellers_router,
        bundle_router,
        sessions_router,
        users_router,
        allergens_router,
        categories_router,
        badges_router,
        admin_router,
        reports_router,
        leaderboard_router,
    )
    for api_router in api_routers:
        # A fresh dependency list is built for every router.
        request_logging = [Depends(log_request)]
        app.include_router(api_router, dependencies=request_logging)

Registers API routers with the app.