internal.badges.engine
Badges processing engine.
1"""Badges processing engine.""" 2 3from datetime import UTC, datetime, timedelta 4 5from fastapi import BackgroundTasks 6from pydantic import BaseModel 7from sqlalchemy.ext.asyncio import AsyncConnection 8 9from internal.database.manager import database_manager 10from internal.queries.badge import ( 11 AcquireBadgeParams, 12 GetConsumerBadgesRow, 13 UpdateBadgeLevelParams, 14) 15from internal.queries.badge import AsyncQuerier as BadgeQuerier 16from internal.queries.models import BadgesAcquired 17from internal.queries.reservations import AsyncQuerier as ReservationQuerier 18from internal.queries.reservations import GetConsumersReservationsFullRow 19from internal.settings.config import BadgesConfig, badges_config 20 21 22class BadgeEngine: 23 """Badges acquiring engine.""" 24 25 background_tasks: BackgroundTasks 26 27 class AcquireBadgeRule(BaseModel): 28 """Next badge level to acquire.""" 29 30 badge_id: int 31 rule: BadgesConfig.BadgeGoal 32 33 def __init__(self, background_tasks: BackgroundTasks) -> None: 34 """Init engine for consumer.""" 35 self.background_tasks = background_tasks 36 37 def run(self, consumer_id: int, bundle_window_start: datetime) -> None: 38 """Starts background badge check task. 39 40 Args: 41 consumer_id: consumer id 42 bundle_window_start: current collection window start 43 """ 44 badges_rules = badges_config.badges_rules.badges_rules 45 self.background_tasks.add_task( 46 self.process_badges, consumer_id, badges_rules, bundle_window_start 47 ) 48 49 @staticmethod 50 def to_acquire( 51 badges_rules: list[BadgesConfig.BadgeRules], 52 acquired_badges: list[GetConsumerBadgesRow], 53 ) -> list[BadgeEngine.AcquireBadgeRule]: 54 """Gives levels of badges next to be acquired. 
55 56 Args: 57 badges_rules: all rules for all badges 58 acquired_badges: badges acquired by consumer 59 60 Returns: 61 list of badges levels to acquire next 62 """ 63 acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges} 64 to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = [] 65 for badge_rules in badges_rules: 66 current_level = acquired_levels.get(badge_rules.badge_id, 0) 67 next_level_rule = next( 68 (rule for rule in badge_rules.rules if rule.level > current_level), None 69 ) 70 if not next_level_rule: 71 continue 72 to_acquire_badges.append( 73 BadgeEngine.AcquireBadgeRule( 74 badge_id=badge_rules.badge_id, rule=next_level_rule 75 ) 76 ) 77 return to_acquire_badges 78 79 @staticmethod 80 def quantity_goal( 81 reservations_full: list[GetConsumersReservationsFullRow], 82 rule: BadgesConfig.QuantityGoal, 83 ) -> bool: 84 """Quantity goal rule check. 85 86 Args: 87 reservations_full: extended reservations info, 88 rule: rules for this badge 89 90 Returns: 91 if badge meets requirements 92 """ 93 reservations_filtered = [ 94 reservation 95 for reservation in reservations_full 96 if reservation.collected_at is not None 97 ] 98 if rule.category_id: 99 reservations_filtered = [ 100 reservation 101 for reservation in reservations_filtered 102 if rule.category_id in reservation.category_ids 103 ] 104 if rule.seller_id: 105 reservations_filtered = [ 106 reservation 107 for reservation in reservations_filtered 108 if reservation.seller_id == rule.seller_id 109 ] 110 return not len(reservations_filtered) < rule.quantity 111 112 @staticmethod 113 def co2_goal( 114 reservations_full: list[GetConsumersReservationsFullRow], 115 rule: BadgesConfig.CO2Goal, 116 ) -> bool: 117 """CO2 goal rule check. 
118 119 Args: 120 reservations_full: extended reservations info, 121 rule: rules for this badge 122 123 Returns: 124 if badge meets requirements 125 """ 126 reservations_filtered = [ 127 reservation 128 for reservation in reservations_full 129 if reservation.collected_at is not None 130 ] 131 carbon_dioxide = sum( 132 reservation.carbon_dioxide for reservation in reservations_filtered 133 ) 134 return not carbon_dioxide < rule.carbon_dioxide 135 136 @staticmethod 137 def diversity_goal( 138 reservations_full: list[GetConsumersReservationsFullRow], 139 rule: BadgesConfig.DiversityGoal, 140 ) -> bool: 141 """Diversity goal rule check. 142 143 Args: 144 reservations_full: extended reservations info, 145 rule: rules for this badge 146 147 Returns: 148 if badge meets requirements 149 """ 150 reservations_filtered = [ 151 reservation 152 for reservation in reservations_full 153 if reservation.collected_at is not None 154 ] 155 if rule.dif_categories >= 1: 156 categories = { 157 cid 158 for reservation in reservations_filtered 159 for cid in reservation.category_ids 160 } 161 if len(categories) < rule.dif_categories: 162 return False 163 if rule.dif_sellers >= 1: 164 sellers = {reservation.seller_id for reservation in reservations_filtered} 165 if len(sellers) < rule.dif_sellers: 166 return False 167 return True 168 169 @staticmethod 170 def streak_goal_filter_sort( 171 reservations_full: list[GetConsumersReservationsFullRow], 172 rule: BadgesConfig.StreakGoal, 173 ) -> list[GetConsumersReservationsFullRow]: 174 """Filter and sort reservations for streak goal. 
175 176 Args: 177 reservations_full: reservations with extended info 178 rule: badge rules 179 180 Returns: 181 list of sorted and filtered reservations 182 """ 183 reservations_filtered = reservations_full.copy() 184 if rule.category_id: 185 reservations_filtered = [ 186 reservation 187 for reservation in reservations_filtered 188 if rule.category_id in reservation.category_ids 189 ] 190 if rule.seller_id: 191 reservations_filtered = [ 192 reservation 193 for reservation in reservations_filtered 194 if reservation.seller_id == rule.seller_id 195 ] 196 return sorted( 197 reservations_filtered, 198 key=lambda reservation: reservation.window_start, 199 reverse=True, 200 ) 201 202 @staticmethod 203 def streak_goal( 204 reservations_full: list[GetConsumersReservationsFullRow], 205 rule: BadgesConfig.StreakGoal, 206 bundle_window_start: datetime, 207 ) -> bool: 208 """Streak goal rule check. 209 210 Args: 211 reservations_full: extended reservations info, 212 rule: rules for this badge 213 bundle_window_start: window start for current collection 214 215 Returns: 216 if badge meets requirements 217 """ 218 reservations_sorted = BadgeEngine.streak_goal_filter_sort( 219 reservations_full, rule 220 ) 221 reservations_streak: list[GetConsumersReservationsFullRow] = [] 222 for reservation in reservations_sorted: 223 if ( 224 not reservation.collected_at is not None 225 and reservation.window_end > datetime.now(tz=UTC) 226 ): 227 continue 228 if not reservation.collected_at: 229 break 230 reservations_streak.append(reservation) 231 if ( 232 rule.streak_quantity >= 1 233 and len(reservations_streak) < rule.streak_quantity 234 ): 235 return False 236 if rule.streak_days >= 1: 237 days = 0 238 previous_date = bundle_window_start.date() + timedelta(days=1) 239 for reservation in reservations_streak: 240 reservation_date = reservation.window_start.date() 241 if reservation_date + timedelta(days=1) == previous_date: 242 days += 1 243 previous_date = reservation_date 244 continue 245 
if reservation_date != previous_date: 246 break 247 if days < rule.streak_days: 248 return False 249 return True 250 251 @staticmethod 252 async def update_badge( 253 conn: AsyncConnection, consumer_id: int, badge_id: int, level: int 254 ) -> BadgesAcquired | None: 255 """Inserts or updates acquired badge record. 256 257 Args: 258 conn: database connection 259 consumer_id: consumer id 260 badge_id: badge id 261 level: badge level 262 263 Returns: 264 acquired badge 265 """ 266 if level == 1: 267 return await BadgeQuerier(conn).acquire_badge( 268 AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level) 269 ) 270 return await BadgeQuerier(conn).update_badge_level( 271 UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level) 272 ) 273 274 @staticmethod 275 async def process_badges( 276 consumer_id: int, 277 badges_rules: list[BadgesConfig.BadgeRules], 278 bundle_window_start: datetime, 279 ) -> None: 280 """Background task to acquire badges. 281 282 Args: 283 consumer_id: consumer id 284 badges_rules: rules for all badges 285 bundle_window_start: start window for current collection 286 287 Raises: 288 ValueError: if failed to acquire badge 289 """ 290 async for conn in database_manager.get_connection(): 291 badges = [ 292 badges 293 async for badges in BadgeQuerier(conn).get_consumer_badges( 294 user_id=consumer_id 295 ) 296 ] 297 if not (acquire_badges := BadgeEngine.to_acquire(badges_rules, badges)): 298 return 299 reservations_full = [ 300 reservation 301 async for reservation in ReservationQuerier( 302 conn 303 ).get_consumers_reservations_full(consumer_id=consumer_id) 304 ] 305 for acquire_badge in acquire_badges: 306 rule = acquire_badge.rule 307 to_acquire = False 308 match rule: 309 case BadgesConfig.QuantityGoal(): 310 to_acquire = BadgeEngine.quantity_goal(reservations_full, rule) 311 case BadgesConfig.CO2Goal(): 312 to_acquire = BadgeEngine.co2_goal(reservations_full, rule) 313 case BadgesConfig.DiversityGoal(): 314 
to_acquire = BadgeEngine.diversity_goal(reservations_full, rule) 315 case BadgesConfig.StreakGoal(): 316 to_acquire = BadgeEngine.streak_goal( 317 reservations_full, rule, bundle_window_start 318 ) 319 if not to_acquire: 320 continue 321 acquired_badge = await BadgeEngine.update_badge( 322 conn, consumer_id, acquire_badge.badge_id, acquire_badge.rule.level 323 ) 324 if not acquired_badge: 325 raise ValueError("Failed to insert acquire badge record")
class
BadgeEngine:
23class BadgeEngine: 24 """Badges acquiring engine.""" 25 26 background_tasks: BackgroundTasks 27 28 class AcquireBadgeRule(BaseModel): 29 """Next badge level to acquire.""" 30 31 badge_id: int 32 rule: BadgesConfig.BadgeGoal 33 34 def __init__(self, background_tasks: BackgroundTasks) -> None: 35 """Init engine for consumer.""" 36 self.background_tasks = background_tasks 37 38 def run(self, consumer_id: int, bundle_window_start: datetime) -> None: 39 """Starts background badge check task. 40 41 Args: 42 consumer_id: consumer id 43 bundle_window_start: current collection window start 44 """ 45 badges_rules = badges_config.badges_rules.badges_rules 46 self.background_tasks.add_task( 47 self.process_badges, consumer_id, badges_rules, bundle_window_start 48 ) 49 50 @staticmethod 51 def to_acquire( 52 badges_rules: list[BadgesConfig.BadgeRules], 53 acquired_badges: list[GetConsumerBadgesRow], 54 ) -> list[BadgeEngine.AcquireBadgeRule]: 55 """Gives levels of badges next to be acquired. 56 57 Args: 58 badges_rules: all rules for all badges 59 acquired_badges: badges acquired by consumer 60 61 Returns: 62 list of badges levels to acquire next 63 """ 64 acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges} 65 to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = [] 66 for badge_rules in badges_rules: 67 current_level = acquired_levels.get(badge_rules.badge_id, 0) 68 next_level_rule = next( 69 (rule for rule in badge_rules.rules if rule.level > current_level), None 70 ) 71 if not next_level_rule: 72 continue 73 to_acquire_badges.append( 74 BadgeEngine.AcquireBadgeRule( 75 badge_id=badge_rules.badge_id, rule=next_level_rule 76 ) 77 ) 78 return to_acquire_badges 79 80 @staticmethod 81 def quantity_goal( 82 reservations_full: list[GetConsumersReservationsFullRow], 83 rule: BadgesConfig.QuantityGoal, 84 ) -> bool: 85 """Quantity goal rule check. 
86 87 Args: 88 reservations_full: extended reservations info, 89 rule: rules for this badge 90 91 Returns: 92 if badge meets requirements 93 """ 94 reservations_filtered = [ 95 reservation 96 for reservation in reservations_full 97 if reservation.collected_at is not None 98 ] 99 if rule.category_id: 100 reservations_filtered = [ 101 reservation 102 for reservation in reservations_filtered 103 if rule.category_id in reservation.category_ids 104 ] 105 if rule.seller_id: 106 reservations_filtered = [ 107 reservation 108 for reservation in reservations_filtered 109 if reservation.seller_id == rule.seller_id 110 ] 111 return not len(reservations_filtered) < rule.quantity 112 113 @staticmethod 114 def co2_goal( 115 reservations_full: list[GetConsumersReservationsFullRow], 116 rule: BadgesConfig.CO2Goal, 117 ) -> bool: 118 """CO2 goal rule check. 119 120 Args: 121 reservations_full: extended reservations info, 122 rule: rules for this badge 123 124 Returns: 125 if badge meets requirements 126 """ 127 reservations_filtered = [ 128 reservation 129 for reservation in reservations_full 130 if reservation.collected_at is not None 131 ] 132 carbon_dioxide = sum( 133 reservation.carbon_dioxide for reservation in reservations_filtered 134 ) 135 return not carbon_dioxide < rule.carbon_dioxide 136 137 @staticmethod 138 def diversity_goal( 139 reservations_full: list[GetConsumersReservationsFullRow], 140 rule: BadgesConfig.DiversityGoal, 141 ) -> bool: 142 """Diversity goal rule check. 
143 144 Args: 145 reservations_full: extended reservations info, 146 rule: rules for this badge 147 148 Returns: 149 if badge meets requirements 150 """ 151 reservations_filtered = [ 152 reservation 153 for reservation in reservations_full 154 if reservation.collected_at is not None 155 ] 156 if rule.dif_categories >= 1: 157 categories = { 158 cid 159 for reservation in reservations_filtered 160 for cid in reservation.category_ids 161 } 162 if len(categories) < rule.dif_categories: 163 return False 164 if rule.dif_sellers >= 1: 165 sellers = {reservation.seller_id for reservation in reservations_filtered} 166 if len(sellers) < rule.dif_sellers: 167 return False 168 return True 169 170 @staticmethod 171 def streak_goal_filter_sort( 172 reservations_full: list[GetConsumersReservationsFullRow], 173 rule: BadgesConfig.StreakGoal, 174 ) -> list[GetConsumersReservationsFullRow]: 175 """Filter and sort reservations for streak goal. 176 177 Args: 178 reservations_full: reservations with extended info 179 rule: badge rules 180 181 Returns: 182 list of sorted and filtered reservations 183 """ 184 reservations_filtered = reservations_full.copy() 185 if rule.category_id: 186 reservations_filtered = [ 187 reservation 188 for reservation in reservations_filtered 189 if rule.category_id in reservation.category_ids 190 ] 191 if rule.seller_id: 192 reservations_filtered = [ 193 reservation 194 for reservation in reservations_filtered 195 if reservation.seller_id == rule.seller_id 196 ] 197 return sorted( 198 reservations_filtered, 199 key=lambda reservation: reservation.window_start, 200 reverse=True, 201 ) 202 203 @staticmethod 204 def streak_goal( 205 reservations_full: list[GetConsumersReservationsFullRow], 206 rule: BadgesConfig.StreakGoal, 207 bundle_window_start: datetime, 208 ) -> bool: 209 """Streak goal rule check. 
210 211 Args: 212 reservations_full: extended reservations info, 213 rule: rules for this badge 214 bundle_window_start: window start for current collection 215 216 Returns: 217 if badge meets requirements 218 """ 219 reservations_sorted = BadgeEngine.streak_goal_filter_sort( 220 reservations_full, rule 221 ) 222 reservations_streak: list[GetConsumersReservationsFullRow] = [] 223 for reservation in reservations_sorted: 224 if ( 225 not reservation.collected_at is not None 226 and reservation.window_end > datetime.now(tz=UTC) 227 ): 228 continue 229 if not reservation.collected_at: 230 break 231 reservations_streak.append(reservation) 232 if ( 233 rule.streak_quantity >= 1 234 and len(reservations_streak) < rule.streak_quantity 235 ): 236 return False 237 if rule.streak_days >= 1: 238 days = 0 239 previous_date = bundle_window_start.date() + timedelta(days=1) 240 for reservation in reservations_streak: 241 reservation_date = reservation.window_start.date() 242 if reservation_date + timedelta(days=1) == previous_date: 243 days += 1 244 previous_date = reservation_date 245 continue 246 if reservation_date != previous_date: 247 break 248 if days < rule.streak_days: 249 return False 250 return True 251 252 @staticmethod 253 async def update_badge( 254 conn: AsyncConnection, consumer_id: int, badge_id: int, level: int 255 ) -> BadgesAcquired | None: 256 """Inserts or updates acquired badge record. 
257 258 Args: 259 conn: database connection 260 consumer_id: consumer id 261 badge_id: badge id 262 level: badge level 263 264 Returns: 265 acquired badge 266 """ 267 if level == 1: 268 return await BadgeQuerier(conn).acquire_badge( 269 AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level) 270 ) 271 return await BadgeQuerier(conn).update_badge_level( 272 UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level) 273 ) 274 275 @staticmethod 276 async def process_badges( 277 consumer_id: int, 278 badges_rules: list[BadgesConfig.BadgeRules], 279 bundle_window_start: datetime, 280 ) -> None: 281 """Background task to acquire badges. 282 283 Args: 284 consumer_id: consumer id 285 badges_rules: rules for all badges 286 bundle_window_start: start window for current collection 287 288 Raises: 289 ValueError: if failed to acquire badge 290 """ 291 async for conn in database_manager.get_connection(): 292 badges = [ 293 badges 294 async for badges in BadgeQuerier(conn).get_consumer_badges( 295 user_id=consumer_id 296 ) 297 ] 298 if not (acquire_badges := BadgeEngine.to_acquire(badges_rules, badges)): 299 return 300 reservations_full = [ 301 reservation 302 async for reservation in ReservationQuerier( 303 conn 304 ).get_consumers_reservations_full(consumer_id=consumer_id) 305 ] 306 for acquire_badge in acquire_badges: 307 rule = acquire_badge.rule 308 to_acquire = False 309 match rule: 310 case BadgesConfig.QuantityGoal(): 311 to_acquire = BadgeEngine.quantity_goal(reservations_full, rule) 312 case BadgesConfig.CO2Goal(): 313 to_acquire = BadgeEngine.co2_goal(reservations_full, rule) 314 case BadgesConfig.DiversityGoal(): 315 to_acquire = BadgeEngine.diversity_goal(reservations_full, rule) 316 case BadgesConfig.StreakGoal(): 317 to_acquire = BadgeEngine.streak_goal( 318 reservations_full, rule, bundle_window_start 319 ) 320 if not to_acquire: 321 continue 322 acquired_badge = await BadgeEngine.update_badge( 323 conn, consumer_id, 
acquire_badge.badge_id, acquire_badge.rule.level 324 ) 325 if not acquired_badge: 326 raise ValueError("Failed to insert acquire badge record")
Badges acquiring engine.
BadgeEngine(background_tasks: fastapi.background.BackgroundTasks)
def __init__(self, background_tasks: BackgroundTasks) -> None:
    """Store the task queue used to schedule badge processing.

    Args:
        background_tasks: FastAPI background tasks collection
    """
    self.background_tasks = background_tasks
Store the background task queue used to schedule badge processing.
def
run(self, consumer_id: int, bundle_window_start: datetime.datetime) -> None:
def run(self, consumer_id: int, bundle_window_start: datetime) -> None:
    """Schedule the background badge check task.

    The configured badge rules are read at call time and handed to
    ``process_badges`` together with the consumer and window start.

    Args:
        consumer_id: consumer id
        bundle_window_start: current collection window start
    """
    configured_rules = badges_config.badges_rules.badges_rules
    schedule = self.background_tasks.add_task
    schedule(self.process_badges, consumer_id, configured_rules, bundle_window_start)
Starts background badge check task.
Arguments:
- consumer_id: consumer id
- bundle_window_start: current collection window start
@staticmethod
def
to_acquire( badges_rules: list[internal.settings.config.BadgesConfig.BadgeRules], acquired_badges: list[internal.queries.badge.GetConsumerBadgesRow]) -> list[BadgeEngine.AcquireBadgeRule]:
@staticmethod
def to_acquire(
    badges_rules: list[BadgesConfig.BadgeRules],
    acquired_badges: list[GetConsumerBadgesRow],
) -> list[BadgeEngine.AcquireBadgeRule]:
    """Return the next level to acquire for every badge.

    For each configured badge the lowest level above the consumer's current
    one is selected, regardless of the order of the rules in the
    configuration. Badges with no higher level left are skipped.

    Args:
        badges_rules: all rules for all badges
        acquired_badges: badges acquired by consumer

    Returns:
        list of badges levels to acquire next
    """
    acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges}
    to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = []
    for badge_rules in badges_rules:
        # A badge the consumer does not hold yet counts as level 0.
        current_level = acquired_levels.get(badge_rules.badge_id, 0)
        # min() instead of "first match" so that an unsorted rule list
        # cannot make the consumer skip a level.
        next_level_rule = min(
            (rule for rule in badge_rules.rules if rule.level > current_level),
            key=lambda rule: rule.level,
            default=None,
        )
        if next_level_rule is None:
            continue
        to_acquire_badges.append(
            BadgeEngine.AcquireBadgeRule(
                badge_id=badge_rules.badge_id, rule=next_level_rule
            )
        )
    return to_acquire_badges
Returns the next level to acquire for each badge.
Arguments:
- badges_rules: all rules for all badges
- acquired_badges: badges acquired by consumer
Returns:
list of badges levels to acquire next
@staticmethod
def
quantity_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.QuantityGoal) -> bool:
80 @staticmethod 81 def quantity_goal( 82 reservations_full: list[GetConsumersReservationsFullRow], 83 rule: BadgesConfig.QuantityGoal, 84 ) -> bool: 85 """Quantity goal rule check. 86 87 Args: 88 reservations_full: extended reservations info, 89 rule: rules for this badge 90 91 Returns: 92 if badge meets requirements 93 """ 94 reservations_filtered = [ 95 reservation 96 for reservation in reservations_full 97 if reservation.collected_at is not None 98 ] 99 if rule.category_id: 100 reservations_filtered = [ 101 reservation 102 for reservation in reservations_filtered 103 if rule.category_id in reservation.category_ids 104 ] 105 if rule.seller_id: 106 reservations_filtered = [ 107 reservation 108 for reservation in reservations_filtered 109 if reservation.seller_id == rule.seller_id 110 ] 111 return not len(reservations_filtered) < rule.quantity
Quantity goal rule check.
Arguments:
- reservations_full: extended reservations info
- rule: rules for this badge
Returns:
True if the badge requirements are met, False otherwise
@staticmethod
def
co2_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.CO2Goal) -> bool:
113 @staticmethod 114 def co2_goal( 115 reservations_full: list[GetConsumersReservationsFullRow], 116 rule: BadgesConfig.CO2Goal, 117 ) -> bool: 118 """CO2 goal rule check. 119 120 Args: 121 reservations_full: extended reservations info, 122 rule: rules for this badge 123 124 Returns: 125 if badge meets requirements 126 """ 127 reservations_filtered = [ 128 reservation 129 for reservation in reservations_full 130 if reservation.collected_at is not None 131 ] 132 carbon_dioxide = sum( 133 reservation.carbon_dioxide for reservation in reservations_filtered 134 ) 135 return not carbon_dioxide < rule.carbon_dioxide
CO2 goal rule check.
Arguments:
- reservations_full: extended reservations info
- rule: rules for this badge
Returns:
True if the badge requirements are met, False otherwise
@staticmethod
def
diversity_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.DiversityGoal) -> bool:
137 @staticmethod 138 def diversity_goal( 139 reservations_full: list[GetConsumersReservationsFullRow], 140 rule: BadgesConfig.DiversityGoal, 141 ) -> bool: 142 """Diversity goal rule check. 143 144 Args: 145 reservations_full: extended reservations info, 146 rule: rules for this badge 147 148 Returns: 149 if badge meets requirements 150 """ 151 reservations_filtered = [ 152 reservation 153 for reservation in reservations_full 154 if reservation.collected_at is not None 155 ] 156 if rule.dif_categories >= 1: 157 categories = { 158 cid 159 for reservation in reservations_filtered 160 for cid in reservation.category_ids 161 } 162 if len(categories) < rule.dif_categories: 163 return False 164 if rule.dif_sellers >= 1: 165 sellers = {reservation.seller_id for reservation in reservations_filtered} 166 if len(sellers) < rule.dif_sellers: 167 return False 168 return True
Diversity goal rule check.
Arguments:
- reservations_full: extended reservations info
- rule: rules for this badge
Returns:
True if the badge requirements are met, False otherwise
@staticmethod
def
streak_goal_filter_sort( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.StreakGoal) -> list[internal.queries.reservations.GetConsumersReservationsFullRow]:
170 @staticmethod 171 def streak_goal_filter_sort( 172 reservations_full: list[GetConsumersReservationsFullRow], 173 rule: BadgesConfig.StreakGoal, 174 ) -> list[GetConsumersReservationsFullRow]: 175 """Filter and sort reservations for streak goal. 176 177 Args: 178 reservations_full: reservations with extended info 179 rule: badge rules 180 181 Returns: 182 list of sorted and filtered reservations 183 """ 184 reservations_filtered = reservations_full.copy() 185 if rule.category_id: 186 reservations_filtered = [ 187 reservation 188 for reservation in reservations_filtered 189 if rule.category_id in reservation.category_ids 190 ] 191 if rule.seller_id: 192 reservations_filtered = [ 193 reservation 194 for reservation in reservations_filtered 195 if reservation.seller_id == rule.seller_id 196 ] 197 return sorted( 198 reservations_filtered, 199 key=lambda reservation: reservation.window_start, 200 reverse=True, 201 )
Filter and sort reservations for streak goal.
Arguments:
- reservations_full: reservations with extended info
- rule: badge rules
Returns:
list of sorted and filtered reservations
@staticmethod
def
streak_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.StreakGoal, bundle_window_start: datetime.datetime) -> bool:
@staticmethod
def streak_goal(
    reservations_full: list[GetConsumersReservationsFullRow],
    rule: BadgesConfig.StreakGoal,
    bundle_window_start: datetime,
) -> bool:
    """Streak goal rule check.

    Walks the matching reservations from newest to oldest. Uncollected
    reservations whose window has not ended yet are ignored; the first
    uncollected reservation with an ended window ends the streak.

    Args:
        reservations_full: extended reservations info
        rule: rules for this badge
        bundle_window_start: window start for current collection

    Returns:
        if badge meets requirements
    """
    candidates = BadgeEngine.streak_goal_filter_sort(reservations_full, rule)
    # One timestamp for the whole pass so every reservation is judged
    # against the same instant.
    now = datetime.now(tz=UTC)
    reservations_streak: list[GetConsumersReservationsFullRow] = []
    for reservation in candidates:
        if reservation.collected_at is None:
            if reservation.window_end > now:
                # Still collectable: neither extends nor breaks the streak.
                continue
            # Missed reservation: the streak ends here.
            break
        reservations_streak.append(reservation)

    if (
        rule.streak_quantity >= 1
        and len(reservations_streak) < rule.streak_quantity
    ):
        return False
    if rule.streak_days < 1:
        return True

    # Count distinct consecutive calendar days, going back from the day of
    # the current collection window.
    one_day = timedelta(days=1)
    days = 0
    previous_date = bundle_window_start.date() + one_day
    for reservation in reservations_streak:
        reservation_date = reservation.window_start.date()
        if reservation_date + one_day == previous_date:
            days += 1
            previous_date = reservation_date
        elif reservation_date != previous_date:
            # Gap between days: the streak is over.
            break
        # Same day as the previous reservation: already counted.
    return days >= rule.streak_days
Streak goal rule check.
Arguments:
- reservations_full: extended reservations info
- rule: rules for this badge
- bundle_window_start: window start for current collection
Returns:
True if the badge requirements are met, False otherwise
@staticmethod
async def
update_badge( conn: sqlalchemy.ext.asyncio.engine.AsyncConnection, consumer_id: int, badge_id: int, level: int) -> internal.queries.models.BadgesAcquired | None:
@staticmethod
async def update_badge(
    conn: AsyncConnection, consumer_id: int, badge_id: int, level: int
) -> BadgesAcquired | None:
    """Inserts or updates acquired badge record.

    Level 1 goes through ``acquire_badge``; every other level goes through
    ``update_badge_level``.

    Args:
        conn: database connection
        consumer_id: consumer id
        badge_id: badge id
        level: badge level

    Returns:
        acquired badge
    """
    querier = BadgeQuerier(conn)
    if level != 1:
        return await querier.update_badge_level(
            UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level)
        )
    return await querier.acquire_badge(
        AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level)
    )
Inserts or updates acquired badge record.
Arguments:
- conn: database connection
- consumer_id: consumer id
- badge_id: badge id
- level: badge level
Returns:
acquired badge
@staticmethod
async def
process_badges( consumer_id: int, badges_rules: list[internal.settings.config.BadgesConfig.BadgeRules], bundle_window_start: datetime.datetime) -> None:
@staticmethod
async def process_badges(
    consumer_id: int,
    badges_rules: list[BadgesConfig.BadgeRules],
    bundle_window_start: datetime,
) -> None:
    """Background task to acquire badges.

    Loads the consumer's badges, determines the next level of each badge,
    evaluates the matching goal against the consumer's reservations and
    stores every level whose goal is met.

    Args:
        consumer_id: consumer id
        badges_rules: rules for all badges
        bundle_window_start: start window for current collection

    Raises:
        ValueError: if failed to acquire badge
    """
    async for conn in database_manager.get_connection():
        owned_badges = [
            row
            async for row in BadgeQuerier(conn).get_consumer_badges(
                user_id=consumer_id
            )
        ]
        candidates = BadgeEngine.to_acquire(badges_rules, owned_badges)
        if not candidates:
            # Nothing left to acquire, so reservations are not needed.
            return
        reservations_full = [
            reservation
            async for reservation in ReservationQuerier(
                conn
            ).get_consumers_reservations_full(consumer_id=consumer_id)
        ]
        for candidate in candidates:
            rule = candidate.rule
            if isinstance(rule, BadgesConfig.QuantityGoal):
                goal_met = BadgeEngine.quantity_goal(reservations_full, rule)
            elif isinstance(rule, BadgesConfig.CO2Goal):
                goal_met = BadgeEngine.co2_goal(reservations_full, rule)
            elif isinstance(rule, BadgesConfig.DiversityGoal):
                goal_met = BadgeEngine.diversity_goal(reservations_full, rule)
            elif isinstance(rule, BadgesConfig.StreakGoal):
                goal_met = BadgeEngine.streak_goal(
                    reservations_full, rule, bundle_window_start
                )
            else:
                # Goals of an unknown type are never met.
                goal_met = False
            if not goal_met:
                continue
            acquired_badge = await BadgeEngine.update_badge(
                conn, consumer_id, candidate.badge_id, rule.level
            )
            if not acquired_badge:
                raise ValueError("Failed to insert acquire badge record")
Background task to acquire badges.
Arguments:
- consumer_id: consumer id
- badges_rules: rules for all badges
- bundle_window_start: start window for current collection
Raises:
- ValueError: if failed to acquire badge
class
BadgeEngine.AcquireBadgeRule(pydantic.main.BaseModel):
class AcquireBadgeRule(BaseModel):
    """Badge paired with the goal of the next level to acquire."""

    # id of the badge the goal belongs to
    badge_id: int
    # goal to evaluate for the next level of that badge
    rule: BadgesConfig.BadgeGoal
Next badge level to acquire.
rule: Annotated[internal.settings.config.BadgesConfig.QuantityGoal | internal.settings.config.BadgesConfig.CO2Goal | internal.settings.config.BadgesConfig.DiversityGoal | internal.settings.config.BadgesConfig.StreakGoal, FieldInfo(annotation=NoneType, required=True, discriminator='type')] =
PydanticUndefined