internal.database.manager

Manages the database connection pool for the entire server.

 1"""Manages database connection for the entire server."""
 2
 3from collections.abc import AsyncGenerator
 4
 5from fastapi import HTTPException, status
 6from sqlalchemy import text
 7from sqlalchemy.exc import IntegrityError, OperationalError, SQLAlchemyError
 8from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
 9
10from internal.logger.logger import logger
11from internal.settings.env import database_settings
12
13
class DatabaseManager:
    """Starts, keeps and serves database connections.

    The engine is created by ``initialise`` and released by ``cleanup``;
    ``get_connection`` hands out one transactional connection per call.
    """

    # Only declared here; the attribute does not exist until ``initialise`` runs.
    engine: AsyncEngine

    async def initialise(self) -> None:
        """Creates the connection pool and verifies the database is reachable.

        Builds an async engine from ``database_settings`` and runs ``SELECT 1``
        on a pooled connection as a connectivity check.

        Raises:
          Exception: if failed to connect to the database
        """
        # NOTE(review): credentials are interpolated into the URL verbatim, so a
        # username/password containing "@", ":" or "/" presumably has to be
        # URL-encoded in the settings already - confirm.
        credentials: str = f"{database_settings.username}:{database_settings.password}"
        full_host: str = f"{database_settings.host}:{database_settings.port}"
        self.engine = create_async_engine(
            f"postgresql+psycopg://{credentials}@{full_host}/"
            f"{database_settings.database}",
            pool_size=database_settings.pool_size,
            max_overflow=database_settings.max_overflow,
            pool_pre_ping=True,
            pool_recycle=2600,
        )

        try:
            async with self.engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
        except SQLAlchemyError as err:
            # Do not leave a half-initialised pool behind when the check fails.
            await self.engine.dispose()
            # Chain the original error so the root cause stays in the traceback.
            raise Exception(
                f"Failed to initiate connection with database: {err}"
            ) from err

    async def cleanup(self) -> None:
        """Disposes of the engine and its connection pool, if one was created.

        Safe to call even when ``initialise`` never ran: ``engine`` is only an
        annotation on the class, so the attribute may not exist yet.
        """
        engine = getattr(self, "engine", None)
        if engine is not None:
            await engine.dispose()

    async def get_connection(self) -> AsyncGenerator[AsyncConnection, None]:
        """Gets connection session and returns it for dependency use.

        The connection is opened with ``engine.begin()``, so the surrounding
        transaction is finalised and the connection released when the
        generator is closed.

        Yields:
          connection session with auto close and auto commit

        Raises:
          HTTPException: user facing error if failed to process database request
            (503 for operational errors, 409 for integrity errors, 400 for
            ``ValueError``).
        """
        try:
            async with self.engine.begin() as conn:
                yield conn
        # The handlers below also see exceptions thrown into the generator at
        # the ``yield`` point; each is chained to keep the root cause visible.
        except OperationalError as err:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service unavailable",
            ) from err
        except IntegrityError as err:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Conflict"
            ) from err
        except ValueError as err:
            logger.error(f"Validation Error: {err}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
            ) from err
72
# Module-level DatabaseManager instance; constructing it opens no connection,
# the engine is only created once ``initialise`` is awaited.
73database_manager = DatabaseManager()
class DatabaseManager:
class DatabaseManager:
    """Starts, keeps and serves database connections.

    The engine is created by ``initialise`` and released by ``cleanup``;
    ``get_connection`` hands out one transactional connection per call.
    """

    # Only declared here; the attribute does not exist until ``initialise`` runs.
    engine: AsyncEngine

    async def initialise(self) -> None:
        """Creates the connection pool and verifies the database is reachable.

        Builds an async engine from ``database_settings`` and runs ``SELECT 1``
        on a pooled connection as a connectivity check.

        Raises:
          Exception: if failed to connect to the database
        """
        # NOTE(review): credentials are interpolated into the URL verbatim, so a
        # username/password containing "@", ":" or "/" presumably has to be
        # URL-encoded in the settings already - confirm.
        credentials: str = f"{database_settings.username}:{database_settings.password}"
        full_host: str = f"{database_settings.host}:{database_settings.port}"
        self.engine = create_async_engine(
            f"postgresql+psycopg://{credentials}@{full_host}/"
            f"{database_settings.database}",
            pool_size=database_settings.pool_size,
            max_overflow=database_settings.max_overflow,
            pool_pre_ping=True,
            pool_recycle=2600,
        )

        try:
            async with self.engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
        except SQLAlchemyError as err:
            # Do not leave a half-initialised pool behind when the check fails.
            await self.engine.dispose()
            # Chain the original error so the root cause stays in the traceback.
            raise Exception(
                f"Failed to initiate connection with database: {err}"
            ) from err

    async def cleanup(self) -> None:
        """Disposes of the engine and its connection pool, if one was created.

        Safe to call even when ``initialise`` never ran: ``engine`` is only an
        annotation on the class, so the attribute may not exist yet.
        """
        engine = getattr(self, "engine", None)
        if engine is not None:
            await engine.dispose()

    async def get_connection(self) -> AsyncGenerator[AsyncConnection, None]:
        """Gets connection session and returns it for dependency use.

        The connection is opened with ``engine.begin()``, so the surrounding
        transaction is finalised and the connection released when the
        generator is closed.

        Yields:
          connection session with auto close and auto commit

        Raises:
          HTTPException: user facing error if failed to process database request
            (503 for operational errors, 409 for integrity errors, 400 for
            ``ValueError``).
        """
        try:
            async with self.engine.begin() as conn:
                yield conn
        # The handlers below also see exceptions thrown into the generator at
        # the ``yield`` point; each is chained to keep the root cause visible.
        except OperationalError as err:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service unavailable",
            ) from err
        except IntegrityError as err:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Conflict"
            ) from err
        except ValueError as err:
            logger.error(f"Validation Error: {err}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
            ) from err

Starts, keeps and serves connections.

engine: sqlalchemy.ext.asyncio.engine.AsyncEngine
async def initialise(self) -> None:
20    async def initialise(self) -> None:
21        """Starts and checks connection pool.
22
23        Raises:
24          Exception: if failed to connect to the database
25        """
26        credentials: str = f"{database_settings.username}:{database_settings.password}"
27        full_host: str = f"{database_settings.host}:{database_settings.port}"
28        self.engine = create_async_engine(
29            f"postgresql+psycopg://{credentials}@{full_host}/"
30            f"{database_settings.database}",
31            pool_size=database_settings.pool_size,
32            max_overflow=database_settings.max_overflow,
33            pool_pre_ping=True,
34            pool_recycle=2600,
35        )
36
37        try:
38            async with self.engine.connect() as conn:
39                await conn.execute(text("SELECT 1"))
40        except SQLAlchemyError as err:
41            raise Exception(f"Failed to initiate connection with database: {err}")

Starts and checks connection pool.

Raises:
  • Exception: if failed to connect to the database
async def cleanup(self) -> None:
43    async def cleanup(self) -> None:
44        """Closes database connection."""
45        if self.engine:
46            await self.engine.dispose()

Closes database connection.

async def get_connection(self) -> AsyncGenerator[sqlalchemy.ext.asyncio.engine.AsyncConnection]:
48    async def get_connection(self) -> AsyncGenerator[AsyncConnection]:
49        """Gets connection session and returns it for dependency use.
50
51        Yields:
52          connection session with auto close and auto commit
53
54        Raises:
55          HTTPException: user facing error if failed to process database request
56        """
57        try:
58            async with self.engine.begin() as conn:
59                yield conn
60        except OperationalError:
61            raise HTTPException(
62                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
63                detail="Service unavailable",
64            )
65        except IntegrityError:
66            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Conflict")
67        except ValueError as err:
68            logger.error(f"Validation Error: {err}")
69            raise HTTPException(
70                status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
71            )

Gets connection session and returns it for dependency use.

Yields:

A connection with an open transaction that is committed when the caller finishes successfully, rolled back on error, and closed automatically.

Raises:
  • HTTPException: user facing error if failed to process database request
database_manager = <DatabaseManager object>