internal.auth.middleware
Middlewares to include in routes for auto authorisation.
1"""Middlewares to include in routes for auto authorisation.""" 2 3from typing import Annotated 4 5from fastapi import HTTPException, Security, status 6from fastapi.security import ( 7 HTTPAuthorizationCredentials, 8 HTTPBasic, 9 HTTPBasicCredentials, 10 HTTPBearer, 11) 12from pydantic import BaseModel 13 14from internal.auth.security import check_password 15from internal.database.dependency import database_dependency 16from internal.queries.admin import AsyncQuerier as AdminQuerier 17from internal.queries.models import UserRole 18from internal.queries.token import AsyncQuerier as TokenQuerier 19from internal.queries.token import GetSessionByTokenRow 20from internal.queries.user import AsyncQuerier as UserQuerier 21from internal.settings.env import auth_settings 22 23 24class BasicAuthResponse(BaseModel): 25 """Response when user got authorised with email and password.""" 26 27 user_id: int 28 role: UserRole 29 30 31async def basic_auth( 32 conn: database_dependency, credentials: HTTPBasicCredentials = Security(HTTPBasic()) 33) -> BasicAuthResponse: 34 """Fetches user id for endpoint with basic auth. 35 36 Args: 37 conn: database connection 38 credentials: credentials (email(username), password) passed from request header 39 40 Returns: 41 user id and role if user authenticated 42 43 Raises: 44 HTTPException: if user wasn't found in the database or password incorrect 45 """ 46 user = await UserQuerier(conn).get_user_login(email=credentials.username) 47 if not user: 48 raise HTTPException( 49 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" 50 ) 51 if not check_password(credentials.password, user.pw_hash): 52 raise HTTPException( 53 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" 54 ) 55 56 if user.role == UserRole.ADMIN: 57 admin_info = await AdminQuerier(conn).get_admin(user_id=user.user_id) 58 if admin_info and not admin_info.active: 59 raise HTTPException( 60 status_code=status.HTTP_403_FORBIDDEN, 61 detail="Admin account deactivated", 62 ) 63 64 return BasicAuthResponse(user_id=user.user_id, role=user.role) 65 66 67async def bearer_auth( 68 conn: database_dependency, 69 credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()), 70) -> GetSessionByTokenRow: 71 """Fetches user session from given token credentials. 72 73 Args: 74 conn: database connectionn 75 credentials: credentials (token) gotten from headers 76 77 Returns: 78 user session with user and session information 79 80 Raises: 81 HTTPException: if user wasn't found in the database 82 """ 83 session = await TokenQuerier(conn).get_session_by_token( 84 token=credentials.credentials 85 ) 86 if not session: 87 raise HTTPException( 88 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid session" 89 ) 90 return session 91 92 93def consumer_auth( 94 session: GetSessionByTokenRow = Security(bearer_auth), 95) -> GetSessionByTokenRow: 96 """Authentisate consumer in a middleware. 97 98 Args: 99 session: user session from bearer authentication 100 101 Returns: 102 user session if user was successfully authenticated 103 104 Raises: 105 HTTPException: if user is not a consumer 106 """ 107 if session.role == UserRole.CONSUMER: 108 return session 109 raise HTTPException( 110 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as consumer" 111 ) 112 113 114def seller_auth( 115 session: GetSessionByTokenRow = Security(bearer_auth), 116) -> GetSessionByTokenRow: 117 """Authentisate seller in a middleware. 118 119 Args: 120 session: user session from bearer authentication 121 122 Returns: 123 user session if user was successfully authenticated 124 125 Raises: 126 HTTPException: if user is not a seller 127 """ 128 if session.role == UserRole.SELLER: 129 return session 130 raise HTTPException( 131 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as seller" 132 ) 133 134 135async def admin_auth( 136 conn: database_dependency, session: GetSessionByTokenRow = Security(bearer_auth) 137) -> GetSessionByTokenRow: 138 """Authentisate admin in a middleware. 139 140 Args: 141 conn: database connection 142 session: user session from bearer authentication 143 144 Returns: 145 user session if user was successfully authenticated 146 147 Raises: 148 HTTPException: if user is not a admin 149 """ 150 if session.role != UserRole.ADMIN: 151 raise HTTPException( 152 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as admin" 153 ) 154 155 admin_info = await AdminQuerier(conn).get_admin(user_id=session.user_id) 156 if not admin_info or not admin_info.active: 157 raise HTTPException( 158 status_code=status.HTTP_403_FORBIDDEN, detail="Admin account deactivated" 159 ) 160 161 return session 162 163 164def root_auth(credentials: HTTPBasicCredentials = Security(HTTPBasic())) -> None: 165 """Root user authentication. 166 167 Args: 168 credentials: root user credentials 169 170 Raises: 171 HTTPException: if not root user 172 """ 173 if ("" in {auth_settings.root_username, auth_settings.root_password}) or not ( 174 credentials.username == auth_settings.root_username 175 and credentials.password == auth_settings.root_password 176 ): 177 raise HTTPException( 178 status.HTTP_401_UNAUTHORIZED, "Not authorised as root admin" 179 ) 180 181 182BasicAuthDep = Annotated[BasicAuthResponse, Security(basic_auth)] 183BearerAuthDep = Annotated[GetSessionByTokenRow, Security(bearer_auth)] 184ConsumerAuthDep = Annotated[GetSessionByTokenRow, Security(consumer_auth)] 185SellerAuthDep = Annotated[GetSessionByTokenRow, Security(seller_auth)] 186AdminAuthDep = Annotated[GetSessionByTokenRow, Security(admin_auth)] 187RootAuthDep = Annotated[None, Security(root_auth)]
class
BasicAuthResponse(pydantic.main.BaseModel):
25class BasicAuthResponse(BaseModel): 26 """Response when user got authorised with email and password.""" 27 28 user_id: int 29 role: UserRole
Response when user got authorised with email and password.
async def
basic_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], credentials: fastapi.security.http.HTTPBasicCredentials = Security(dependency=<fastapi.security.http.HTTPBasic object>, use_cache=True, scope=None, scopes=None)) -> BasicAuthResponse:
32async def basic_auth( 33 conn: database_dependency, credentials: HTTPBasicCredentials = Security(HTTPBasic()) 34) -> BasicAuthResponse: 35 """Fetches user id for endpoint with basic auth. 36 37 Args: 38 conn: database connection 39 credentials: credentials (email(username), password) passed from request header 40 41 Returns: 42 user id and role if user authenticated 43 44 Raises: 45 HTTPException: if user wasn't found in the database or password incorrect 46 """ 47 user = await UserQuerier(conn).get_user_login(email=credentials.username) 48 if not user: 49 raise HTTPException( 50 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" 51 ) 52 if not check_password(credentials.password, user.pw_hash): 53 raise HTTPException( 54 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" 55 ) 56 57 if user.role == UserRole.ADMIN: 58 admin_info = await AdminQuerier(conn).get_admin(user_id=user.user_id) 59 if admin_info and not admin_info.active: 60 raise HTTPException( 61 status_code=status.HTTP_403_FORBIDDEN, 62 detail="Admin account deactivated", 63 ) 64 65 return BasicAuthResponse(user_id=user.user_id, role=user.role)
Fetches user id for endpoint with basic auth.
Arguments:
- conn: database connection
- credentials: credentials (email(username), password) passed from request header
Returns:
user id and role if user authenticated
Raises:
- HTTPException: if user wasn't found in the database or password incorrect
async def
bearer_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], credentials: fastapi.security.http.HTTPAuthorizationCredentials = Security(dependency=<fastapi.security.http.HTTPBearer object>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
68async def bearer_auth( 69 conn: database_dependency, 70 credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()), 71) -> GetSessionByTokenRow: 72 """Fetches user session from given token credentials. 73 74 Args: 75 conn: database connectionn 76 credentials: credentials (token) gotten from headers 77 78 Returns: 79 user session with user and session information 80 81 Raises: 82 HTTPException: if user wasn't found in the database 83 """ 84 session = await TokenQuerier(conn).get_session_by_token( 85 token=credentials.credentials 86 ) 87 if not session: 88 raise HTTPException( 89 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid session" 90 ) 91 return session
Fetches user session from given token credentials.
Arguments:
- conn: database connectionn
- credentials: credentials (token) gotten from headers
Returns:
user session with user and session information
Raises:
- HTTPException: if user wasn't found in the database
def
consumer_auth( session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
94def consumer_auth( 95 session: GetSessionByTokenRow = Security(bearer_auth), 96) -> GetSessionByTokenRow: 97 """Authentisate consumer in a middleware. 98 99 Args: 100 session: user session from bearer authentication 101 102 Returns: 103 user session if user was successfully authenticated 104 105 Raises: 106 HTTPException: if user is not a consumer 107 """ 108 if session.role == UserRole.CONSUMER: 109 return session 110 raise HTTPException( 111 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as consumer" 112 )
Authentisate consumer in a middleware.
Arguments:
- session: user session from bearer authentication
Returns:
user session if user was successfully authenticated
Raises:
- HTTPException: if user is not a consumer
def
seller_auth( session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
115def seller_auth( 116 session: GetSessionByTokenRow = Security(bearer_auth), 117) -> GetSessionByTokenRow: 118 """Authentisate seller in a middleware. 119 120 Args: 121 session: user session from bearer authentication 122 123 Returns: 124 user session if user was successfully authenticated 125 126 Raises: 127 HTTPException: if user is not a seller 128 """ 129 if session.role == UserRole.SELLER: 130 return session 131 raise HTTPException( 132 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as seller" 133 )
Authentisate seller in a middleware.
Arguments:
- session: user session from bearer authentication
Returns:
user session if user was successfully authenticated
Raises:
- HTTPException: if user is not a seller
async def
admin_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
136async def admin_auth( 137 conn: database_dependency, session: GetSessionByTokenRow = Security(bearer_auth) 138) -> GetSessionByTokenRow: 139 """Authentisate admin in a middleware. 140 141 Args: 142 conn: database connection 143 session: user session from bearer authentication 144 145 Returns: 146 user session if user was successfully authenticated 147 148 Raises: 149 HTTPException: if user is not a admin 150 """ 151 if session.role != UserRole.ADMIN: 152 raise HTTPException( 153 status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as admin" 154 ) 155 156 admin_info = await AdminQuerier(conn).get_admin(user_id=session.user_id) 157 if not admin_info or not admin_info.active: 158 raise HTTPException( 159 status_code=status.HTTP_403_FORBIDDEN, detail="Admin account deactivated" 160 ) 161 162 return session
Authentisate admin in a middleware.
Arguments:
- conn: database connection
- session: user session from bearer authentication
Returns:
user session if user was successfully authenticated
Raises:
- HTTPException: if user is not a admin
def
root_auth( credentials: fastapi.security.http.HTTPBasicCredentials = Security(dependency=<fastapi.security.http.HTTPBasic object>, use_cache=True, scope=None, scopes=None)) -> None:
165def root_auth(credentials: HTTPBasicCredentials = Security(HTTPBasic())) -> None: 166 """Root user authentication. 167 168 Args: 169 credentials: root user credentials 170 171 Raises: 172 HTTPException: if not root user 173 """ 174 if ("" in {auth_settings.root_username, auth_settings.root_password}) or not ( 175 credentials.username == auth_settings.root_username 176 and credentials.password == auth_settings.root_password 177 ): 178 raise HTTPException( 179 status.HTTP_401_UNAUTHORIZED, "Not authorised as root admin" 180 )
Root user authentication.
Arguments:
- credentials: root user credentials
Raises:
- HTTPException: if not root user
BasicAuthDep =
typing.Annotated[BasicAuthResponse, Security(dependency=<function basic_auth>, use_cache=True, scope=None, scopes=None)]
BearerAuthDep =
typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)]
ConsumerAuthDep =
typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]
SellerAuthDep =
typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function seller_auth>, use_cache=True, scope=None, scopes=None)]
AdminAuthDep =
typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function admin_auth>, use_cache=True, scope=None, scopes=None)]
RootAuthDep =
typing.Annotated[NoneType, Security(dependency=<function root_auth>, use_cache=True, scope=None, scopes=None)]