internal.analytics.graphs

Module for processing and formatting seller analytics data into graph points.

  1"""Module for processing and formatting seller analytics data into graph points."""
  2
  3from datetime import date, datetime, time
  4
  5from pydantic import BaseModel
  6
  7# --- INPUT MODELS ---
  8
  9
class BundleRow(BaseModel):
    """Schema for a raw bundle row."""

    # The date the pickup window falls on.
    # SellerAnalytics groups posted quantities by this date.
    bundle_date: date

    # How many slots the seller made available for this bundle.
    # This is what we count as "posted" on the sales and gauge graphs.
    # Negative values are clamped to 0 by the SellerAnalytics methods
    # that read this field, so they never reduce a total.
    total_qty: int
 19
 20
class ReservationRow(BaseModel):
    """Schema for a raw reservation row."""

    # The date of the bundle this reservation belongs to.
    # SellerAnalytics groups sold quantities by this date.
    bundle_date: date

    # The time at which the bundle's pickup window opens, used for the
    # time-window histogram.
    # Stored as a time object so 9:00 AM sorts correctly before 10:00 AM.
    window_start: time

    # The categories the bundle belongs to (a bundle may have several),
    # used for the category distribution.
    category_ids: list[int]

    # None means the consumer never showed up (no-show).
    # A timestamp means they collected their bundle (sale).
    collected_at: datetime | None
 37
 38
 39# --- OUTPUT MODELS ---
 40
 41
class SalesGraphPoint(BaseModel):
    """Schema for sales vs. posted graph points."""

    # The calendar day this point describes (x-axis value).
    day: date

    # Total slots posted for this day (sum of the bundles' total_qty,
    # with negative values counted as 0).
    posted_qty: float

    # Number of reservations for this day that were actually collected.
    sold_qty: float
 48
 49
class CategoryGraphPoint(BaseModel):
    """Schema for category distribution graph points."""

    # Identifier of the category this point describes.
    category_id: int

    # Number of collected reservations counted toward this category.
    collected_qty: float
 55
 56
class TimeWindowGraphPoint(BaseModel):
    """Schema for time window distribution graph points."""

    # Pickup window start time this point describes
    # (taken from ReservationRow.window_start).
    time_window: time

    # Number of collected reservations whose pickup window starts at this time.
    collected_qty: float
 62
 63
class GaugeGraphPoint(BaseModel):
    """Schema for a single percentage value (for a gauge/donut chart)."""

    # Sell-through rate as a percentage. SellerAnalytics.graph_sell_through_rate
    # produces values between 0.0 and 100.0, rounded to 2 decimal places.
    sell_through_percentage: float
 68
 69
 70# --- THE ANALYTICS ENGINE ---
 71
 72
 73class SellerAnalytics:
 74    """Module for processing and formatting seller analytics data into graph points."""
 75
 76    @staticmethod
 77    def graph_weekly_sales_vs_posted(
 78        bundles: list[BundleRow], reservations: list[ReservationRow]
 79    ) -> list[SalesGraphPoint]:
 80        """Return coordinates for the provided days (Multiline Graph)."""
 81        # Group total posted slots by day from the bundles list.
 82        posted_by_day: dict[date, float] = {}
 83        for bundle in bundles:
 84            current = posted_by_day.get(bundle.bundle_date, 0.0)
 85            # max(value, 0) is a safety net: if a database error ever returns
 86            # a negative total_qty, we force it to 0 so the graph doesn't break.
 87            posted_by_day[bundle.bundle_date] = current + float(
 88                max(bundle.total_qty, 0)
 89            )
 90
 91        # Count collected reservations by day from the reservations list.
 92        sold_by_day: dict[date, float] = {}
 93        for reservation in reservations:
 94            # Make sure every day from bundles has a sold entry even if zero sales
 95            # happened, so it still appears as a point on the line graph.
 96            if reservation.bundle_date not in sold_by_day:
 97                sold_by_day[reservation.bundle_date] = 0.0
 98            # Business rule: a reservation is a sale if the consumer actually
 99            # collected it (collected_at is not None).
100            if reservation.collected_at is not None:
101                sold_by_day[reservation.bundle_date] += 1.0
102
103        # Merge both day-sets and sort chronologically so the line draws left-to-right.
104        all_days = sorted(set(posted_by_day) | set(sold_by_day))
105        return [
106            SalesGraphPoint(
107                day=day,
108                posted_qty=posted_by_day.get(day, 0.0),
109                sold_qty=sold_by_day.get(day, 0.0),
110            )
111            for day in all_days
112        ]
113
114    @staticmethod
115    def graph_sell_through_rate(
116        bundles: list[BundleRow], reservations: list[ReservationRow]
117    ) -> GaugeGraphPoint:
118        """Return the overall sell-through rate (Gauge/Donut Chart)."""
119        # Add up all slots posted across all bundles.
120        total_posted = sum(float(max(b.total_qty, 0)) for b in bundles)
121
122        # Add up all collected reservations.
123        total_sold = sum(1.0 for r in reservations if r.collected_at is not None)
124
125        # Prevent a "Division by Zero" crash if the seller didn't post anything.
126        if not total_posted:
127            return GaugeGraphPoint(sell_through_percentage=0.0)
128
129        # Calculate the percentage.
130        # We use min(..., 100.0) to cap it at 100% just in case a data glitch
131        # says they sold 10 items but only posted 8.
132        percentage = min((total_sold / total_posted) * 100.0, 100.0)
133
134        # Round to 2 decimal places (e.g., 85.45) for frontend display.
135        return GaugeGraphPoint(sell_through_percentage=round(percentage, 2))
136
137    @staticmethod
138    def graph_category_distribution(
139        reservations: list[ReservationRow], top_n: int = 5
140    ) -> list[CategoryGraphPoint]:
141        """Return points for category distribution (Pie/Bar Chart)."""
142        # If we don't want any results or have no data.
143        if top_n <= 0 or not reservations:
144            return []
145
146        # Count collected reservations per category.
147        collected_by_category: dict[int, float] = {}
148        for reservation in reservations:
149            # Skip reservations that were not collected.
150            if reservation.collected_at is None:
151                continue
152            # A bundle can belong to multiple categories.
153            for category_id in reservation.category_ids:
154                collected_by_category[category_id] = (
155                    collected_by_category.get(category_id, 0.0) + 1.0
156                )
157
158        # Sort the categories to find the top ones.
159        # -item[1] means sort by total collected DESCENDING (biggest first).
160        # item[0] is the tie-breaker: sort by category_id ASCENDING.
161        top_categories = sorted(
162            collected_by_category.items(), key=lambda item: (-item[1], item[0])
163        )[:top_n]  # The [:top_n] chops off everything except the top winners
164
165        # Package the winners into the output format.
166        return [
167            CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
168            for cat_id, qty in top_categories
169        ]
170
171    @staticmethod
172    def graph_time_window_distribution(
173        reservations: list[ReservationRow], top_n: int = 5
174    ) -> list[TimeWindowGraphPoint]:
175        """Return points for time-window distribution (Histogram/Bar Chart)."""
176        # If we don't want any results or have no data.
177        if top_n <= 0 or not reservations:
178            return []
179
180        # Count collected reservations per time window.
181        collected_by_window: dict[time, float] = {}
182        for reservation in reservations:
183            # Business rule: only collected reservations count toward a time window.
184            if reservation.collected_at is None:
185                continue
186            collected_by_window[reservation.window_start] = (
187                collected_by_window.get(reservation.window_start, 0.0) + 1.0
188            )
189
190        # Sort by highest volume first.
191        # Because we are using real 'time' objects, the tie-breaker (item[0])
192        # correctly understands that 9:00 AM comes before 10:00 AM.
193        top_windows = sorted(
194            collected_by_window.items(), key=lambda item: (-item[1], item[0])
195        )[:top_n]
196
197        # Package into the output format.
198        return [
199            TimeWindowGraphPoint(time_window=window, collected_qty=qty)
200            for window, qty in top_windows
201        ]
class BundleRow(pydantic.main.BaseModel):
11class BundleRow(BaseModel):
12    """Schema for a raw bundle row."""
13
14    # The date the pickup window falls on.
15    bundle_date: date
16
17    # How many slots the seller made available for this bundle.
18    # This is what we count as "posted" on the sales and gauge graphs.
19    total_qty: int

Schema for a raw bundle row.

bundle_date: datetime.date = PydanticUndefined
total_qty: int = PydanticUndefined
class ReservationRow(pydantic.main.BaseModel):
22class ReservationRow(BaseModel):
23    """Schema for a raw reservation row."""
24
25    # The date of the bundle this reservation belongs to.
26    bundle_date: date
27
28    # The time window the bundle's pickup opens, used for the histogram.
29    # Stored as a time object so 9:00 AM sorts correctly before 10:00 AM.
30    window_start: time
31
 32    # The categories the bundle belongs to (may be several), used for the category distribution.
33    category_ids: list[int]
34
35    # None means the consumer never showed up (no-show).
36    # A timestamp means they collected their bundle (sale).
37    collected_at: datetime | None

Schema for a raw reservation row.

bundle_date: datetime.date = PydanticUndefined
window_start: datetime.time = PydanticUndefined
category_ids: list[int] = PydanticUndefined
collected_at: datetime.datetime | None = PydanticUndefined
class SalesGraphPoint(pydantic.main.BaseModel):
43class SalesGraphPoint(BaseModel):
44    """Schema for sales vs. posted graph points."""
45
46    day: date
47    posted_qty: float
48    sold_qty: float

Schema for sales vs. posted graph points.

day: datetime.date = PydanticUndefined
posted_qty: float = PydanticUndefined
sold_qty: float = PydanticUndefined
class CategoryGraphPoint(pydantic.main.BaseModel):
51class CategoryGraphPoint(BaseModel):
52    """Schema for category distribution graph points."""
53
54    category_id: int
55    collected_qty: float

Schema for category distribution graph points.

category_id: int = PydanticUndefined
collected_qty: float = PydanticUndefined
class TimeWindowGraphPoint(pydantic.main.BaseModel):
58class TimeWindowGraphPoint(BaseModel):
59    """Schema for time window distribution graph points."""
60
61    time_window: time
62    collected_qty: float

Schema for time window distribution graph points.

time_window: datetime.time = PydanticUndefined
collected_qty: float = PydanticUndefined
class GaugeGraphPoint(pydantic.main.BaseModel):
65class GaugeGraphPoint(BaseModel):
66    """Schema for a single percentage value (for a gauge/donut chart)."""
67
68    sell_through_percentage: float

Schema for a single percentage value (for a gauge/donut chart).

sell_through_percentage: float = PydanticUndefined
class SellerAnalytics:
 74class SellerAnalytics:
 75    """Static helpers that turn raw seller rows into chart-ready graph points."""
 76
 77    @staticmethod
 78    def graph_weekly_sales_vs_posted(
 79        bundles: list[BundleRow], reservations: list[ReservationRow]
 80    ) -> list[SalesGraphPoint]:
 81        """Return coordinates for the provided days (Multiline Graph)."""
 82        # Group total posted slots by day from the bundles list.
 83        posted_by_day: dict[date, float] = {}
 84        for bundle in bundles:
 85            current = posted_by_day.get(bundle.bundle_date, 0.0)
 86            # max(value, 0) is a safety net: if a database error ever returns
 87            # a negative total_qty, we force it to 0 so the graph doesn't break.
 88            posted_by_day[bundle.bundle_date] = current + float(
 89                max(bundle.total_qty, 0)
 90            )
 91
 92        # Count collected reservations by day from the reservations list.
 93        sold_by_day: dict[date, float] = {}
 94        for reservation in reservations:
 95            # Make sure every day that has a reservation gets a sold entry even if
 96            # nothing was collected, so it still appears as a point on the line graph.
 97            if reservation.bundle_date not in sold_by_day:
 98                sold_by_day[reservation.bundle_date] = 0.0
 99            # Business rule: a reservation is a sale if the consumer actually
100            # collected it (collected_at is not None).
101            if reservation.collected_at is not None:
102                sold_by_day[reservation.bundle_date] += 1.0
103
104        # Merge both day-sets and sort chronologically so the line draws left-to-right.
105        all_days = sorted(set(posted_by_day) | set(sold_by_day))
106        return [
107            SalesGraphPoint(
108                day=day,
109                posted_qty=posted_by_day.get(day, 0.0),
110                sold_qty=sold_by_day.get(day, 0.0),
111            )
112            for day in all_days
113        ]
114
115    @staticmethod
116    def graph_sell_through_rate(
117        bundles: list[BundleRow], reservations: list[ReservationRow]
118    ) -> GaugeGraphPoint:
119        """Return the overall sell-through rate (Gauge/Donut Chart)."""
120        # Add up all slots posted across all bundles.
121        total_posted = sum(float(max(b.total_qty, 0)) for b in bundles)
122
123        # Add up all collected reservations.
124        total_sold = sum(1.0 for r in reservations if r.collected_at is not None)
125
126        # Prevent a "Division by Zero" crash if the seller didn't post anything.
127        if not total_posted:
128            return GaugeGraphPoint(sell_through_percentage=0.0)
129
130        # Calculate the percentage.
131        # We use min(..., 100.0) to cap it at 100% just in case a data glitch
132        # says they sold 10 items but only posted 8.
133        percentage = min((total_sold / total_posted) * 100.0, 100.0)
134
135        # Round to 2 decimal places (e.g., 85.45) for frontend display.
136        return GaugeGraphPoint(sell_through_percentage=round(percentage, 2))
137
138    @staticmethod
139    def graph_category_distribution(
140        reservations: list[ReservationRow], top_n: int = 5
141    ) -> list[CategoryGraphPoint]:
142        """Return points for category distribution (Pie/Bar Chart)."""
143        # If we don't want any results or have no data.
144        if top_n <= 0 or not reservations:
145            return []
146
147        # Count collected reservations per category.
148        collected_by_category: dict[int, float] = {}
149        for reservation in reservations:
150            # Skip reservations that were not collected.
151            if reservation.collected_at is None:
152                continue
153            # A bundle can belong to multiple categories.
154            for category_id in reservation.category_ids:
155                collected_by_category[category_id] = (
156                    collected_by_category.get(category_id, 0.0) + 1.0
157                )
158
159        # Sort the categories to find the top ones.
160        # -item[1] means sort by total collected DESCENDING (biggest first).
161        # item[0] is the tie-breaker: sort by category_id ASCENDING.
162        top_categories = sorted(
163            collected_by_category.items(), key=lambda item: (-item[1], item[0])
164        )[:top_n]  # The [:top_n] chops off everything except the top winners
165
166        # Package the winners into the output format.
167        return [
168            CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
169            for cat_id, qty in top_categories
170        ]
171
172    @staticmethod
173    def graph_time_window_distribution(
174        reservations: list[ReservationRow], top_n: int = 5
175    ) -> list[TimeWindowGraphPoint]:
176        """Return points for time-window distribution (Histogram/Bar Chart)."""
177        # If we don't want any results or have no data.
178        if top_n <= 0 or not reservations:
179            return []
180
181        # Count collected reservations per time window.
182        collected_by_window: dict[time, float] = {}
183        for reservation in reservations:
184            # Business rule: only collected reservations count toward a time window.
185            if reservation.collected_at is None:
186                continue
187            collected_by_window[reservation.window_start] = (
188                collected_by_window.get(reservation.window_start, 0.0) + 1.0
189            )
190
191        # Sort by highest volume first.
192        # Because we are using real 'time' objects, the tie-breaker (item[0])
193        # correctly understands that 9:00 AM comes before 10:00 AM.
194        top_windows = sorted(
195            collected_by_window.items(), key=lambda item: (-item[1], item[0])
196        )[:top_n]
197
198        # Package into the output format.
199        return [
200            TimeWindowGraphPoint(time_window=window, collected_qty=qty)
201            for window, qty in top_windows
202        ]

Static helpers that turn raw seller rows into chart-ready graph points.

@staticmethod
def graph_weekly_sales_vs_posted( bundles: list[BundleRow], reservations: list[ReservationRow]) -> list[SalesGraphPoint]:
 77    @staticmethod
 78    def graph_weekly_sales_vs_posted(
 79        bundles: list[BundleRow], reservations: list[ReservationRow]
 80    ) -> list[SalesGraphPoint]:
 81        """Return coordinates for the provided days (Multiline Graph)."""
 82        # Group total posted slots by day from the bundles list.
 83        posted_by_day: dict[date, float] = {}
 84        for bundle in bundles:
 85            current = posted_by_day.get(bundle.bundle_date, 0.0)
 86            # max(value, 0) is a safety net: if a database error ever returns
 87            # a negative total_qty, we force it to 0 so the graph doesn't break.
 88            posted_by_day[bundle.bundle_date] = current + float(
 89                max(bundle.total_qty, 0)
 90            )
 91
 92        # Count collected reservations by day from the reservations list.
 93        sold_by_day: dict[date, float] = {}
 94        for reservation in reservations:
 95            # Make sure every day that has a reservation gets a sold entry even if
 96            # nothing was collected, so it still appears as a point on the line graph.
 97            if reservation.bundle_date not in sold_by_day:
 98                sold_by_day[reservation.bundle_date] = 0.0
 99            # Business rule: a reservation is a sale if the consumer actually
100            # collected it (collected_at is not None).
101            if reservation.collected_at is not None:
102                sold_by_day[reservation.bundle_date] += 1.0
103
104        # Merge both day-sets and sort chronologically so the line draws left-to-right.
105        all_days = sorted(set(posted_by_day) | set(sold_by_day))
106        return [
107            SalesGraphPoint(
108                day=day,
109                posted_qty=posted_by_day.get(day, 0.0),
110                sold_qty=sold_by_day.get(day, 0.0),
111            )
112            for day in all_days
113        ]

Return coordinates for the provided days (Multiline Graph).

@staticmethod
def graph_sell_through_rate( bundles: list[BundleRow], reservations: list[ReservationRow]) -> GaugeGraphPoint:
115    @staticmethod
116    def graph_sell_through_rate(
117        bundles: list[BundleRow], reservations: list[ReservationRow]
118    ) -> GaugeGraphPoint:
119        """Return the overall sell-through rate (Gauge/Donut Chart)."""
120        # Add up all slots posted across all bundles.
121        total_posted = sum(float(max(b.total_qty, 0)) for b in bundles)
122
123        # Add up all collected reservations.
124        total_sold = sum(1.0 for r in reservations if r.collected_at is not None)
125
126        # Prevent a "Division by Zero" crash if the seller didn't post anything.
127        if not total_posted:
128            return GaugeGraphPoint(sell_through_percentage=0.0)
129
130        # Calculate the percentage.
131        # We use min(..., 100.0) to cap it at 100% just in case a data glitch
132        # says they sold 10 items but only posted 8.
133        percentage = min((total_sold / total_posted) * 100.0, 100.0)
134
135        # Round to 2 decimal places (e.g., 85.45) for frontend display.
136        return GaugeGraphPoint(sell_through_percentage=round(percentage, 2))

Return the overall sell-through rate (Gauge/Donut Chart).

@staticmethod
def graph_category_distribution( reservations: list[ReservationRow], top_n: int = 5) -> list[CategoryGraphPoint]:
138    @staticmethod
139    def graph_category_distribution(
140        reservations: list[ReservationRow], top_n: int = 5
141    ) -> list[CategoryGraphPoint]:
142        """Return points for category distribution (Pie/Bar Chart)."""
143        # If we don't want any results or have no data.
144        if top_n <= 0 or not reservations:
145            return []
146
147        # Count collected reservations per category.
148        collected_by_category: dict[int, float] = {}
149        for reservation in reservations:
150            # Skip reservations that were not collected.
151            if reservation.collected_at is None:
152                continue
153            # A bundle can belong to multiple categories.
154            for category_id in reservation.category_ids:
155                collected_by_category[category_id] = (
156                    collected_by_category.get(category_id, 0.0) + 1.0
157                )
158
159        # Sort the categories to find the top ones.
160        # -item[1] means sort by total collected DESCENDING (biggest first).
161        # item[0] is the tie-breaker: sort by category_id ASCENDING.
162        top_categories = sorted(
163            collected_by_category.items(), key=lambda item: (-item[1], item[0])
164        )[:top_n]  # The [:top_n] chops off everything except the top winners
165
166        # Package the winners into the output format.
167        return [
168            CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
169            for cat_id, qty in top_categories
170        ]

Return points for category distribution (Pie/Bar Chart).

@staticmethod
def graph_time_window_distribution( reservations: list[ReservationRow], top_n: int = 5) -> list[TimeWindowGraphPoint]:
172    @staticmethod
173    def graph_time_window_distribution(
174        reservations: list[ReservationRow], top_n: int = 5
175    ) -> list[TimeWindowGraphPoint]:
176        """Return points for time-window distribution (Histogram/Bar Chart)."""
177        # If we don't want any results or have no data.
178        if top_n <= 0 or not reservations:
179            return []
180
181        # Count collected reservations per time window.
182        collected_by_window: dict[time, float] = {}
183        for reservation in reservations:
184            # Business rule: only collected reservations count toward a time window.
185            if reservation.collected_at is None:
186                continue
187            collected_by_window[reservation.window_start] = (
188                collected_by_window.get(reservation.window_start, 0.0) + 1.0
189            )
190
191        # Sort by highest volume first.
192        # Because we are using real 'time' objects, the tie-breaker (item[0])
193        # correctly understands that 9:00 AM comes before 10:00 AM.
194        top_windows = sorted(
195            collected_by_window.items(), key=lambda item: (-item[1], item[0])
196        )[:top_n]
197
198        # Package into the output format.
199        return [
200            TimeWindowGraphPoint(time_window=window, collected_qty=qty)
201            for window, qty in top_windows
202        ]

Return points for time-window distribution (Histogram/Bar Chart).