routers.consumers
Endpoint for consumers.
--- config: mirrorActors: false --- sequenceDiagram title Consumer Registration actor user box ./routers participant consumers.py@{ "type" : "boundary" } end box ./internal/database participant dd as database.py end box ./internal/auth participant creation.py participant security.py end box ./internal/queries participant user.py participant cq as consumer.py end participant database@{ "type" : "database" } user->>consumers.py: register consumer activate consumers.py dd->>consumers.py: yield connection activate dd consumers.py->>creation.py: await create_consumer() activate creation.py creation.py->>creation.py: create_user() creation.py->>security.py: hash_password() activate security.py security.py-->>creation.py: password hash deactivate security.py creation.py->>user.py: Queries.create_user() activate user.py user.py->>database: insert user activate database database-->>user.py: created user deactivate database user.py-->>creation.py: created user deactivate user.py creation.py-->>creation.py: created user creation.py->>cq: Queries.create_consumer() activate cq cq->>database: insert consumer activate database database-->>cq: created consumer deactivate database cq-->>creation.py: created consumer deactivate cq creation.py-->>consumers.py: created consumer deactivate creation.py consumers.py-->>user: 201 Created consumers.py-->>dd: return connection deactivate dd deactivate consumers.py
"""Endpoint for consumers.

```mermaid
---
config:
  mirrorActors: false
---
sequenceDiagram
    title Consumer Registration
    actor user
    box ./routers
        participant consumers.py@{ "type" : "boundary" }
    end
    box ./internal/database
        participant dd as database.py
    end
    box ./internal/auth
        participant creation.py
        participant security.py
    end
    box ./internal/queries
        participant user.py
        participant cq as consumer.py
    end
    participant database@{ "type" : "database" }

    user->>consumers.py: register consumer
    activate consumers.py
    dd->>consumers.py: yield connection
    activate dd
    consumers.py->>creation.py: await create_consumer()
    activate creation.py
    creation.py->>creation.py: create_user()
    creation.py->>security.py: hash_password()
    activate security.py
    security.py-->>creation.py: password hash
    deactivate security.py
    creation.py->>user.py: Queries.create_user()
    activate user.py
    user.py->>database: insert user
    activate database
    database-->>user.py: created user
    deactivate database
    user.py-->>creation.py: created user
    deactivate user.py
    creation.py-->>creation.py: created user
    creation.py->>cq: Queries.create_consumer()
    activate cq
    cq->>database: insert consumer
    activate database
    database-->>cq: created consumer
    deactivate database
    cq-->>creation.py: created consumer
    deactivate cq
    creation.py-->>consumers.py: created consumer
    deactivate creation.py
    consumers.py-->>user: 201 Created
    consumers.py-->>dd: return connection
    deactivate dd
    deactivate consumers.py
```
"""

from datetime import UTC, datetime, timedelta

from fastapi import APIRouter, HTTPException, status
from internal.auth.creation import CreateConsumerForm, create_consumer
from internal.auth.middleware import ConsumerAuthDep
from internal.database.dependency import database_dependency
from internal.queries.badge import AsyncQuerier as BadgeQuerier
from internal.queries.badge import GetConsumerBadgesRow
from internal.queries.consumer import AsyncQuerier as ConsumerQuerier
from internal.queries.consumer import (
    GetConsumerRow,
    GetConsumersRow,
    UpdateConsumerParams,
)
from internal.queries.models import Reservation
from internal.queries.reservations import AsyncQuerier as ReservationsQuerier
from internal.queries.reservations import GetConsumersReservationsFullRow
from pydantic import BaseModel

router = APIRouter(prefix="/consumers", tags=["consumers"])


@router.get(
    "",
    status_code=status.HTTP_200_OK,
    summary="Get all consumers",
    description="Retrieves a list of all registered consumers.",
)
async def get_consumers(conn: database_dependency) -> list[GetConsumersRow]:
    """Get all consumers.

    Args:
        conn: database connection

    Returns:
        list of all consumers
    """
    return [c async for c in ConsumerQuerier(conn).get_consumers()]


@router.get(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Get authenticated consumer",
    description="Retrieves the profile of the authenticated consumer.",
)
async def get_consumer_me(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> GetConsumerRow:
    """Get authenticated consumer profile.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        consumer profile

    Raises:
        HTTPException: 404 if the consumer query returns no profile
    """
    consumer_profile = await ConsumerQuerier(conn).get_consumer(
        user_id=consumer.user_id
    )
    if not consumer_profile:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Consumer profile not found"
        )
    return consumer_profile


@router.get(
    "/{consumer_id}",
    status_code=status.HTTP_200_OK,
    summary="Get consumer by ID",
    description="Retrieves the profile of a consumer by their unique ID.",
)
async def get_consumer_by_id(
    consumer_id: int, conn: database_dependency
) -> GetConsumerRow:
    """Get consumer profile by ID.

    Args:
        consumer_id: unique identifier of the consumer
        conn: database connection

    Returns:
        consumer profile

    Raises:
        HTTPException: 404 if the consumer query returns no profile
    """
    consumer_profile = await ConsumerQuerier(conn).get_consumer(user_id=consumer_id)
    if not consumer_profile:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Consumer not found"
        )
    return consumer_profile


@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    summary="Register consumer",
    description="Registers a new consumer and their corresponding user entity.",
)
async def register_consumer(
    form: CreateConsumerForm, conn: database_dependency
) -> None:
    """Register consumer and corresponding user.

    Args:
        form: signup information for the user
        conn: database connection
    """
    # The created consumer is intentionally discarded; the endpoint replies
    # with an empty 201 response.
    _ = await create_consumer(form, conn)


@router.get(
    "/me/reservations",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get consumer reservations",
    description="Retrieves all reservations made by the authenticated consumer.",
)
async def get_reservations(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[Reservation]:
    """Get consumers reservations.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        list of consumer reservations (empty when the query yields no rows)
    """
    # A list comprehension always produces a list, so no `None` guard is
    # needed; errors raised by the query propagate unchanged.
    return [
        item
        async for item in ReservationsQuerier(conn).get_consumers_reservations(
            consumer_id=consumer.user_id
        )
    ]


@router.get(
    "/me/rescued",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get number of consumer rescued reservations",
    description="Retrieves number of rescued (collected) reservations.",
)
async def get_rescued(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Get count of rescued (collected) reservations for the authenticated consumer.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        number of rescued (collected) reservations, 0 if the query returns nothing
    """
    result = await ReservationsQuerier(conn).count_consumer_collected_reservations(
        consumer_id=consumer.user_id
    )
    return result.collected_count if result else 0


class UpdateConsumerForm(BaseModel):
    """Consumer name update form."""

    first_name: str
    last_name: str


@router.patch(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Update consumer profile",
    description=(
        "Updates the profile information (first and last name) "
        "for the authenticated consumer."
    ),
)
async def update_consumer(
    form: UpdateConsumerForm, conn: database_dependency, consumer: ConsumerAuthDep
) -> None:
    """Consumer name update.

    Args:
        form: consumer update form
        conn: database connection
        consumer: consumer session

    Raises:
        HTTPException: 500 if the update query returns a falsy result
    """
    updated_consumer = await ConsumerQuerier(conn).update_consumer(
        UpdateConsumerParams(
            user_id=consumer.user_id, fname=form.first_name, lname=form.last_name
        )
    )
    if not updated_consumer:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update consumer",
        )


@router.get(
    "/me/badges",
    status_code=status.HTTP_200_OK,
    summary="Consumer badges",
    description="Get all acquired badges by consumer",
    tags=["badges"],
)
async def get_consumer_badges(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[GetConsumerBadgesRow]:
    """Get badges acquired by consumer.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        list of acquired badges
    """
    return [
        badge
        async for badge in BadgeQuerier(conn).get_consumer_badges(
            user_id=consumer.user_id
        )
    ]


@router.get(
    "/me/streaks",
    status_code=status.HTTP_200_OK,
    summary="Consumer streak",
    description="Get consumer streak in number of weeks",
    tags=["analytics"],
)
async def get_streaks(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Get consumer collection streak in number of weeks.

    Reservations are walked from the latest ``window_end`` backwards. Each
    distinct ISO week (taken from ``window_start``) that directly continues
    the chain counts once. The chain may start in the current week or in the
    previous one, and it stops at the first ended reservation that was not
    collected or at the first gap between weeks.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        number of weeks
    """
    reservations: list[GetConsumersReservationsFullRow] = [
        reservation
        async for reservation in ReservationsQuerier(
            conn
        ).get_consumers_reservations_full(consumer_id=consumer.user_id)
    ]
    if not reservations:
        return 0
    reservations.sort(key=lambda reservation: reservation.window_end, reverse=True)
    streak_count = 0
    last_counted_week = None
    today = datetime.now(tz=UTC)
    last_week = (today - timedelta(weeks=1)).isocalendar()[:2]
    anchor_date = today
    for reservation in reservations:
        # Reservations whose window has not ended yet neither extend nor
        # break the streak.
        if reservation.window_end > today:
            continue
        if reservation.collected_at is None:
            break
        check_week = reservation.window_start.date().isocalendar()[:2]
        # Several reservations in one week count only once.
        if check_week == last_counted_week:
            continue
        # Nothing counted yet and the newest week is last week: start the
        # expected-week chain one week back instead of at the current week.
        if last_counted_week is None and check_week == last_week:
            anchor_date = today - timedelta(weeks=1)
        expected_week = (anchor_date - timedelta(weeks=streak_count)).isocalendar()[:2]
        if check_week == expected_week:
            streak_count += 1
            last_counted_week = check_week
        else:
            break

    return streak_count
router =
<fastapi.routing.APIRouter object>
@router.get('', status_code=status.HTTP_200_OK, summary='Get all consumers', description='Retrieves a list of all registered consumers.')
async def
get_consumers( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> list[internal.queries.consumer.GetConsumersRow]:
@router.get(
    "",
    status_code=status.HTTP_200_OK,
    summary="Get all consumers",
    description="Retrieves a list of all registered consumers.",
)
async def get_consumers(conn: database_dependency) -> list[GetConsumersRow]:
    """List every registered consumer.

    Args:
        conn: database connection

    Returns:
        every row yielded by the consumer query, in the order it yields them
    """
    consumers: list[GetConsumersRow] = []
    async for row in ConsumerQuerier(conn).get_consumers():
        consumers.append(row)
    return consumers
Get all consumers.
Arguments:
- conn: database connection
Returns:
list of all consumers
@router.get('/me', status_code=status.HTTP_200_OK, summary='Get authenticated consumer', description='Retrieves the profile of the authenticated consumer.')
async def
get_consumer_me( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> internal.queries.consumer.GetConsumerRow:
@router.get(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Get authenticated consumer",
    description="Retrieves the profile of the authenticated consumer.",
)
async def get_consumer_me(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> GetConsumerRow:
    """Return the profile that belongs to the authenticated session.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        consumer profile

    Raises:
        HTTPException: 404 if the consumer query returns no profile
    """
    querier = ConsumerQuerier(conn)
    profile = await querier.get_consumer(user_id=consumer.user_id)
    if profile:
        return profile
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Consumer profile not found"
    )
Get authenticated consumer profile.
Arguments:
- conn: database connection
- consumer: consumer session
Returns:
consumer profile
Raises:
- HTTPException: if consumer not found
@router.get('/{consumer_id}', status_code=status.HTTP_200_OK, summary='Get consumer by ID', description='Retrieves the profile of a consumer by their unique ID.')
async def
get_consumer_by_id( consumer_id: int, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> internal.queries.consumer.GetConsumerRow:
@router.get(
    "/{consumer_id}",
    status_code=status.HTTP_200_OK,
    summary="Get consumer by ID",
    description="Retrieves the profile of a consumer by their unique ID.",
)
async def get_consumer_by_id(
    consumer_id: int, conn: database_dependency
) -> GetConsumerRow:
    """Look up a consumer profile by its identifier.

    Args:
        consumer_id: unique identifier of the consumer
        conn: database connection

    Returns:
        consumer profile

    Raises:
        HTTPException: 404 if the consumer query returns no profile
    """
    querier = ConsumerQuerier(conn)
    profile = await querier.get_consumer(user_id=consumer_id)
    if profile:
        return profile
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Consumer not found"
    )
Get consumer profile by ID.
Arguments:
- consumer_id: unique identifier of the consumer
- conn: database connection
Returns:
consumer profile
Raises:
- HTTPException: if consumer not found
@router.post('', status_code=status.HTTP_201_CREATED, summary='Register consumer', description='Registers a new consumer and their corresponding user entity.')
async def
register_consumer( form: internal.auth.creation.CreateConsumerForm, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)]) -> None:
@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    summary="Register consumer",
    description="Registers a new consumer and their corresponding user entity.",
)
async def register_consumer(
    form: CreateConsumerForm, conn: database_dependency
) -> None:
    """Create a consumer together with its user record.

    The value returned by ``create_consumer`` is not used; the endpoint
    responds with an empty 201 body.

    Args:
        form: signup information for the user
        conn: database connection
    """
    await create_consumer(form, conn)
Register consumer and corresponding user.
Arguments:
- form: signup information for the user
- conn: database connection
@router.get('/me/reservations', tags=['reservations'], status_code=status.HTTP_200_OK, summary='Get consumer reservations', description='Retrieves all reservations made by the authenticated consumer.')
async def
get_reservations( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> list[internal.queries.models.Reservation]:
@router.get(
    "/me/reservations",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get consumer reservations",
    description="Retrieves all reservations made by the authenticated consumer.",
)
async def get_reservations(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[Reservation]:
    """Get consumers reservations.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        list of consumer reservations (empty when the query yields no rows)
    """
    # A list comprehension always evaluates to a list, so the former
    # `is None` guard (and its 500 response) could never trigger. Errors
    # raised by the query itself still propagate to the caller unchanged.
    return [
        item
        async for item in ReservationsQuerier(conn).get_consumers_reservations(
            consumer_id=consumer.user_id
        )
    ]
Get consumers reservations.
Arguments:
- conn: database connection
- consumer: consumer session
Returns:
list of consumer reservations
Raises:
- HTTPException: if failed to get reservations
@router.get('/me/rescued', tags=['reservations'], status_code=status.HTTP_200_OK, summary='Get number of consumer rescued reservations', description='Retrieves number of rescued (collected) reservations.')
async def
get_rescued( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> int:
@router.get(
    "/me/rescued",
    tags=["reservations"],
    status_code=status.HTTP_200_OK,
    summary="Get number of consumer rescued reservations",
    description="Retrieves number of rescued (collected) reservations.",
)
async def get_rescued(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Count the reservations the authenticated consumer has collected.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        number of rescued (collected) reservations; 0 when the count query
        returns a falsy result
    """
    querier = ReservationsQuerier(conn)
    stats = await querier.count_consumer_collected_reservations(
        consumer_id=consumer.user_id
    )
    if not stats:
        return 0
    return stats.collected_count
Get count of rescued (collected) reservations for the authenticated consumer.
Arguments:
- conn: database connection
- consumer: consumer session
Returns:
number of rescued (collected) reservations
class
UpdateConsumerForm(pydantic.main.BaseModel):
class UpdateConsumerForm(BaseModel):
    """Consumer name update form.

    Pydantic model with two string fields; neither declares a default, so
    both must be supplied.
    """

    # New first name for the consumer.
    first_name: str
    # New last name for the consumer.
    last_name: str
Consumer name update form.
@router.patch('/me', status_code=status.HTTP_200_OK, summary='Update consumer profile', description='Updates the profile information (first and last name) for the authenticated consumer.')
async def
update_consumer( form: UpdateConsumerForm, conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> None:
@router.patch(
    "/me",
    status_code=status.HTTP_200_OK,
    summary="Update consumer profile",
    description=(
        "Updates the profile information (first and last name) "
        "for the authenticated consumer."
    ),
)
async def update_consumer(
    form: UpdateConsumerForm, conn: database_dependency, consumer: ConsumerAuthDep
) -> None:
    """Store a new first and last name for the authenticated consumer.

    Args:
        form: consumer update form
        conn: database connection
        consumer: consumer session

    Raises:
        HTTPException: 500 if the update query returns a falsy result
    """
    querier = ConsumerQuerier(conn)
    params = UpdateConsumerParams(
        user_id=consumer.user_id,
        fname=form.first_name,
        lname=form.last_name,
    )
    result = await querier.update_consumer(params)
    if result:
        return
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="Failed to update consumer",
    )
Consumer name update.
Arguments:
- form: consumer update form
- conn: database connection
- consumer: consumer session
Raises:
- HTTPException: if failed to update consumer
@router.get('/me/badges', status_code=status.HTTP_200_OK, summary='Consumer badges', description='Get all acquired badges by consumer', tags=['badges'])
async def
get_consumer_badges( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> list[internal.queries.badge.GetConsumerBadgesRow]:
@router.get(
    "/me/badges",
    status_code=status.HTTP_200_OK,
    summary="Consumer badges",
    description="Get all acquired badges by consumer",
    tags=["badges"],
)
async def get_consumer_badges(
    conn: database_dependency, consumer: ConsumerAuthDep
) -> list[GetConsumerBadgesRow]:
    """Collect the badges the authenticated consumer has acquired.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        list of acquired badges, in the order the badge query yields them
    """
    badges: list[GetConsumerBadgesRow] = []
    async for row in BadgeQuerier(conn).get_consumer_badges(
        user_id=consumer.user_id
    ):
        badges.append(row)
    return badges
Get badges acquired by consumer.
Arguments:
- conn: database connection
- consumer: consumer session
Returns:
list of acquired badges
@router.get('/me/streaks', status_code=status.HTTP_200_OK, summary='Consumer streak', description='Get consumer streak in number of weeks', tags=['analytics'])
async def
get_streaks( conn: Annotated[sqlalchemy.ext.asyncio.engine.AsyncConnection, Depends(dependency=<bound method DatabaseManager.get_connection of <internal.database.manager.DatabaseManager object>>, use_cache=True, scope=None)], consumer: Annotated[internal.queries.token.GetSessionByTokenRow, Security(dependency=<function consumer_auth>, use_cache=True, scope=None, scopes=None)]) -> int:
@router.get(
    "/me/streaks",
    status_code=status.HTTP_200_OK,
    summary="Consumer streak",
    description="Get consumer streak in number of weeks",
    tags=["analytics"],
)
async def get_streaks(conn: database_dependency, consumer: ConsumerAuthDep) -> int:
    """Compute how many consecutive weeks the consumer has collected in.

    Reservations are visited from the latest ``window_end`` backwards. Every
    distinct ISO week (derived from ``window_start``) that directly continues
    the chain adds one to the streak. The chain may begin in the current week
    or in the previous one. It ends at the first ended reservation that was
    never collected, or at the first week that is not the expected one.

    Args:
        conn: database connection
        consumer: consumer session

    Returns:
        number of weeks
    """
    querier = ReservationsQuerier(conn)
    rows: list[GetConsumersReservationsFullRow] = [
        row
        async for row in querier.get_consumers_reservations_full(
            consumer_id=consumer.user_id
        )
    ]
    if not rows:
        return 0

    rows.sort(key=lambda row: row.window_end, reverse=True)

    now = datetime.now(tz=UTC)
    one_week = timedelta(weeks=1)
    previous_week = (now - one_week).isocalendar()[:2]

    anchor = now
    streak = 0
    latest_week = None
    for row in rows:
        # Windows that have not closed yet neither extend nor break the streak.
        if row.window_end > now:
            continue
        if row.collected_at is None:
            break

        week = row.window_start.date().isocalendar()[:2]
        # Several reservations inside one week count only once.
        if week == latest_week:
            continue

        # Nothing counted so far and the newest week is last week: begin the
        # expected-week chain one week back instead of at the current week.
        if latest_week is None and week == previous_week:
            anchor = now - one_week

        if week != (anchor - streak * one_week).isocalendar()[:2]:
            break
        streak += 1
        latest_week = week

    return streak
Get consumer collection streak in number of weeks.
Arguments:
- conn: database connection
- consumer: consumer session
Returns:
number of weeks