internal.analytics.graphs
Module for processing and formatting seller analytics data into graph points.
"""Module for processing and formatting seller analytics data into graph points."""

from datetime import date, datetime, time

from pydantic import BaseModel

# --- INPUT MODELS ---


class BundleRow(BaseModel):
    """Schema for a raw bundle row."""

    # Calendar day the bundle's pickup window falls on; the per-day grouping key.
    bundle_date: date

    # Number of slots the seller made available for this bundle. This is the
    # "posted" quantity on the sales and gauge graphs; the graph builders clamp
    # negative values to zero before summing.
    total_qty: int


class ReservationRow(BaseModel):
    """Schema for a raw reservation row."""

    # Date of the bundle this reservation belongs to.
    bundle_date: date

    # Time at which the bundle's pickup window opens; used to bucket the
    # time-window graph. Kept as a ``time`` so windows compare chronologically.
    window_start: time

    # Ids of every category the reserved bundle belongs to (can be several).
    category_ids: list[int]

    # None means the consumer never collected (no-show); a timestamp means the
    # bundle was collected and counts as a sale.
    collected_at: datetime | None


# --- OUTPUT MODELS ---


class SalesGraphPoint(BaseModel):
    """Schema for sales vs. posted graph points."""

    # Day this point describes (x-axis).
    day: date
    # Total slots posted on that day.
    posted_qty: float
    # Number of collected reservations on that day.
    sold_qty: float


class CategoryGraphPoint(BaseModel):
    """Schema for category distribution graph points."""

    # Category the count belongs to.
    category_id: int
    # Number of collected reservations attributed to the category.
    collected_qty: float


class TimeWindowGraphPoint(BaseModel):
    """Schema for time window distribution graph points."""

    # Pickup-window start time that identifies the bucket.
    time_window: time
    # Number of collected reservations in that window.
    collected_qty: float


class GaugeGraphPoint(BaseModel):
    """Schema for a single percentage value (for a gauge/donut chart)."""

    # Sell-through rate expressed as a percentage.
    sell_through_percentage: float


# --- THE ANALYTICS ENGINE ---


class SellerAnalytics:
    """Stateless helpers that turn seller bundle/reservation rows into graph points."""

    @staticmethod
    def graph_weekly_sales_vs_posted(
        bundles: list[BundleRow], reservations: list[ReservationRow]
    ) -> list[SalesGraphPoint]:
        """Return coordinates for the provided days (Multiline Graph).

        Args:
            bundles: Bundle rows; their ``total_qty`` values are summed per
                ``bundle_date`` (negative quantities count as zero).
            reservations: Reservation rows; each row with a non-None
                ``collected_at`` counts as one sale on its ``bundle_date``.

        Returns:
            One point per day that appears in either input, in ascending date
            order. A quantity that has no data for a day is reported as 0.0.
        """
        posted_by_day: dict[date, float] = {}
        for bundle in bundles:
            # Clamp so a corrupt negative quantity cannot pull the total down.
            slots = float(bundle.total_qty) if bundle.total_qty > 0 else 0.0
            posted_by_day[bundle.bundle_date] = (
                posted_by_day.get(bundle.bundle_date, 0.0) + slots
            )

        sold_by_day: dict[date, float] = {}
        for reservation in reservations:
            # Adding 0.0 for a no-show still registers the day, so a day whose
            # reservations were all no-shows is plotted with zero sales.
            increment = 0.0 if reservation.collected_at is None else 1.0
            sold_by_day[reservation.bundle_date] = (
                sold_by_day.get(reservation.bundle_date, 0.0) + increment
            )

        # Union of both day sets, sorted so the line draws left-to-right.
        all_days = sorted(posted_by_day.keys() | sold_by_day.keys())
        return [
            SalesGraphPoint(
                day=day,
                posted_qty=posted_by_day.get(day, 0.0),
                sold_qty=sold_by_day.get(day, 0.0),
            )
            for day in all_days
        ]

    @staticmethod
    def graph_sell_through_rate(
        bundles: list[BundleRow], reservations: list[ReservationRow]
    ) -> GaugeGraphPoint:
        """Return the overall sell-through rate (Gauge/Donut Chart).

        Args:
            bundles: Bundle rows whose ``total_qty`` values (negatives treated
                as zero) make up the posted total.
            reservations: Reservation rows; those with a non-None
                ``collected_at`` make up the sold total.

        Returns:
            A gauge point holding ``sold / posted * 100`` capped at 100 and
            rounded to 2 decimals, or 0.0 when nothing was posted.
        """
        total_posted = sum(float(max(b.total_qty, 0)) for b in bundles)
        total_sold = sum(1.0 for r in reservations if r.collected_at is not None)

        # Nothing posted: report 0% instead of dividing by zero.
        if not total_posted:
            return GaugeGraphPoint(sell_through_percentage=0.0)

        # Cap at 100% in case inconsistent data shows more sales than slots.
        percentage = min((total_sold / total_posted) * 100.0, 100.0)
        return GaugeGraphPoint(sell_through_percentage=round(percentage, 2))

    @staticmethod
    def graph_category_distribution(
        reservations: list[ReservationRow], top_n: int = 5
    ) -> list[CategoryGraphPoint]:
        """Return points for category distribution (Pie/Bar Chart).

        Only collected reservations are counted. A collected reservation adds
        one to each distinct category it lists.

        Args:
            reservations: Reservation rows to tally.
            top_n: Maximum number of categories to return; a value <= 0
                yields an empty list.

        Returns:
            At most ``top_n`` points ordered by collected count descending,
            ties broken by ``category_id`` ascending.
        """
        if top_n <= 0 or not reservations:
            return []

        collected_by_category: dict[int, float] = {}
        for reservation in reservations:
            if reservation.collected_at is None:
                continue
            # set() guards against a repeated id in one row: a single
            # reservation must not count twice for the same category.
            for category_id in set(reservation.category_ids):
                collected_by_category[category_id] = (
                    collected_by_category.get(category_id, 0.0) + 1.0
                )

        # (-count, id) sorts biggest count first, then lowest id on ties.
        ranked = sorted(
            collected_by_category.items(), key=lambda item: (-item[1], item[0])
        )
        return [
            CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
            for cat_id, qty in ranked[:top_n]
        ]

    @staticmethod
    def graph_time_window_distribution(
        reservations: list[ReservationRow], top_n: int = 5
    ) -> list[TimeWindowGraphPoint]:
        """Return points for time-window distribution (Histogram/Bar Chart).

        Args:
            reservations: Reservation rows; only those with a non-None
                ``collected_at`` are counted, bucketed by ``window_start``.
            top_n: Maximum number of windows to return; a value <= 0 yields
                an empty list.

        Returns:
            At most ``top_n`` points ordered by collected count descending,
            ties broken by the earlier window first.
        """
        if top_n <= 0 or not reservations:
            return []

        collected_by_window: dict[time, float] = {}
        for reservation in reservations:
            if reservation.collected_at is None:
                continue
            collected_by_window[reservation.window_start] = (
                collected_by_window.get(reservation.window_start, 0.0) + 1.0
            )

        # ``time`` objects compare chronologically, so the tie-breaker puts
        # 09:00 ahead of 10:00.
        ranked = sorted(
            collected_by_window.items(), key=lambda item: (-item[1], item[0])
        )
        return [
            TimeWindowGraphPoint(time_window=window, collected_qty=qty)
            for window, qty in ranked[:top_n]
        ]
class
BundleRow(pydantic.main.BaseModel):
class BundleRow(BaseModel):
    """Schema for a raw bundle row."""

    # Calendar day the bundle's pickup window falls on; the per-day grouping key.
    bundle_date: date

    # Number of slots the seller made available for this bundle. This is the
    # "posted" quantity on the sales and gauge graphs.
    total_qty: int
Schema for a raw bundle row.
class
ReservationRow(pydantic.main.BaseModel):
class ReservationRow(BaseModel):
    """Schema for a raw reservation row."""

    # Date of the bundle this reservation belongs to.
    bundle_date: date

    # Time at which the bundle's pickup window opens; used to bucket the
    # time-window graph. Kept as a ``time`` so windows compare chronologically
    # (09:00 before 10:00).
    window_start: time

    # Ids of every category the reserved bundle belongs to (can be several);
    # feeds the category distribution.
    category_ids: list[int]

    # None means the consumer never collected (no-show); a timestamp means the
    # bundle was collected and counts as a sale.
    collected_at: datetime | None
Schema for a raw reservation row.
class
SalesGraphPoint(pydantic.main.BaseModel):
class SalesGraphPoint(BaseModel):
    """Schema for sales vs. posted graph points."""

    # Day this point describes (x-axis of the multiline graph).
    day: date
    # Total slots posted on that day.
    posted_qty: float
    # Number of collected reservations on that day.
    sold_qty: float
Schema for sales vs. posted graph points.
class
CategoryGraphPoint(pydantic.main.BaseModel):
class CategoryGraphPoint(BaseModel):
    """Schema for category distribution graph points."""

    # Category the count belongs to.
    category_id: int
    # Number of collected reservations attributed to the category.
    collected_qty: float
Schema for category distribution graph points.
class
TimeWindowGraphPoint(pydantic.main.BaseModel):
class TimeWindowGraphPoint(BaseModel):
    """Schema for time window distribution graph points."""

    # Pickup-window start time that identifies the bucket.
    time_window: time
    # Number of collected reservations in that window.
    collected_qty: float
Schema for time window distribution graph points.
class
GaugeGraphPoint(pydantic.main.BaseModel):
class GaugeGraphPoint(BaseModel):
    """Schema for a single percentage value (for a gauge/donut chart)."""

    # Sell-through rate expressed as a percentage.
    sell_through_percentage: float
Schema for a single percentage value (for a gauge/donut chart).
class
SellerAnalytics:
class SellerAnalytics:
    """Stateless helpers that turn seller bundle/reservation rows into graph points."""

    @staticmethod
    def graph_weekly_sales_vs_posted(
        bundles: list[BundleRow], reservations: list[ReservationRow]
    ) -> list[SalesGraphPoint]:
        """Return coordinates for the provided days (Multiline Graph).

        Args:
            bundles: Bundle rows; their ``total_qty`` values are summed per
                ``bundle_date`` (negative quantities count as zero).
            reservations: Reservation rows; each row with a non-None
                ``collected_at`` counts as one sale on its ``bundle_date``.

        Returns:
            One point per day that appears in either input, in ascending date
            order. A quantity that has no data for a day is reported as 0.0.
        """
        posted_by_day: dict[date, float] = {}
        for bundle in bundles:
            # Clamp so a corrupt negative quantity cannot pull the total down.
            slots = float(bundle.total_qty) if bundle.total_qty > 0 else 0.0
            posted_by_day[bundle.bundle_date] = (
                posted_by_day.get(bundle.bundle_date, 0.0) + slots
            )

        sold_by_day: dict[date, float] = {}
        for reservation in reservations:
            # Adding 0.0 for a no-show still registers the day, so a day whose
            # reservations were all no-shows is plotted with zero sales.
            increment = 0.0 if reservation.collected_at is None else 1.0
            sold_by_day[reservation.bundle_date] = (
                sold_by_day.get(reservation.bundle_date, 0.0) + increment
            )

        # Union of both day sets, sorted so the line draws left-to-right.
        all_days = sorted(posted_by_day.keys() | sold_by_day.keys())
        return [
            SalesGraphPoint(
                day=day,
                posted_qty=posted_by_day.get(day, 0.0),
                sold_qty=sold_by_day.get(day, 0.0),
            )
            for day in all_days
        ]

    @staticmethod
    def graph_sell_through_rate(
        bundles: list[BundleRow], reservations: list[ReservationRow]
    ) -> GaugeGraphPoint:
        """Return the overall sell-through rate (Gauge/Donut Chart).

        Args:
            bundles: Bundle rows whose ``total_qty`` values (negatives treated
                as zero) make up the posted total.
            reservations: Reservation rows; those with a non-None
                ``collected_at`` make up the sold total.

        Returns:
            A gauge point holding ``sold / posted * 100`` capped at 100 and
            rounded to 2 decimals, or 0.0 when nothing was posted.
        """
        total_posted = sum(float(max(b.total_qty, 0)) for b in bundles)
        total_sold = sum(1.0 for r in reservations if r.collected_at is not None)

        # Nothing posted: report 0% instead of dividing by zero.
        if not total_posted:
            return GaugeGraphPoint(sell_through_percentage=0.0)

        # Cap at 100% in case inconsistent data shows more sales than slots.
        percentage = min((total_sold / total_posted) * 100.0, 100.0)
        return GaugeGraphPoint(sell_through_percentage=round(percentage, 2))

    @staticmethod
    def graph_category_distribution(
        reservations: list[ReservationRow], top_n: int = 5
    ) -> list[CategoryGraphPoint]:
        """Return points for category distribution (Pie/Bar Chart).

        Only collected reservations are counted. A collected reservation adds
        one to each distinct category it lists.

        Args:
            reservations: Reservation rows to tally.
            top_n: Maximum number of categories to return; a value <= 0
                yields an empty list.

        Returns:
            At most ``top_n`` points ordered by collected count descending,
            ties broken by ``category_id`` ascending.
        """
        if top_n <= 0 or not reservations:
            return []

        collected_by_category: dict[int, float] = {}
        for reservation in reservations:
            if reservation.collected_at is None:
                continue
            # set() guards against a repeated id in one row: a single
            # reservation must not count twice for the same category.
            for category_id in set(reservation.category_ids):
                collected_by_category[category_id] = (
                    collected_by_category.get(category_id, 0.0) + 1.0
                )

        # (-count, id) sorts biggest count first, then lowest id on ties.
        ranked = sorted(
            collected_by_category.items(), key=lambda item: (-item[1], item[0])
        )
        return [
            CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
            for cat_id, qty in ranked[:top_n]
        ]

    @staticmethod
    def graph_time_window_distribution(
        reservations: list[ReservationRow], top_n: int = 5
    ) -> list[TimeWindowGraphPoint]:
        """Return points for time-window distribution (Histogram/Bar Chart).

        Args:
            reservations: Reservation rows; only those with a non-None
                ``collected_at`` are counted, bucketed by ``window_start``.
            top_n: Maximum number of windows to return; a value <= 0 yields
                an empty list.

        Returns:
            At most ``top_n`` points ordered by collected count descending,
            ties broken by the earlier window first.
        """
        if top_n <= 0 or not reservations:
            return []

        collected_by_window: dict[time, float] = {}
        for reservation in reservations:
            if reservation.collected_at is None:
                continue
            collected_by_window[reservation.window_start] = (
                collected_by_window.get(reservation.window_start, 0.0) + 1.0
            )

        # ``time`` objects compare chronologically, so the tie-breaker puts
        # 09:00 ahead of 10:00.
        ranked = sorted(
            collected_by_window.items(), key=lambda item: (-item[1], item[0])
        )
        return [
            TimeWindowGraphPoint(time_window=window, collected_qty=qty)
            for window, qty in ranked[:top_n]
        ]
Stateless helpers that turn seller bundle/reservation rows into graph points.
@staticmethod
def
graph_weekly_sales_vs_posted( bundles: list[BundleRow], reservations: list[ReservationRow]) -> list[SalesGraphPoint]:
@staticmethod
def graph_weekly_sales_vs_posted(
    bundles: list[BundleRow], reservations: list[ReservationRow]
) -> list[SalesGraphPoint]:
    """Return coordinates for the provided days (Multiline Graph).

    Args:
        bundles: Bundle rows; their ``total_qty`` values are summed per
            ``bundle_date`` (negative quantities count as zero).
        reservations: Reservation rows; each row with a non-None
            ``collected_at`` counts as one sale on its ``bundle_date``.

    Returns:
        One point per day that appears in either input, in ascending date
        order. A quantity that has no data for a day is reported as 0.0.
    """
    # Posted slots per day; negative quantities are clamped to zero.
    posted_totals: dict[date, float] = {}
    for row in bundles:
        slots = float(row.total_qty) if row.total_qty > 0 else 0.0
        posted_totals[row.bundle_date] = posted_totals.get(row.bundle_date, 0.0) + slots

    # Collected reservations per day. Adding 0.0 for a no-show still registers
    # the day, so a day with only no-shows is plotted with zero sales.
    sold_totals: dict[date, float] = {}
    for row in reservations:
        increment = 0.0 if row.collected_at is None else 1.0
        sold_totals[row.bundle_date] = sold_totals.get(row.bundle_date, 0.0) + increment

    # Walk the union of both day sets chronologically so the line draws
    # left-to-right.
    points: list[SalesGraphPoint] = []
    for day in sorted(posted_totals.keys() | sold_totals.keys()):
        points.append(
            SalesGraphPoint(
                day=day,
                posted_qty=posted_totals.get(day, 0.0),
                sold_qty=sold_totals.get(day, 0.0),
            )
        )
    return points
Return coordinates for the provided days (Multiline Graph).
@staticmethod
def
graph_sell_through_rate( bundles: list[BundleRow], reservations: list[ReservationRow]) -> GaugeGraphPoint:
@staticmethod
def graph_sell_through_rate(
    bundles: list[BundleRow], reservations: list[ReservationRow]
) -> GaugeGraphPoint:
    """Return the overall sell-through rate (Gauge/Donut Chart).

    Args:
        bundles: Bundle rows whose ``total_qty`` values (negatives treated as
            zero) make up the posted total.
        reservations: Reservation rows; those with a non-None
            ``collected_at`` make up the sold total.

    Returns:
        A gauge point holding ``sold / posted * 100`` capped at 100 and
        rounded to 2 decimals, or 0.0 when nothing was posted.
    """
    # Posted slots across all bundles, ignoring non-positive quantities.
    posted_slots = 0.0
    for bundle in bundles:
        if bundle.total_qty > 0:
            posted_slots += float(bundle.total_qty)

    # Number of reservations that were actually collected.
    collected_count = len([r for r in reservations if r.collected_at is not None])

    # Nothing posted: report 0% instead of dividing by zero.
    if posted_slots == 0:
        return GaugeGraphPoint(sell_through_percentage=0.0)

    rate = (collected_count / posted_slots) * 100.0
    # Cap at 100% in case inconsistent data shows more sales than slots.
    if rate > 100.0:
        rate = 100.0

    # Two decimals is enough precision for frontend display.
    return GaugeGraphPoint(sell_through_percentage=round(rate, 2))
Return the overall sell-through rate (Gauge/Donut Chart).
@staticmethod
def
graph_category_distribution( reservations: list[ReservationRow], top_n: int = 5) -> list[CategoryGraphPoint]:
@staticmethod
def graph_category_distribution(
    reservations: list[ReservationRow], top_n: int = 5
) -> list[CategoryGraphPoint]:
    """Return points for category distribution (Pie/Bar Chart).

    Only collected reservations are counted. A collected reservation adds one
    to each distinct category it lists.

    Args:
        reservations: Reservation rows to tally.
        top_n: Maximum number of categories to return; a value <= 0 yields an
            empty list.

    Returns:
        At most ``top_n`` points ordered by collected count descending, ties
        broken by ``category_id`` ascending.
    """
    # Nothing requested or nothing to count.
    if top_n <= 0 or not reservations:
        return []

    collected_by_category: dict[int, float] = {}
    for reservation in reservations:
        # No-shows do not contribute to any category.
        if reservation.collected_at is None:
            continue
        # set() guards against a repeated id in one row: a single reservation
        # must not count twice for the same category.
        for category_id in set(reservation.category_ids):
            collected_by_category[category_id] = (
                collected_by_category.get(category_id, 0.0) + 1.0
            )

    # (-count, id) sorts biggest count first, then lowest id on ties.
    ranked = sorted(
        collected_by_category.items(), key=lambda item: (-item[1], item[0])
    )
    return [
        CategoryGraphPoint(category_id=cat_id, collected_qty=qty)
        for cat_id, qty in ranked[:top_n]
    ]
Return points for category distribution (Pie/Bar Chart).
@staticmethod
def
graph_time_window_distribution( reservations: list[ReservationRow], top_n: int = 5) -> list[TimeWindowGraphPoint]:
@staticmethod
def graph_time_window_distribution(
    reservations: list[ReservationRow], top_n: int = 5
) -> list[TimeWindowGraphPoint]:
    """Return points for time-window distribution (Histogram/Bar Chart).

    Args:
        reservations: Reservation rows; only those with a non-None
            ``collected_at`` are counted, bucketed by ``window_start``.
        top_n: Maximum number of windows to return; a value <= 0 yields an
            empty list.

    Returns:
        At most ``top_n`` points ordered by collected count descending, ties
        broken by the earlier window first.
    """
    # Nothing to count, or nothing requested.
    if not reservations or top_n <= 0:
        return []

    # Tally collected reservations per pickup-window start time.
    tally: dict[time, float] = {}
    for row in reservations:
        if row.collected_at is not None:
            slot = row.window_start
            tally[slot] = tally.get(slot, 0.0) + 1.0

    def rank_key(entry: tuple[time, float]) -> tuple[float, time]:
        # Highest count first; ``time`` compares chronologically, so the
        # earlier window wins a tie.
        slot, count = entry
        return (-count, slot)

    ranked = sorted(tally.items(), key=rank_key)
    return [
        TimeWindowGraphPoint(time_window=slot, collected_qty=count)
        for slot, count in ranked[:top_n]
    ]
Return points for time-window distribution (Histogram/Bar Chart).