internal.auth.middleware

Middlewares to include in routes for auto authorisation.

  1"""Middlewares to include in routes for auto authorisation."""
  2
  3from typing import Annotated
  4
  5from fastapi import HTTPException, Security, status
  6from fastapi.security import (
  7    HTTPAuthorizationCredentials,
  8    HTTPBasic,
  9    HTTPBasicCredentials,
 10    HTTPBearer,
 11)
 12from pydantic import BaseModel
 13
 14from internal.auth.security import check_password
 15from internal.database.dependency import database_dependency
 16from internal.queries.admin import AsyncQuerier as AdminQuerier
 17from internal.queries.models import UserRole
 18from internal.queries.token import AsyncQuerier as TokenQuerier
 19from internal.queries.token import GetSessionByTokenRow
 20from internal.queries.user import AsyncQuerier as UserQuerier
 21from internal.settings.env import auth_settings
 22
 23
 24class BasicAuthResponse(BaseModel):
 25    """Response when user got authorised with email and password."""
 26
 27    user_id: int
 28    role: UserRole
 29
 30
 31async def basic_auth(
 32    conn: database_dependency, credentials: HTTPBasicCredentials = Security(HTTPBasic())
 33) -> BasicAuthResponse:
 34    """Fetches user id for endpoint with basic auth.
 35
 36    Args:
 37      conn: database connection
 38      credentials: credentials (email(username), password) passed from request header
 39
 40    Returns:
 41      user id and role if user authenticated
 42
 43    Raises:
 44        HTTPException: if user wasn't found in the database or password incorrect
 45    """
 46    user = await UserQuerier(conn).get_user_login(email=credentials.username)
 47    if not user:
 48        raise HTTPException(
 49            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
 50        )
 51    if not check_password(credentials.password, user.pw_hash):
 52        raise HTTPException(
 53            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
 54        )
 55
 56    if user.role == UserRole.ADMIN:
 57        admin_info = await AdminQuerier(conn).get_admin(user_id=user.user_id)
 58        if admin_info and not admin_info.active:
 59            raise HTTPException(
 60                status_code=status.HTTP_403_FORBIDDEN,
 61                detail="Admin account deactivated",
 62            )
 63
 64    return BasicAuthResponse(user_id=user.user_id, role=user.role)
 65
 66
 67async def bearer_auth(
 68    conn: database_dependency,
 69    credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()),
 70) -> GetSessionByTokenRow:
 71    """Fetches user session from given token credentials.
 72
 73    Args:
 74      conn: database connectionn
 75      credentials: credentials (token) gotten from headers
 76
 77    Returns:
 78      user session with user and session information
 79
 80    Raises:
 81      HTTPException: if user wasn't found in the database
 82    """
 83    session = await TokenQuerier(conn).get_session_by_token(
 84        token=credentials.credentials
 85    )
 86    if not session:
 87        raise HTTPException(
 88            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid session"
 89        )
 90    return session
 91
 92
 93def consumer_auth(
 94    session: GetSessionByTokenRow = Security(bearer_auth),
 95) -> GetSessionByTokenRow:
 96    """Authentisate consumer in a middleware.
 97
 98    Args:
 99      session: user session from bearer authentication
100
101    Returns:
102      user session if user was successfully authenticated
103
104    Raises:
105      HTTPException: if user is not a consumer
106    """
107    if session.role == UserRole.CONSUMER:
108        return session
109    raise HTTPException(
110        status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as consumer"
111    )
112
113
114def seller_auth(
115    session: GetSessionByTokenRow = Security(bearer_auth),
116) -> GetSessionByTokenRow:
117    """Authentisate seller in a middleware.
118
119    Args:
120      session: user session from bearer authentication
121
122    Returns:
123      user session if user was successfully authenticated
124
125    Raises:
126      HTTPException: if user is not a seller
127    """
128    if session.role == UserRole.SELLER:
129        return session
130    raise HTTPException(
131        status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as seller"
132    )
133
134
135async def admin_auth(
136    conn: database_dependency, session: GetSessionByTokenRow = Security(bearer_auth)
137) -> GetSessionByTokenRow:
138    """Authentisate admin in a middleware.
139
140    Args:
141      conn: database connection
142      session: user session from bearer authentication
143
144    Returns:
145      user session if user was successfully authenticated
146
147    Raises:
148      HTTPException: if user is not a admin
149    """
150    if session.role != UserRole.ADMIN:
151        raise HTTPException(
152            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as admin"
153        )
154
155    admin_info = await AdminQuerier(conn).get_admin(user_id=session.user_id)
156    if not admin_info or not admin_info.active:
157        raise HTTPException(
158            status_code=status.HTTP_403_FORBIDDEN, detail="Admin account deactivated"
159        )
160
161    return session
162
163
164def root_auth(credentials: HTTPBasicCredentials = Security(HTTPBasic())) -> None:
165    """Root user authentication.
166
167    Args:
168        credentials: root user credentials
169
170    Raises:
171        HTTPException: if not root user
172    """
173    if ("" in {auth_settings.root_username, auth_settings.root_password}) or not (
174        credentials.username == auth_settings.root_username
175        and credentials.password == auth_settings.root_password
176    ):
177        raise HTTPException(
178            status.HTTP_401_UNAUTHORIZED, "Not authorised as root admin"
179        )
180
181
182BasicAuthDep = Annotated[BasicAuthResponse, Security(basic_auth)]
183BearerAuthDep = Annotated[GetSessionByTokenRow, Security(bearer_auth)]
184ConsumerAuthDep = Annotated[GetSessionByTokenRow, Security(consumer_auth)]
185SellerAuthDep = Annotated[GetSessionByTokenRow, Security(seller_auth)]
186AdminAuthDep = Annotated[GetSessionByTokenRow, Security(admin_auth)]
187RootAuthDep = Annotated[None, Security(root_auth)]
class BasicAuthResponse(pydantic.main.BaseModel):
25class BasicAuthResponse(BaseModel):
26    """Response when user got authorised with email and password."""
27
28    user_id: int
29    role: UserRole

Response when user got authorised with email and password.

user_id: int = PydanticUndefined
role: internal.queries.models.UserRole = PydanticUndefined
async def basic_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], credentials: fastapi.security.http.HTTPBasicCredentials = Security(dependency=<fastapi.security.http.HTTPBasic object>, use_cache=True, scope=None, scopes=None)) -> BasicAuthResponse:
32async def basic_auth(
33    conn: database_dependency, credentials: HTTPBasicCredentials = Security(HTTPBasic())
34) -> BasicAuthResponse:
35    """Fetches user id for endpoint with basic auth.
36
37    Args:
38      conn: database connection
39      credentials: credentials (email(username), password) passed from request header
40
41    Returns:
42      user id and role if user authenticated
43
44    Raises:
45        HTTPException: if user wasn't found in the database or password incorrect
46    """
47    user = await UserQuerier(conn).get_user_login(email=credentials.username)
48    if not user:
49        raise HTTPException(
50            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
51        )
52    if not check_password(credentials.password, user.pw_hash):
53        raise HTTPException(
54            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
55        )
56
57    if user.role == UserRole.ADMIN:
58        admin_info = await AdminQuerier(conn).get_admin(user_id=user.user_id)
59        if admin_info and not admin_info.active:
60            raise HTTPException(
61                status_code=status.HTTP_403_FORBIDDEN,
62                detail="Admin account deactivated",
63            )
64
65    return BasicAuthResponse(user_id=user.user_id, role=user.role)

Fetches user id for endpoint with basic auth.

Arguments:
  • conn: database connection
  • credentials: credentials (email(username), password) passed from request header
Returns:

user id and role if user authenticated

Raises:
  • HTTPException: if user wasn't found in the database or password incorrect
async def bearer_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], credentials: fastapi.security.http.HTTPAuthorizationCredentials = Security(dependency=<fastapi.security.http.HTTPBearer object>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
68async def bearer_auth(
69    conn: database_dependency,
70    credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()),
71) -> GetSessionByTokenRow:
72    """Fetches user session from given token credentials.
73
74    Args:
75      conn: database connectionn
76      credentials: credentials (token) gotten from headers
77
78    Returns:
79      user session with user and session information
80
81    Raises:
82      HTTPException: if user wasn't found in the database
83    """
84    session = await TokenQuerier(conn).get_session_by_token(
85        token=credentials.credentials
86    )
87    if not session:
88        raise HTTPException(
89            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid session"
90        )
91    return session

Fetches user session from given token credentials.

Arguments:
  • conn: database connectionn
  • credentials: credentials (token) gotten from headers
Returns:

user session with user and session information

Raises:
  • HTTPException: if user wasn't found in the database
def consumer_auth( session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
 94def consumer_auth(
 95    session: GetSessionByTokenRow = Security(bearer_auth),
 96) -> GetSessionByTokenRow:
 97    """Authentisate consumer in a middleware.
 98
 99    Args:
100      session: user session from bearer authentication
101
102    Returns:
103      user session if user was successfully authenticated
104
105    Raises:
106      HTTPException: if user is not a consumer
107    """
108    if session.role == UserRole.CONSUMER:
109        return session
110    raise HTTPException(
111        status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as consumer"
112    )

Authentisate consumer in a middleware.

Arguments:
  • session: user session from bearer authentication
Returns:

user session if user was successfully authenticated

Raises:
  • HTTPException: if user is not a consumer
def seller_auth( session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
115def seller_auth(
116    session: GetSessionByTokenRow = Security(bearer_auth),
117) -> GetSessionByTokenRow:
118    """Authentisate seller in a middleware.
119
120    Args:
121      session: user session from bearer authentication
122
123    Returns:
124      user session if user was successfully authenticated
125
126    Raises:
127      HTTPException: if user is not a seller
128    """
129    if session.role == UserRole.SELLER:
130        return session
131    raise HTTPException(
132        status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as seller"
133    )

Authentisate seller in a middleware.

Arguments:
  • session: user session from bearer authentication
Returns:

user session if user was successfully authenticated

Raises:
  • HTTPException: if user is not a seller
async def admin_auth( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], session: internal.queries.token.GetSessionByTokenRow = Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)) -> internal.queries.token.GetSessionByTokenRow:
136async def admin_auth(
137    conn: database_dependency, session: GetSessionByTokenRow = Security(bearer_auth)
138) -> GetSessionByTokenRow:
139    """Authentisate admin in a middleware.
140
141    Args:
142      conn: database connection
143      session: user session from bearer authentication
144
145    Returns:
146      user session if user was successfully authenticated
147
148    Raises:
149      HTTPException: if user is not a admin
150    """
151    if session.role != UserRole.ADMIN:
152        raise HTTPException(
153            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorised as admin"
154        )
155
156    admin_info = await AdminQuerier(conn).get_admin(user_id=session.user_id)
157    if not admin_info or not admin_info.active:
158        raise HTTPException(
159            status_code=status.HTTP_403_FORBIDDEN, detail="Admin account deactivated"
160        )
161
162    return session

Authentisate admin in a middleware.

Arguments:
  • conn: database connection
  • session: user session from bearer authentication
Returns:

user session if user was successfully authenticated

Raises:
  • HTTPException: if user is not a admin
def root_auth( credentials: fastapi.security.http.HTTPBasicCredentials = Security(dependency=<fastapi.security.http.HTTPBasic object>, use_cache=True, scope=None, scopes=None)) -> None:
165def root_auth(credentials: HTTPBasicCredentials = Security(HTTPBasic())) -> None:
166    """Root user authentication.
167
168    Args:
169        credentials: root user credentials
170
171    Raises:
172        HTTPException: if not root user
173    """
174    if ("" in {auth_settings.root_username, auth_settings.root_password}) or not (
175        credentials.username == auth_settings.root_username
176        and credentials.password == auth_settings.root_password
177    ):
178        raise HTTPException(
179            status.HTTP_401_UNAUTHORIZED, "Not authorised as root admin"
180        )

Root user authentication.

Arguments:
  • credentials: root user credentials
Raises:
  • HTTPException: if not root user
BasicAuthDep = typing.Annotated[BasicAuthResponse, Security(dependency=<function basic_auth>, use_cache=True, scope=None, scopes=None)]
BearerAuthDep = typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function bearer_auth>, use_cache=True, scope=None, scopes=None)]
ConsumerAuthDep = typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]
SellerAuthDep = typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function seller_auth>, use_cache=True, scope=None, scopes=None)]
AdminAuthDep = typing.Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function admin_auth>, use_cache=True, scope=None, scopes=None)]
RootAuthDep = typing.Annotated[NoneType, Security(dependency=<function root_auth>, use_cache=True, scope=None, scopes=None)]