routers.consumers

Endpoint for consumers.

--- config: mirrorActors: false --- sequenceDiagram title Consumer Registration actor user box ./routers participant consumers.py@{ "type" : "boundary" } end box ./internal/database participant dd as database.py end box ./internal/auth participant creation.py participant security.py end box ./internal/queries participant user.py participant cq as consumer.py end participant database@{ "type" : "database" } user->>consumers.py: register consumer activate consumers.py dd->>consumers.py: yield connection activate dd consumers.py->>creation.py: await create_consumer() activate creation.py creation.py->>creation.py: create_user() creation.py->>security.py: hash_password() activate security.py security.py-->>creation.py: password hash deactivate security.py creation.py->>user.py: Queries.create_user() activate user.py user.py->>database: insert user activate database database-->>user.py: created user deactivate database user.py-->>creation.py: created user deactivate user.py creation.py-->>creation.py: created user creation.py->>cq: Queries.create_consumer() activate cq cq->>database: insert consumer activate database database-->>cq: created consumer deactivate database cq-->>creation.py: created consumer deactivate cq creation.py-->>consumers.py: created consumer deactivate creation.py consumers.py-->>user: 201 Created consumers.py-->>dd: return connection deactivate dd deactivate consumers.py
  1"""Endpoint for consumers.
  2
  3```mermaid
  4---
  5config:
  6  mirrorActors: false
  7---
  8sequenceDiagram
  9    title Consumer Registration
 10    actor user
 11    box ./routers
 12    participant consumers.py@{ "type" : "boundary" }
 13    end
 14    box ./internal/database
 15    participant dd as database.py
 16    end
 17    box ./internal/auth
 18    participant creation.py
 19    participant security.py
 20    end
 21    box ./internal/queries
 22    participant user.py
 23    participant cq as consumer.py
 24    end
 25    participant database@{ "type" : "database" }
 26
 27    user->>consumers.py: register consumer
 28    activate consumers.py
 29    dd->>consumers.py: yield connection
 30    activate dd
 31    consumers.py->>creation.py: await create_consumer()
 32    activate creation.py
 33    creation.py->>creation.py: create_user()
 34    creation.py->>security.py: hash_password()
 35    activate security.py
 36    security.py-->>creation.py: password hash
 37    deactivate security.py
 38    creation.py->>user.py: Queries.create_user()
 39    activate user.py
 40    user.py->>database: insert user
 41    activate database
 42    database-->>user.py: created user
 43    deactivate database
 44    user.py-->>creation.py: created user
 45    deactivate user.py
 46    creation.py-->>creation.py: created user
 47    creation.py->>cq: Queries.create_consumer()
 48    activate cq
 49    cq->>database: insert consumer
 50    activate database
 51    database-->>cq: created consumer
 52    deactivate database
 53    cq-->>creation.py: created consumer
 54    deactivate cq
 55    creation.py-->>consumers.py: created consumer
 56    deactivate creation.py
 57    consumers.py-->>user: 201 Created
 58    consumers.py-->>dd: return connection
 59    deactivate dd
 60    deactivate consumers.py
 61```
 62"""
 63
 64from datetime import UTC, datetime, timedelta
 65
 66from fastapi import APIRouter, HTTPException, status
 67from internal.auth.creation import CreateConsumerForm, create_consumer
 68from internal.auth.middleware import ConsumerAuthDep
 69from internal.database.dependency import database_dependency
 70from internal.queries.badge import AsyncQuerier as BadgeQuerier
 71from internal.queries.badge import GetConsumerBadgesRow
 72from internal.queries.consumer import AsyncQuerier as ConsumerQuerier
 73from internal.queries.consumer import (
 74    GetConsumerRow,
 75    GetConsumersRow,
 76    UpdateConsumerParams,
 77)
 78from internal.queries.models import Reservation
 79from internal.queries.reservations import AsyncQuerier as ReservationsQuerier
 80from internal.queries.reservations import GetConsumersReservationsFullRow
 81from pydantic import BaseModel
 82
 83router = APIRouter(prefix="/consumers", tags=["consumers"])
 84
 85
 86@router.get(
 87    "",
 88    status_code=status.HTTP_200_OK,
 89    summary="Get all consumers",
 90    description="Retrieves a list of all registered consumers.",
 91)
 92async def get_consumers(conn: database_dependency) -> list[GetConsumersRow]:
 93    """Get all consumers.
 94
 95    Args:
 96      conn: database connection
 97
 98    Returns:
 99      list of all consumers
100    """
101    return [c async for c in ConsumerQuerier(conn).get_consumers()]
102
103
@router.get(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Get authenticated consumer",
    description="Retrieves the profile of the authenticated consumer.",
)
async def get_consumer_me(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> GetConsumerRow:
    """Return the profile that belongs to the authenticated consumer.

    Args:
        conn: database connection
        consumer: session of the authenticated consumer

    Returns:
        the consumer profile looked up by the session's user id

    Raises:
        HTTPException: 404 when no profile exists for the session's user id
    """
    querier = ConsumerQuerier(conn)
    profile = await querier.get_consumer(user_id=consumer.user_id)
    if profile:
        return profile
    # A session without a matching consumer row is reported as "not found".
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Consumer profile not found"
    )
133
134
@router.get(
    "/{consumer_id}",
    status_code=status.HTTP_200_OK,
    summary="Get consumer by ID",
    description="Retrieves the profile of a consumer by their unique ID.",
)
async def get_consumer_by_id(
    consumer_id: int, conn: database_dependency
) -> GetConsumerRow:
    """Look up a consumer profile by its identifier.

    Args:
        consumer_id: identifier of the consumer; passed to the querier as the
            user id
        conn: database connection

    Returns:
        the matching consumer profile

    Raises:
        HTTPException: 404 when no consumer matches the identifier
    """
    querier = ConsumerQuerier(conn)
    profile = await querier.get_consumer(user_id=consumer_id)
    if profile:
        return profile
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Consumer not found"
    )
162
163
@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    summary="Register consumer",
    description="Registers a new consumer and their corresponding user entity.",
)
async def register_consumer(
    form: CreateConsumerForm, conn: database_dependency
) -> None:
    """Create a consumer together with its user record.

    Args:
        form: signup information for the new user
        conn: database connection
    """
    # Whatever create_consumer returns is discarded; the response has no body.
    await create_consumer(form, conn)
180
181
@router.get(
    "/me/reservations",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get consumer reservations",
    description="Retrieves all reservations made by the authenticated consumer.",
)
async def get_reservations(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[Reservation]:
    """Get the authenticated consumer's reservations.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        list of the consumer's reservations; empty when the querier yields none
    """
    # A comprehension always evaluates to a list, never None, so there is no
    # "failed" state to turn into a 500 here. Any exception raised while
    # iterating the querier propagates unchanged.
    return [
        item
        async for item in ReservationsQuerier(conn).get_consumers_reservations(
            consumer_id=consumer.user_id
        )
    ]
216
217
@router.get(
    "/me/rescued",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get number of consumer rescued reservations",
    description="Retrieves number of rescued (collected) reservations.",
)
async def get_rescued(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Count the reservations the authenticated consumer has collected.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        the collected-reservation count, or 0 when the query gives no result
    """
    querier = ReservationsQuerier(conn)
    tally = await querier.count_consumer_collected_reservations(
        consumer_id=consumer.user_id
    )
    if not tally:
        # No result from the count query is treated as "nothing rescued yet".
        return 0
    return tally.collected_count
239
240
class UpdateConsumerForm(BaseModel):
    """Consumer name update form."""

    # Both fields are required (no defaults). update_consumer forwards them to
    # UpdateConsumerParams as fname / lname respectively.
    first_name: str
    last_name: str
246
247
@router.patch(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Update consumer profile",
    description=(
        "Updates the profile information (first and last name) "
        "for the authenticated consumer."
    ),
)
async def update_consumer(
    form: UpdateConsumerForm, conn: database_dependency, consumer: ConsumerAuthDep
) -> None:
    """Change the authenticated consumer's first and last name.

    Args:
        form: new first and last name
        conn: database connection
        consumer: consumer session

    Raises:
        HTTPException: 500 when the update query gives no result
    """
    querier = ConsumerQuerier(conn)
    params = UpdateConsumerParams(
        user_id=consumer.user_id, fname=form.first_name, lname=form.last_name
    )
    outcome = await querier.update_consumer(params)
    if outcome:
        return
    # A falsy result from the querier is reported as a server-side failure.
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="Failed to update consumer",
    )
280
281
@router.get(
    "/me/badges",
    status_code=status.HTTP_200_OK,
    summary="Consumer badges",
    description="Get all acquired badges by consumer",
    tags=["badges"],
)
async def get_consumer_badges(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[GetConsumerBadgesRow]:
    """List the badges the authenticated consumer has acquired.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        every badge row yielded for the session's user id
    """
    acquired: list[GetConsumerBadgesRow] = []
    badge_rows = BadgeQuerier(conn).get_consumer_badges(user_id=consumer.user_id)
    async for row in badge_rows:
        acquired.append(row)
    return acquired
307
308
@router.get(
    "/me/streaks",
    status_code=status.HTTP_200_OK,
    summary="Consumer streak",
    description="Get consumer streak in number of weeks",
    tags=["analytics"],
)
async def get_streaks(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Compute the consumer's collection streak in consecutive ISO weeks.

    Reservations are walked from the latest ``window_end`` backwards. Windows
    that have not ended yet are ignored. The walk stops at the first ended
    reservation that was never collected, or at the first week that does not
    directly continue the run. The run may start in the current ISO week or,
    failing that, in the previous one.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        number of consecutive weeks with a collected reservation
    """
    querier = ReservationsQuerier(conn)
    history: list[GetConsumersReservationsFullRow] = []
    async for row in querier.get_consumers_reservations_full(
        consumer_id=consumer.user_id
    ):
        history.append(row)
    if not history:
        return 0

    newest_first = sorted(history, key=lambda row: row.window_end, reverse=True)
    now = datetime.now(tz=UTC)
    one_week = timedelta(weeks=1)
    # (ISO year, ISO week) pair for the week before the current one.
    previous_week = (now - one_week).isocalendar()[:2]
    anchor = now
    weeks = 0
    newest_counted = None
    for row in newest_first:
        if row.window_end > now:
            # Window still open or in the future: neither helps nor hurts.
            continue
        if row.collected_at is None:
            # An ended reservation that was never collected ends the streak.
            break
        row_week = row.window_start.date().isocalendar()[:2]
        if row_week == newest_counted:
            # Several collections in one week count once.
            continue
        if newest_counted is None and row_week == previous_week:
            # Nothing counted yet and the latest collection was last week:
            # measure the run from last week instead of the current week.
            anchor = now - one_week
        if row_week != (anchor - timedelta(weeks=weeks)).isocalendar()[:2]:
            # A gap of at least one week breaks the run.
            break
        weeks += 1
        newest_counted = row_week

    return weeks
router = <fastapi.routing.APIRouter object>
@router.get('', status_code=status.HTTP_200_OK, summary='Get all consumers', description='Retrieves a list of all registered consumers.')
async def get_consumers( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> list[internal.queries.consumer.GetConsumersRow]:
 87@router.get(
 88    "",
 89    status_code=status.HTTP_200_OK,
 90    summary="Get all consumers",
 91    description="Retrieves a list of all registered consumers.",
 92)
 93async def get_consumers(conn: database_dependency) -> list[GetConsumersRow]:
 94    """Get all consumers.
 95
 96    Args:
 97      conn: database connection
 98
 99    Returns:
100      list of all consumers
101    """
102    return [c async for c in ConsumerQuerier(conn).get_consumers()]

Get all consumers.

Arguments:
  • conn: database connection
Returns:

list of all consumers

@router.get('/me', status_code=status.HTTP_200_OK, summary='Get authenticated consumer', description='Retrieves the profile of the authenticated consumer.')
async def get_consumer_me( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> internal.queries.consumer.GetConsumerRow:
105@router.get(
106    "/me",
107    status_code=status.HTTP_200_OK,
108    summary="Get authenticated consumer",
109    description="Retrieves the profile of the authenticated consumer.",
110)
111async def get_consumer_me(
112    conn: database_dependency, consumer: ConsumerAuthDep
113) -> GetConsumerRow:
114    """Get authenticated consumer profile.
115
116    Args:
117      conn: database connection
118      consumer: consumers session
119
120    Returns:
121      consumer profile
122
123    Raises:
124      HTTPException: if consumer not found
125    """
126    consumer_profile = await ConsumerQuerier(conn).get_consumer(
127        user_id=consumer.user_id
128    )
129    if not consumer_profile:
130        raise HTTPException(
131            status_code=status.HTTP_404_NOT_FOUND, detail="Consumer profile not found"
132        )
133    return consumer_profile

Get authenticated consumer profile.

Arguments:
  • conn: database connection
  • consumer: consumer session
Returns:

consumer profile

Raises:
  • HTTPException: if consumer not found
@router.get('/{consumer_id}', status_code=status.HTTP_200_OK, summary='Get consumer by ID', description='Retrieves the profile of a consumer by their unique ID.')
async def get_consumer_by_id( consumer_id: int, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> internal.queries.consumer.GetConsumerRow:
136@router.get(
137    "/{consumer_id}",
138    status_code=status.HTTP_200_OK,
139    summary="Get consumer by ID",
140    description="Retrieves the profile of a consumer by their unique ID.",
141)
142async def get_consumer_by_id(
143    consumer_id: int, conn: database_dependency
144) -> GetConsumerRow:
145    """Get consumer profile by ID.
146
147    Args:
148      consumer_id: unique identifier of the consumer
149      conn: database connection
150
151    Returns:
152      consumer profile
153
154    Raises:
155      HTTPException: if consumer not found
156    """
157    consumer_profile = await ConsumerQuerier(conn).get_consumer(user_id=consumer_id)
158    if not consumer_profile:
159        raise HTTPException(
160            status_code=status.HTTP_404_NOT_FOUND, detail="Consumer not found"
161        )
162    return consumer_profile

Get consumer profile by ID.

Arguments:
  • consumer_id: unique identifier of the consumer
  • conn: database connection
Returns:

consumer profile

Raises:
  • HTTPException: if consumer not found
@router.post('', status_code=status.HTTP_201_CREATED, summary='Register consumer', description='Registers a new consumer and their corresponding user entity.')
async def register_consumer( form: internal.auth.creation.CreateConsumerForm, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> None:
165@router.post(
166    "",
167    status_code=status.HTTP_201_CREATED,
168    summary="Register consumer",
169    description="Registers a new consumer and their corresponding user entity.",
170)
171async def register_consumer(
172    form: CreateConsumerForm, conn: database_dependency
173) -> None:
174    """Register consumer and corresponding user.
175
176    Args:
177      form: signup information for the user
178      conn: database connection
179    """
180    _ = await create_consumer(form, conn)

Register consumer and corresponding user.

Arguments:
  • form: signup information for the user
  • conn: database connection
@router.get('/me/reservations', tags=['reservations'], status_code=status.HTTP_200_OK, summary='Get consumer reservations', description='Retrieves all reservations made by the authenticated consumer.')
async def get_reservations( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> list[internal.queries.models.Reservation]:
183@router.get(
184    "/me/reservations",
185    tags=["reservations"],
186    status_code=status.HTTP_200_OK,
187    summary="Get consumer reservations",
188    description="Retrieves all reservations made by the authenticated consumer.",
189)
190async def get_reservations(
191    conn: database_dependency, consumer: ConsumerAuthDep
192) -> list[Reservation]:
193    """Get consumers reservations.
194
195    Args:
196      conn: database connection
197      consumer: consumer session
198
199    Returns:
200        list of consumer reservations
201
202    Raises:
203        HTTPException: if failed to get reservations
204    """
205    reservations = [
206        item
207        async for item in ReservationsQuerier(conn).get_consumers_reservations(
208            consumer_id=consumer.user_id
209        )
210    ]
211    if reservations is None:
212        raise HTTPException(
213            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
214            detail="Failed to get reservations",
215        )
216    return list(reservations)

Get the consumer's reservations.

Arguments:
  • conn: database connection
  • consumer: consumer session
Returns:

list of consumer reservations

Raises:
  • HTTPException: if failed to get reservations
@router.get('/me/rescued', tags=['reservations'], status_code=status.HTTP_200_OK, summary='Get number of consumer rescued reservations', description='Retrieves number of rescued (collected) reservations.')
async def get_rescued( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> int:
219@router.get(
220    "/me/rescued",
221    tags=["reservations"],
222    status_code=status.HTTP_200_OK,
223    summary="Get number of consumer rescued reservations",
224    description="Retrieves number of rescued (collected) reservations.",
225)
226async def get_rescued(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
227    """Get count of rescued (collected) reservations for the authenticated consumer.
228
229    Args:
230      conn: database connection
231      consumer: consumer session
232
233    Returns:
234        number of rescued (collected) reservations
235    """
236    result = await ReservationsQuerier(conn).count_consumer_collected_reservations(
237        consumer_id=consumer.user_id
238    )
239    return result.collected_count if result else 0

Get count of rescued (collected) reservations for the authenticated consumer.

Arguments:
  • conn: database connection
  • consumer: consumer session
Returns:

number of rescued (collected) reservations

class UpdateConsumerForm(pydantic.main.BaseModel):
242class UpdateConsumerForm(BaseModel):
243    """Consumer name update form."""
244
245    first_name: str
246    last_name: str

Consumer name update form.

first_name: str = PydanticUndefined
last_name: str = PydanticUndefined
@router.patch('/me', status_code=status.HTTP_200_OK, summary='Update consumer profile', description='Updates the profile information (first and last name) for the authenticated consumer.')
async def update_consumer( form: UpdateConsumerForm, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> None:
249@router.patch(
250    "/me",
251    status_code=status.HTTP_200_OK,
252    summary="Update consumer profile",
253    description=(
254        "Updates the profile information (first and last name) "
255        "for the authenticated consumer."
256    ),
257)
258async def update_consumer(
259    form: UpdateConsumerForm, conn: database_dependency, consumer: ConsumerAuthDep
260) -> None:
261    """Consumer name update.
262
263    Args:
264        form: consumer update form
265        conn: database connection
266        consumer: consumer session
267
268    Raises:
269        HTTPException: if failed to update consumer
270    """
271    updated_consumer = await ConsumerQuerier(conn).update_consumer(
272        UpdateConsumerParams(
273            user_id=consumer.user_id, fname=form.first_name, lname=form.last_name
274        )
275    )
276    if not updated_consumer:
277        raise HTTPException(
278            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
279            detail="Failed to update consumer",
280        )

Consumer name update.

Arguments:
  • form: consumer update form
  • conn: database connection
  • consumer: consumer session
Raises:
  • HTTPException: if failed to update consumer
@router.get('/me/badges', status_code=status.HTTP_200_OK, summary='Consumer badges', description='Get all acquired badges by consumer', tags=['badges'])
async def get_consumer_badges( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> list[internal.queries.badge.GetConsumerBadgesRow]:
283@router.get(
284    "/me/badges",
285    status_code=status.HTTP_200_OK,
286    summary="Consumer badges",
287    description="Get all acquired badges by consumer",
288    tags=["badges"],
289)
290async def get_consumer_badges(
291    conn: database_dependency, consumer: ConsumerAuthDep
292) -> list[GetConsumerBadgesRow]:
293    """Get badges acquired by consumer.
294
295    Args:
296      conn: database connection
297      consumer: consumer session
298
299    Returns:
300      list of acquired badges
301    """
302    return [
303        badge
304        async for badge in BadgeQuerier(conn).get_consumer_badges(
305            user_id=consumer.user_id
306        )
307    ]

Get badges acquired by consumer.

Arguments:
  • conn: database connection
  • consumer: consumer session
Returns:

list of acquired badges

@router.get('/me/streaks', status_code=status.HTTP_200_OK, summary='Consumer streak', description='Get consumer streak in number of weeks', tags=['analytics'])
async def get_streaks( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> int:
310@router.get(
311    "/me/streaks",
312    status_code=status.HTTP_200_OK,
313    summary="Consumer streak",
314    description="Get consumer streak in number of weeks",
315    tags=["analytics"],
316)
317async def get_streaks(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
318    """Get consumer collection streak in number of weeks.
319
320    Args:
321        conn: database connection
322        consumer: consumer session
323
324    Returns:
325        number of weeks
326    """
327    reservations: list[GetConsumersReservationsFullRow] = [
328        reservation
329        async for reservation in ReservationsQuerier(
330            conn
331        ).get_consumers_reservations_full(consumer_id=consumer.user_id)
332    ]
333    if len(reservations) == 0:
334        return 0
335    reservations.sort(key=lambda reservation: reservation.window_end, reverse=True)
336    streak_count = 0
337    last_counted_week = None
338    today = datetime.now(tz=UTC)
339    last_week = (today - timedelta(weeks=1)).isocalendar()[:2]
340    anchor_date = today
341    for reservation in reservations:
342        if reservation.window_end > today:
343            continue
344        if reservation.collected_at is None:
345            break
346        check_week = reservation.window_start.date().isocalendar()[:2]
347        if check_week == last_counted_week:
348            continue
349        if last_counted_week is None and check_week == last_week:
350            anchor_date = today - timedelta(weeks=1)
351        expected_week = (anchor_date - timedelta(weeks=streak_count)).isocalendar()[:2]
352        if check_week == expected_week:
353            streak_count += 1
354            last_counted_week = check_week
355        else:
356            break
357
358    return streak_count

Get consumer collection streak in number of weeks.

Arguments:
  • conn: database connection
  • consumer: consumer session
Returns:

number of weeks