internal.badges.engine

Badges processing engine.

  1"""Badges processing engine."""
  2
  3from datetime import UTC, datetime, timedelta
  4
  5from fastapi import BackgroundTasks
  6from pydantic import BaseModel
  7from sqlalchemy.ext.asyncio import AsyncConnection
  8
  9from internal.database.manager import database_manager
 10from internal.queries.badge import (
 11    AcquireBadgeParams,
 12    GetConsumerBadgesRow,
 13    UpdateBadgeLevelParams,
 14)
 15from internal.queries.badge import AsyncQuerier as BadgeQuerier
 16from internal.queries.models import BadgesAcquired
 17from internal.queries.reservations import AsyncQuerier as ReservationQuerier
 18from internal.queries.reservations import GetConsumersReservationsFullRow
 19from internal.settings.config import BadgesConfig, badges_config
 20
 21
 22class BadgeEngine:
 23    """Badges acquiring engine."""
 24
 25    background_tasks: BackgroundTasks
 26
 27    class AcquireBadgeRule(BaseModel):
 28        """Next badge level to acquire."""
 29
 30        badge_id: int
 31        rule: BadgesConfig.BadgeGoal
 32
 33    def __init__(self, background_tasks: BackgroundTasks) -> None:
 34        """Init engine for consumer."""
 35        self.background_tasks = background_tasks
 36
 37    def run(self, consumer_id: int, bundle_window_start: datetime) -> None:
 38        """Starts background badge check task.
 39
 40        Args:
 41            consumer_id: consumer id
 42            bundle_window_start: current collection window start
 43        """
 44        badges_rules = badges_config.badges_rules.badges_rules
 45        self.background_tasks.add_task(
 46            self.process_badges, consumer_id, badges_rules, bundle_window_start
 47        )
 48
 49    @staticmethod
 50    def to_acquire(
 51        badges_rules: list[BadgesConfig.BadgeRules],
 52        acquired_badges: list[GetConsumerBadgesRow],
 53    ) -> list[BadgeEngine.AcquireBadgeRule]:
 54        """Gives levels of badges next to be acquired.
 55
 56        Args:
 57            badges_rules: all rules for all badges
 58            acquired_badges: badges acquired by consumer
 59
 60        Returns:
 61            list of badges levels to acquire next
 62        """
 63        acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges}
 64        to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = []
 65        for badge_rules in badges_rules:
 66            current_level = acquired_levels.get(badge_rules.badge_id, 0)
 67            next_level_rule = next(
 68                (rule for rule in badge_rules.rules if rule.level > current_level), None
 69            )
 70            if not next_level_rule:
 71                continue
 72            to_acquire_badges.append(
 73                BadgeEngine.AcquireBadgeRule(
 74                    badge_id=badge_rules.badge_id, rule=next_level_rule
 75                )
 76            )
 77        return to_acquire_badges
 78
 79    @staticmethod
 80    def quantity_goal(
 81        reservations_full: list[GetConsumersReservationsFullRow],
 82        rule: BadgesConfig.QuantityGoal,
 83    ) -> bool:
 84        """Quantity goal rule check.
 85
 86        Args:
 87            reservations_full: extended reservations info,
 88            rule: rules for this badge
 89
 90        Returns:
 91            if badge meets requirements
 92        """
 93        reservations_filtered = [
 94            reservation
 95            for reservation in reservations_full
 96            if reservation.collected_at is not None
 97        ]
 98        if rule.category_id:
 99            reservations_filtered = [
100                reservation
101                for reservation in reservations_filtered
102                if rule.category_id in reservation.category_ids
103            ]
104        if rule.seller_id:
105            reservations_filtered = [
106                reservation
107                for reservation in reservations_filtered
108                if reservation.seller_id == rule.seller_id
109            ]
110        return not len(reservations_filtered) < rule.quantity
111
112    @staticmethod
113    def co2_goal(
114        reservations_full: list[GetConsumersReservationsFullRow],
115        rule: BadgesConfig.CO2Goal,
116    ) -> bool:
117        """CO2 goal rule check.
118
119        Args:
120            reservations_full: extended reservations info,
121            rule: rules for this badge
122
123        Returns:
124            if badge meets requirements
125        """
126        reservations_filtered = [
127            reservation
128            for reservation in reservations_full
129            if reservation.collected_at is not None
130        ]
131        carbon_dioxide = sum(
132            reservation.carbon_dioxide for reservation in reservations_filtered
133        )
134        return not carbon_dioxide < rule.carbon_dioxide
135
136    @staticmethod
137    def diversity_goal(
138        reservations_full: list[GetConsumersReservationsFullRow],
139        rule: BadgesConfig.DiversityGoal,
140    ) -> bool:
141        """Diversity goal rule check.
142
143        Args:
144            reservations_full: extended reservations info,
145            rule: rules for this badge
146
147        Returns:
148            if badge meets requirements
149        """
150        reservations_filtered = [
151            reservation
152            for reservation in reservations_full
153            if reservation.collected_at is not None
154        ]
155        if rule.dif_categories >= 1:
156            categories = {
157                cid
158                for reservation in reservations_filtered
159                for cid in reservation.category_ids
160            }
161            if len(categories) < rule.dif_categories:
162                return False
163        if rule.dif_sellers >= 1:
164            sellers = {reservation.seller_id for reservation in reservations_filtered}
165            if len(sellers) < rule.dif_sellers:
166                return False
167        return True
168
169    @staticmethod
170    def streak_goal_filter_sort(
171        reservations_full: list[GetConsumersReservationsFullRow],
172        rule: BadgesConfig.StreakGoal,
173    ) -> list[GetConsumersReservationsFullRow]:
174        """Filter and sort reservations for streak goal.
175
176        Args:
177            reservations_full: reservations with extended info
178            rule: badge rules
179
180        Returns:
181            list of sorted and filtered reservations
182        """
183        reservations_filtered = reservations_full.copy()
184        if rule.category_id:
185            reservations_filtered = [
186                reservation
187                for reservation in reservations_filtered
188                if rule.category_id in reservation.category_ids
189            ]
190        if rule.seller_id:
191            reservations_filtered = [
192                reservation
193                for reservation in reservations_filtered
194                if reservation.seller_id == rule.seller_id
195            ]
196        return sorted(
197            reservations_filtered,
198            key=lambda reservation: reservation.window_start,
199            reverse=True,
200        )
201
202    @staticmethod
203    def streak_goal(
204        reservations_full: list[GetConsumersReservationsFullRow],
205        rule: BadgesConfig.StreakGoal,
206        bundle_window_start: datetime,
207    ) -> bool:
208        """Streak goal rule check.
209
210        Args:
211            reservations_full: extended reservations info,
212            rule: rules for this badge
213            bundle_window_start: window start for current collection
214
215        Returns:
216            if badge meets requirements
217        """
218        reservations_sorted = BadgeEngine.streak_goal_filter_sort(
219            reservations_full, rule
220        )
221        reservations_streak: list[GetConsumersReservationsFullRow] = []
222        for reservation in reservations_sorted:
223            if (
224                not reservation.collected_at is not None
225                and reservation.window_end > datetime.now(tz=UTC)
226            ):
227                continue
228            if not reservation.collected_at:
229                break
230            reservations_streak.append(reservation)
231        if (
232            rule.streak_quantity >= 1
233            and len(reservations_streak) < rule.streak_quantity
234        ):
235            return False
236        if rule.streak_days >= 1:
237            days = 0
238            previous_date = bundle_window_start.date() + timedelta(days=1)
239            for reservation in reservations_streak:
240                reservation_date = reservation.window_start.date()
241                if reservation_date + timedelta(days=1) == previous_date:
242                    days += 1
243                    previous_date = reservation_date
244                    continue
245                if reservation_date != previous_date:
246                    break
247            if days < rule.streak_days:
248                return False
249        return True
250
251    @staticmethod
252    async def update_badge(
253        conn: AsyncConnection, consumer_id: int, badge_id: int, level: int
254    ) -> BadgesAcquired | None:
255        """Inserts or updates acquired badge record.
256
257        Args:
258            conn: database connection
259            consumer_id: consumer id
260            badge_id: badge id
261            level: badge level
262
263        Returns:
264            acquired badge
265        """
266        if level == 1:
267            return await BadgeQuerier(conn).acquire_badge(
268                AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level)
269            )
270        return await BadgeQuerier(conn).update_badge_level(
271            UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level)
272        )
273
274    @staticmethod
275    async def process_badges(
276        consumer_id: int,
277        badges_rules: list[BadgesConfig.BadgeRules],
278        bundle_window_start: datetime,
279    ) -> None:
280        """Background task to acquire badges.
281
282        Args:
283            consumer_id: consumer id
284            badges_rules: rules for all badges
285            bundle_window_start: start window for current collection
286
287        Raises:
288            ValueError: if failed to acquire badge
289        """
290        async for conn in database_manager.get_connection():
291            badges = [
292                badges
293                async for badges in BadgeQuerier(conn).get_consumer_badges(
294                    user_id=consumer_id
295                )
296            ]
297            if not (acquire_badges := BadgeEngine.to_acquire(badges_rules, badges)):
298                return
299            reservations_full = [
300                reservation
301                async for reservation in ReservationQuerier(
302                    conn
303                ).get_consumers_reservations_full(consumer_id=consumer_id)
304            ]
305            for acquire_badge in acquire_badges:
306                rule = acquire_badge.rule
307                to_acquire = False
308                match rule:
309                    case BadgesConfig.QuantityGoal():
310                        to_acquire = BadgeEngine.quantity_goal(reservations_full, rule)
311                    case BadgesConfig.CO2Goal():
312                        to_acquire = BadgeEngine.co2_goal(reservations_full, rule)
313                    case BadgesConfig.DiversityGoal():
314                        to_acquire = BadgeEngine.diversity_goal(reservations_full, rule)
315                    case BadgesConfig.StreakGoal():
316                        to_acquire = BadgeEngine.streak_goal(
317                            reservations_full, rule, bundle_window_start
318                        )
319                if not to_acquire:
320                    continue
321                acquired_badge = await BadgeEngine.update_badge(
322                    conn, consumer_id, acquire_badge.badge_id, acquire_badge.rule.level
323                )
324                if not acquired_badge:
325                    raise ValueError("Failed to insert acquire badge record")
class BadgeEngine:
 23class BadgeEngine:
 24    """Badges acquiring engine."""
 25
 26    background_tasks: BackgroundTasks
 27
 28    class AcquireBadgeRule(BaseModel):
 29        """Next badge level to acquire."""
 30
 31        badge_id: int
 32        rule: BadgesConfig.BadgeGoal
 33
 34    def __init__(self, background_tasks: BackgroundTasks) -> None:
 35        """Init engine for consumer."""
 36        self.background_tasks = background_tasks
 37
 38    def run(self, consumer_id: int, bundle_window_start: datetime) -> None:
 39        """Starts background badge check task.
 40
 41        Args:
 42            consumer_id: consumer id
 43            bundle_window_start: current collection window start
 44        """
 45        badges_rules = badges_config.badges_rules.badges_rules
 46        self.background_tasks.add_task(
 47            self.process_badges, consumer_id, badges_rules, bundle_window_start
 48        )
 49
 50    @staticmethod
 51    def to_acquire(
 52        badges_rules: list[BadgesConfig.BadgeRules],
 53        acquired_badges: list[GetConsumerBadgesRow],
 54    ) -> list[BadgeEngine.AcquireBadgeRule]:
 55        """Gives levels of badges next to be acquired.
 56
 57        Args:
 58            badges_rules: all rules for all badges
 59            acquired_badges: badges acquired by consumer
 60
 61        Returns:
 62            list of badges levels to acquire next
 63        """
 64        acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges}
 65        to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = []
 66        for badge_rules in badges_rules:
 67            current_level = acquired_levels.get(badge_rules.badge_id, 0)
 68            next_level_rule = next(
 69                (rule for rule in badge_rules.rules if rule.level > current_level), None
 70            )
 71            if not next_level_rule:
 72                continue
 73            to_acquire_badges.append(
 74                BadgeEngine.AcquireBadgeRule(
 75                    badge_id=badge_rules.badge_id, rule=next_level_rule
 76                )
 77            )
 78        return to_acquire_badges
 79
 80    @staticmethod
 81    def quantity_goal(
 82        reservations_full: list[GetConsumersReservationsFullRow],
 83        rule: BadgesConfig.QuantityGoal,
 84    ) -> bool:
 85        """Quantity goal rule check.
 86
 87        Args:
 88            reservations_full: extended reservations info,
 89            rule: rules for this badge
 90
 91        Returns:
 92            if badge meets requirements
 93        """
 94        reservations_filtered = [
 95            reservation
 96            for reservation in reservations_full
 97            if reservation.collected_at is not None
 98        ]
 99        if rule.category_id:
100            reservations_filtered = [
101                reservation
102                for reservation in reservations_filtered
103                if rule.category_id in reservation.category_ids
104            ]
105        if rule.seller_id:
106            reservations_filtered = [
107                reservation
108                for reservation in reservations_filtered
109                if reservation.seller_id == rule.seller_id
110            ]
111        return not len(reservations_filtered) < rule.quantity
112
113    @staticmethod
114    def co2_goal(
115        reservations_full: list[GetConsumersReservationsFullRow],
116        rule: BadgesConfig.CO2Goal,
117    ) -> bool:
118        """CO2 goal rule check.
119
120        Args:
121            reservations_full: extended reservations info,
122            rule: rules for this badge
123
124        Returns:
125            if badge meets requirements
126        """
127        reservations_filtered = [
128            reservation
129            for reservation in reservations_full
130            if reservation.collected_at is not None
131        ]
132        carbon_dioxide = sum(
133            reservation.carbon_dioxide for reservation in reservations_filtered
134        )
135        return not carbon_dioxide < rule.carbon_dioxide
136
137    @staticmethod
138    def diversity_goal(
139        reservations_full: list[GetConsumersReservationsFullRow],
140        rule: BadgesConfig.DiversityGoal,
141    ) -> bool:
142        """Diversity goal rule check.
143
144        Args:
145            reservations_full: extended reservations info,
146            rule: rules for this badge
147
148        Returns:
149            if badge meets requirements
150        """
151        reservations_filtered = [
152            reservation
153            for reservation in reservations_full
154            if reservation.collected_at is not None
155        ]
156        if rule.dif_categories >= 1:
157            categories = {
158                cid
159                for reservation in reservations_filtered
160                for cid in reservation.category_ids
161            }
162            if len(categories) < rule.dif_categories:
163                return False
164        if rule.dif_sellers >= 1:
165            sellers = {reservation.seller_id for reservation in reservations_filtered}
166            if len(sellers) < rule.dif_sellers:
167                return False
168        return True
169
170    @staticmethod
171    def streak_goal_filter_sort(
172        reservations_full: list[GetConsumersReservationsFullRow],
173        rule: BadgesConfig.StreakGoal,
174    ) -> list[GetConsumersReservationsFullRow]:
175        """Filter and sort reservations for streak goal.
176
177        Args:
178            reservations_full: reservations with extended info
179            rule: badge rules
180
181        Returns:
182            list of sorted and filtered reservations
183        """
184        reservations_filtered = reservations_full.copy()
185        if rule.category_id:
186            reservations_filtered = [
187                reservation
188                for reservation in reservations_filtered
189                if rule.category_id in reservation.category_ids
190            ]
191        if rule.seller_id:
192            reservations_filtered = [
193                reservation
194                for reservation in reservations_filtered
195                if reservation.seller_id == rule.seller_id
196            ]
197        return sorted(
198            reservations_filtered,
199            key=lambda reservation: reservation.window_start,
200            reverse=True,
201        )
202
203    @staticmethod
204    def streak_goal(
205        reservations_full: list[GetConsumersReservationsFullRow],
206        rule: BadgesConfig.StreakGoal,
207        bundle_window_start: datetime,
208    ) -> bool:
209        """Streak goal rule check.
210
211        Args:
212            reservations_full: extended reservations info,
213            rule: rules for this badge
214            bundle_window_start: window start for current collection
215
216        Returns:
217            if badge meets requirements
218        """
219        reservations_sorted = BadgeEngine.streak_goal_filter_sort(
220            reservations_full, rule
221        )
222        reservations_streak: list[GetConsumersReservationsFullRow] = []
223        for reservation in reservations_sorted:
224            if (
225                not reservation.collected_at is not None
226                and reservation.window_end > datetime.now(tz=UTC)
227            ):
228                continue
229            if not reservation.collected_at:
230                break
231            reservations_streak.append(reservation)
232        if (
233            rule.streak_quantity >= 1
234            and len(reservations_streak) < rule.streak_quantity
235        ):
236            return False
237        if rule.streak_days >= 1:
238            days = 0
239            previous_date = bundle_window_start.date() + timedelta(days=1)
240            for reservation in reservations_streak:
241                reservation_date = reservation.window_start.date()
242                if reservation_date + timedelta(days=1) == previous_date:
243                    days += 1
244                    previous_date = reservation_date
245                    continue
246                if reservation_date != previous_date:
247                    break
248            if days < rule.streak_days:
249                return False
250        return True
251
252    @staticmethod
253    async def update_badge(
254        conn: AsyncConnection, consumer_id: int, badge_id: int, level: int
255    ) -> BadgesAcquired | None:
256        """Inserts or updates acquired badge record.
257
258        Args:
259            conn: database connection
260            consumer_id: consumer id
261            badge_id: badge id
262            level: badge level
263
264        Returns:
265            acquired badge
266        """
267        if level == 1:
268            return await BadgeQuerier(conn).acquire_badge(
269                AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level)
270            )
271        return await BadgeQuerier(conn).update_badge_level(
272            UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level)
273        )
274
275    @staticmethod
276    async def process_badges(
277        consumer_id: int,
278        badges_rules: list[BadgesConfig.BadgeRules],
279        bundle_window_start: datetime,
280    ) -> None:
281        """Background task to acquire badges.
282
283        Args:
284            consumer_id: consumer id
285            badges_rules: rules for all badges
286            bundle_window_start: start window for current collection
287
288        Raises:
289            ValueError: if failed to acquire badge
290        """
291        async for conn in database_manager.get_connection():
292            badges = [
293                badges
294                async for badges in BadgeQuerier(conn).get_consumer_badges(
295                    user_id=consumer_id
296                )
297            ]
298            if not (acquire_badges := BadgeEngine.to_acquire(badges_rules, badges)):
299                return
300            reservations_full = [
301                reservation
302                async for reservation in ReservationQuerier(
303                    conn
304                ).get_consumers_reservations_full(consumer_id=consumer_id)
305            ]
306            for acquire_badge in acquire_badges:
307                rule = acquire_badge.rule
308                to_acquire = False
309                match rule:
310                    case BadgesConfig.QuantityGoal():
311                        to_acquire = BadgeEngine.quantity_goal(reservations_full, rule)
312                    case BadgesConfig.CO2Goal():
313                        to_acquire = BadgeEngine.co2_goal(reservations_full, rule)
314                    case BadgesConfig.DiversityGoal():
315                        to_acquire = BadgeEngine.diversity_goal(reservations_full, rule)
316                    case BadgesConfig.StreakGoal():
317                        to_acquire = BadgeEngine.streak_goal(
318                            reservations_full, rule, bundle_window_start
319                        )
320                if not to_acquire:
321                    continue
322                acquired_badge = await BadgeEngine.update_badge(
323                    conn, consumer_id, acquire_badge.badge_id, acquire_badge.rule.level
324                )
325                if not acquired_badge:
326                    raise ValueError("Failed to insert acquire badge record")

Badges acquiring engine.

BadgeEngine(background_tasks: fastapi.background.BackgroundTasks)
34    def __init__(self, background_tasks: BackgroundTasks) -> None:
35        """Init engine for consumer."""
36        self.background_tasks = background_tasks

Init engine for consumer.

background_tasks: fastapi.background.BackgroundTasks
def run(self, consumer_id: int, bundle_window_start: datetime.datetime) -> None:
38    def run(self, consumer_id: int, bundle_window_start: datetime) -> None:
39        """Starts background badge check task.
40
41        Args:
42            consumer_id: consumer id
43            bundle_window_start: current collection window start
44        """
45        badges_rules = badges_config.badges_rules.badges_rules
46        self.background_tasks.add_task(
47            self.process_badges, consumer_id, badges_rules, bundle_window_start
48        )

Starts background badge check task.

Arguments:
  • consumer_id: consumer id
  • bundle_window_start: current collection window start
@staticmethod
def to_acquire( badges_rules: list[internal.settings.config.BadgesConfig.BadgeRules], acquired_badges: list[internal.queries.badge.GetConsumerBadgesRow]) -> list[BadgeEngine.AcquireBadgeRule]:
50    @staticmethod
51    def to_acquire(
52        badges_rules: list[BadgesConfig.BadgeRules],
53        acquired_badges: list[GetConsumerBadgesRow],
54    ) -> list[BadgeEngine.AcquireBadgeRule]:
55        """Gives levels of badges next to be acquired.
56
57        Args:
58            badges_rules: all rules for all badges
59            acquired_badges: badges acquired by consumer
60
61        Returns:
62            list of badges levels to acquire next
63        """
64        acquired_levels = {badge.badge_id: badge.level for badge in acquired_badges}
65        to_acquire_badges: list[BadgeEngine.AcquireBadgeRule] = []
66        for badge_rules in badges_rules:
67            current_level = acquired_levels.get(badge_rules.badge_id, 0)
68            next_level_rule = next(
69                (rule for rule in badge_rules.rules if rule.level > current_level), None
70            )
71            if not next_level_rule:
72                continue
73            to_acquire_badges.append(
74                BadgeEngine.AcquireBadgeRule(
75                    badge_id=badge_rules.badge_id, rule=next_level_rule
76                )
77            )
78        return to_acquire_badges

Gives levels of badges next to be acquired.

Arguments:
  • badges_rules: all rules for all badges
  • acquired_badges: badges acquired by consumer
Returns:

list of badges levels to acquire next

@staticmethod
def quantity_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.QuantityGoal) -> bool:
 80    @staticmethod
 81    def quantity_goal(
 82        reservations_full: list[GetConsumersReservationsFullRow],
 83        rule: BadgesConfig.QuantityGoal,
 84    ) -> bool:
 85        """Quantity goal rule check.
 86
 87        Args:
 88            reservations_full: extended reservations info,
 89            rule: rules for this badge
 90
 91        Returns:
 92            if badge meets requirements
 93        """
 94        reservations_filtered = [
 95            reservation
 96            for reservation in reservations_full
 97            if reservation.collected_at is not None
 98        ]
 99        if rule.category_id:
100            reservations_filtered = [
101                reservation
102                for reservation in reservations_filtered
103                if rule.category_id in reservation.category_ids
104            ]
105        if rule.seller_id:
106            reservations_filtered = [
107                reservation
108                for reservation in reservations_filtered
109                if reservation.seller_id == rule.seller_id
110            ]
111        return not len(reservations_filtered) < rule.quantity

Quantity goal rule check.

Arguments:
  • reservations_full: extended reservations info,
  • rule: rules for this badge
Returns:

if badge meets requirements

@staticmethod
def co2_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.CO2Goal) -> bool:
113    @staticmethod
114    def co2_goal(
115        reservations_full: list[GetConsumersReservationsFullRow],
116        rule: BadgesConfig.CO2Goal,
117    ) -> bool:
118        """CO2 goal rule check.
119
120        Args:
121            reservations_full: extended reservations info,
122            rule: rules for this badge
123
124        Returns:
125            if badge meets requirements
126        """
127        reservations_filtered = [
128            reservation
129            for reservation in reservations_full
130            if reservation.collected_at is not None
131        ]
132        carbon_dioxide = sum(
133            reservation.carbon_dioxide for reservation in reservations_filtered
134        )
135        return not carbon_dioxide < rule.carbon_dioxide

CO2 goal rule check.

Arguments:
  • reservations_full: extended reservations info,
  • rule: rules for this badge
Returns:

if badge meets requirements

@staticmethod
def diversity_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.DiversityGoal) -> bool:
137    @staticmethod
138    def diversity_goal(
139        reservations_full: list[GetConsumersReservationsFullRow],
140        rule: BadgesConfig.DiversityGoal,
141    ) -> bool:
142        """Diversity goal rule check.
143
144        Args:
145            reservations_full: extended reservations info,
146            rule: rules for this badge
147
148        Returns:
149            if badge meets requirements
150        """
151        reservations_filtered = [
152            reservation
153            for reservation in reservations_full
154            if reservation.collected_at is not None
155        ]
156        if rule.dif_categories >= 1:
157            categories = {
158                cid
159                for reservation in reservations_filtered
160                for cid in reservation.category_ids
161            }
162            if len(categories) < rule.dif_categories:
163                return False
164        if rule.dif_sellers >= 1:
165            sellers = {reservation.seller_id for reservation in reservations_filtered}
166            if len(sellers) < rule.dif_sellers:
167                return False
168        return True

Diversity goal rule check.

Arguments:
  • reservations_full: extended reservations info,
  • rule: rules for this badge
Returns:

if badge meets requirements

170    @staticmethod
171    def streak_goal_filter_sort(
172        reservations_full: list[GetConsumersReservationsFullRow],
173        rule: BadgesConfig.StreakGoal,
174    ) -> list[GetConsumersReservationsFullRow]:
175        """Filter and sort reservations for streak goal.
176
177        Args:
178            reservations_full: reservations with extended info
179            rule: badge rules
180
181        Returns:
182            list of sorted and filtered reservations
183        """
184        reservations_filtered = reservations_full.copy()
185        if rule.category_id:
186            reservations_filtered = [
187                reservation
188                for reservation in reservations_filtered
189                if rule.category_id in reservation.category_ids
190            ]
191        if rule.seller_id:
192            reservations_filtered = [
193                reservation
194                for reservation in reservations_filtered
195                if reservation.seller_id == rule.seller_id
196            ]
197        return sorted(
198            reservations_filtered,
199            key=lambda reservation: reservation.window_start,
200            reverse=True,
201        )

Filter and sort reservations for streak goal.

Arguments:
  • reservations_full: reservations with extended info
  • rule: badge rules
Returns:

list of sorted and filtered reservations

@staticmethod
def streak_goal( reservations_full: list[internal.queries.reservations.GetConsumersReservationsFullRow], rule: internal.settings.config.BadgesConfig.StreakGoal, bundle_window_start: datetime.datetime) -> bool:
203    @staticmethod
204    def streak_goal(
205        reservations_full: list[GetConsumersReservationsFullRow],
206        rule: BadgesConfig.StreakGoal,
207        bundle_window_start: datetime,
208    ) -> bool:
209        """Streak goal rule check.
210
211        Args:
212            reservations_full: extended reservations info,
213            rule: rules for this badge
214            bundle_window_start: window start for current collection
215
216        Returns:
217            if badge meets requirements
218        """
219        reservations_sorted = BadgeEngine.streak_goal_filter_sort(
220            reservations_full, rule
221        )
222        reservations_streak: list[GetConsumersReservationsFullRow] = []
223        for reservation in reservations_sorted:
224            if (
225                not reservation.collected_at is not None
226                and reservation.window_end > datetime.now(tz=UTC)
227            ):
228                continue
229            if not reservation.collected_at:
230                break
231            reservations_streak.append(reservation)
232        if (
233            rule.streak_quantity >= 1
234            and len(reservations_streak) < rule.streak_quantity
235        ):
236            return False
237        if rule.streak_days >= 1:
238            days = 0
239            previous_date = bundle_window_start.date() + timedelta(days=1)
240            for reservation in reservations_streak:
241                reservation_date = reservation.window_start.date()
242                if reservation_date + timedelta(days=1) == previous_date:
243                    days += 1
244                    previous_date = reservation_date
245                    continue
246                if reservation_date != previous_date:
247                    break
248            if days < rule.streak_days:
249                return False
250        return True

Streak goal rule check.

Arguments:
  • reservations_full: extended reservations info,
  • rule: rules for this badge
  • bundle_window_start: window start for current collection
Returns:

if badge meets requirements

@staticmethod
async def update_badge( conn: sqlalchemy.ext.asyncio.engine.AsyncConnection, consumer_id: int, badge_id: int, level: int) -> internal.queries.models.BadgesAcquired | None:
252    @staticmethod
253    async def update_badge(
254        conn: AsyncConnection, consumer_id: int, badge_id: int, level: int
255    ) -> BadgesAcquired | None:
256        """Inserts or updates acquired badge record.
257
258        Args:
259            conn: database connection
260            consumer_id: consumer id
261            badge_id: badge id
262            level: badge level
263
264        Returns:
265            acquired badge
266        """
267        if level == 1:
268            return await BadgeQuerier(conn).acquire_badge(
269                AcquireBadgeParams(user_id=consumer_id, badge_id=badge_id, level=level)
270            )
271        return await BadgeQuerier(conn).update_badge_level(
272            UpdateBadgeLevelParams(user_id=consumer_id, badge_id=badge_id, level=level)
273        )

Inserts or updates acquired badge record.

Arguments:
  • conn: database connection
  • consumer_id: consumer id
  • badge_id: badge id
  • level: badge level
Returns:

acquired badge

@staticmethod
async def process_badges( consumer_id: int, badges_rules: list[internal.settings.config.BadgesConfig.BadgeRules], bundle_window_start: datetime.datetime) -> None:
275    @staticmethod
276    async def process_badges(
277        consumer_id: int,
278        badges_rules: list[BadgesConfig.BadgeRules],
279        bundle_window_start: datetime,
280    ) -> None:
281        """Background task to acquire badges.
282
283        Args:
284            consumer_id: consumer id
285            badges_rules: rules for all badges
286            bundle_window_start: start window for current collection
287
288        Raises:
289            ValueError: if failed to acquire badge
290        """
291        async for conn in database_manager.get_connection():
292            badges = [
293                badges
294                async for badges in BadgeQuerier(conn).get_consumer_badges(
295                    user_id=consumer_id
296                )
297            ]
298            if not (acquire_badges := BadgeEngine.to_acquire(badges_rules, badges)):
299                return
300            reservations_full = [
301                reservation
302                async for reservation in ReservationQuerier(
303                    conn
304                ).get_consumers_reservations_full(consumer_id=consumer_id)
305            ]
306            for acquire_badge in acquire_badges:
307                rule = acquire_badge.rule
308                to_acquire = False
309                match rule:
310                    case BadgesConfig.QuantityGoal():
311                        to_acquire = BadgeEngine.quantity_goal(reservations_full, rule)
312                    case BadgesConfig.CO2Goal():
313                        to_acquire = BadgeEngine.co2_goal(reservations_full, rule)
314                    case BadgesConfig.DiversityGoal():
315                        to_acquire = BadgeEngine.diversity_goal(reservations_full, rule)
316                    case BadgesConfig.StreakGoal():
317                        to_acquire = BadgeEngine.streak_goal(
318                            reservations_full, rule, bundle_window_start
319                        )
320                if not to_acquire:
321                    continue
322                acquired_badge = await BadgeEngine.update_badge(
323                    conn, consumer_id, acquire_badge.badge_id, acquire_badge.rule.level
324                )
325                if not acquired_badge:
326                    raise ValueError("Failed to insert acquire badge record")

Background task to acquire badges.

Arguments:
  • consumer_id: consumer id
  • badges_rules: rules for all badges
  • bundle_window_start: start window for current collection
Raises:
  • ValueError: if failed to acquire badge
class BadgeEngine.AcquireBadgeRule(pydantic.main.BaseModel):
28    class AcquireBadgeRule(BaseModel):
29        """Next badge level to acquire."""
30
31        badge_id: int
32        rule: BadgesConfig.BadgeGoal

Next badge level to acquire.

badge_id: int = PydanticUndefined