internal.database.manager
Manages database connection for the entire server.
"""Manages database connection for the entire server."""

from collections.abc import AsyncGenerator

from fastapi import HTTPException, status
from sqlalchemy import text
from sqlalchemy.exc import IntegrityError, OperationalError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine

from internal.logger.logger import logger
from internal.settings.env import database_settings


class DatabaseManager:
    """Starts, keeps and serves connections.

    One instance owns one SQLAlchemy async engine: ``initialise`` creates it,
    ``get_connection`` hands out transactional connections from it and
    ``cleanup`` disposes of it.
    """

    # Only annotated here; the attribute does not exist until initialise() runs.
    engine: AsyncEngine

    async def initialise(self) -> None:
        """Starts and checks connection pool.

        Builds a ``postgresql+psycopg`` URL from ``database_settings``, creates
        the async engine and runs ``SELECT 1`` on a fresh connection to prove
        the database is reachable.

        Raises:
            Exception: if failed to connect to the database; the underlying
                SQLAlchemy error is attached as ``__cause__``
        """
        # NOTE(review): username/password are interpolated verbatim, so any
        # reserved URL characters in them must already be URL-encoded in the
        # settings.
        credentials: str = f"{database_settings.username}:{database_settings.password}"
        full_host: str = f"{database_settings.host}:{database_settings.port}"
        self.engine = create_async_engine(
            f"postgresql+psycopg://{credentials}@{full_host}/"
            f"{database_settings.database}",
            pool_size=database_settings.pool_size,
            max_overflow=database_settings.max_overflow,
            pool_pre_ping=True,
            pool_recycle=2600,
        )

        try:
            async with self.engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
        except SQLAlchemyError as err:
            # Do not leave a half-opened pool behind when start-up fails.
            await self.engine.dispose()
            raise Exception(
                f"Failed to initiate connection with database: {err}"
            ) from err

    async def cleanup(self) -> None:
        """Closes database connection.

        Safe to call when ``initialise`` never ran: ``engine`` is only a class
        level annotation, so it is looked up defensively.
        """
        engine = getattr(self, "engine", None)
        if engine is not None:
            await engine.dispose()

    async def get_connection(self) -> AsyncGenerator[AsyncConnection]:
        """Gets connection session and returns it for dependency use.

        The connection is opened with ``engine.begin()``, so it lives inside a
        transaction for as long as the consumer holds it.

        Yields:
            connection session with auto close and auto commit

        Raises:
            HTTPException: user facing error if failed to process database
                request (503 for operational errors, 409 for integrity errors,
                400 for ``ValueError``); the original error is chained as
                ``__cause__``
        """
        try:
            async with self.engine.begin() as conn:
                yield conn
        except OperationalError as err:
            logger.error(f"Database operational error: {err}")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service unavailable",
            ) from err
        except IntegrityError as err:
            logger.error(f"Database integrity error: {err}")
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Conflict"
            ) from err
        except ValueError as err:
            logger.error(f"Validation Error: {err}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
            ) from err


database_manager = DatabaseManager()
class
DatabaseManager:
class DatabaseManager:
    """Starts, keeps and serves connections.

    One instance owns one SQLAlchemy async engine: ``initialise`` creates it,
    ``get_connection`` hands out transactional connections from it and
    ``cleanup`` disposes of it.
    """

    # Only annotated here; the attribute does not exist until initialise() runs.
    engine: AsyncEngine

    async def initialise(self) -> None:
        """Starts and checks connection pool.

        Builds a ``postgresql+psycopg`` URL from ``database_settings``, creates
        the async engine and runs ``SELECT 1`` on a fresh connection to prove
        the database is reachable.

        Raises:
            Exception: if failed to connect to the database; the underlying
                SQLAlchemy error is attached as ``__cause__``
        """
        # NOTE(review): username/password are interpolated verbatim, so any
        # reserved URL characters in them must already be URL-encoded in the
        # settings.
        credentials: str = f"{database_settings.username}:{database_settings.password}"
        full_host: str = f"{database_settings.host}:{database_settings.port}"
        self.engine = create_async_engine(
            f"postgresql+psycopg://{credentials}@{full_host}/"
            f"{database_settings.database}",
            pool_size=database_settings.pool_size,
            max_overflow=database_settings.max_overflow,
            pool_pre_ping=True,
            pool_recycle=2600,
        )

        try:
            async with self.engine.connect() as conn:
                await conn.execute(text("SELECT 1"))
        except SQLAlchemyError as err:
            # Do not leave a half-opened pool behind when start-up fails.
            await self.engine.dispose()
            raise Exception(
                f"Failed to initiate connection with database: {err}"
            ) from err

    async def cleanup(self) -> None:
        """Closes database connection.

        Safe to call when ``initialise`` never ran: ``engine`` is only a class
        level annotation, so it is looked up defensively.
        """
        engine = getattr(self, "engine", None)
        if engine is not None:
            await engine.dispose()

    async def get_connection(self) -> AsyncGenerator[AsyncConnection]:
        """Gets connection session and returns it for dependency use.

        The connection is opened with ``engine.begin()``, so it lives inside a
        transaction for as long as the consumer holds it.

        Yields:
            connection session with auto close and auto commit

        Raises:
            HTTPException: user facing error if failed to process database
                request (503 for operational errors, 409 for integrity errors,
                400 for ``ValueError``); the original error is chained as
                ``__cause__``
        """
        try:
            async with self.engine.begin() as conn:
                yield conn
        except OperationalError as err:
            logger.error(f"Database operational error: {err}")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Service unavailable",
            ) from err
        except IntegrityError as err:
            logger.error(f"Database integrity error: {err}")
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Conflict"
            ) from err
        except ValueError as err:
            logger.error(f"Validation Error: {err}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
            ) from err
Starts, keeps and serves connections.
async def
initialise(self) -> None:
async def initialise(self) -> None:
    """Starts and checks connection pool.

    Builds a ``postgresql+psycopg`` URL from ``database_settings``, creates
    the async engine and runs ``SELECT 1`` on a fresh connection to prove the
    database is reachable.

    Raises:
        Exception: if failed to connect to the database; the underlying
            SQLAlchemy error is attached as ``__cause__``
    """
    # NOTE(review): username/password are interpolated verbatim, so any
    # reserved URL characters in them must already be URL-encoded in the
    # settings.
    credentials: str = f"{database_settings.username}:{database_settings.password}"
    full_host: str = f"{database_settings.host}:{database_settings.port}"
    self.engine = create_async_engine(
        f"postgresql+psycopg://{credentials}@{full_host}/"
        f"{database_settings.database}",
        pool_size=database_settings.pool_size,
        max_overflow=database_settings.max_overflow,
        pool_pre_ping=True,
        pool_recycle=2600,
    )

    try:
        async with self.engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
    except SQLAlchemyError as err:
        # Do not leave a half-opened pool behind when start-up fails.
        await self.engine.dispose()
        raise Exception(
            f"Failed to initiate connection with database: {err}"
        ) from err
Starts and checks connection pool.
Raises:
- Exception: if the connection to the database cannot be established
async def
cleanup(self) -> None:
async def cleanup(self) -> None:
    """Closes database connection.

    Safe to call when ``initialise`` never ran: ``engine`` is only a class
    level annotation, so it is looked up defensively instead of being read
    directly (which would raise ``AttributeError``).
    """
    engine = getattr(self, "engine", None)
    if engine is not None:
        await engine.dispose()
Closes database connection.
async def
get_connection(self) -> AsyncGenerator[sqlalchemy.ext.asyncio.engine.AsyncConnection]:
async def get_connection(self) -> AsyncGenerator[AsyncConnection]:
    """Gets connection session and returns it for dependency use.

    The connection is opened with ``engine.begin()``, so it lives inside a
    transaction for as long as the consumer holds it.

    Yields:
        connection session with auto close and auto commit

    Raises:
        HTTPException: user facing error if failed to process database
            request (503 for operational errors, 409 for integrity errors,
            400 for ``ValueError``); the original error is chained as
            ``__cause__``
    """
    try:
        async with self.engine.begin() as conn:
            yield conn
    except OperationalError as err:
        logger.error(f"Database operational error: {err}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service unavailable",
        ) from err
    except IntegrityError as err:
        logger.error(f"Database integrity error: {err}")
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail="Conflict"
        ) from err
    except ValueError as err:
        logger.error(f"Validation Error: {err}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Validation error"
        ) from err
Gets connection session and returns it for dependency use.
Yields:
connection session with auto close and auto commit
Raises:
- HTTPException: user-facing error raised when the database request cannot be processed
database_manager =
<DatabaseManager object>