internal.analytics.processing
Graph refreshing background processing.
"""Graph refreshing background processing."""

import datetime
from decimal import Decimal

from fastapi import BackgroundTasks
from internal.analytics.forecast import ForecastQuery, generate_seller_forecasts
from internal.analytics.forecast_info import BundleDetails, build_forecast_query
from internal.analytics.graphs import BundleRow, ReservationRow, SellerAnalytics
from internal.database.manager import database_manager
from internal.logger.logger import logger
from internal.queries.analytics import AsyncQuerier as AnalyticsQuerier
from internal.queries.analytics import (
    CreateGraphParams,
    CreateGraphPointParams,
    CreateGraphSeriesParams,
    GetGraphParams,
)
from internal.queries.bundle import AsyncQuerier as BundleQuerier
from internal.queries.category import AsyncQuerier as CategoryQuerier
from internal.queries.forecast import AsyncQuerier as ForecastQuerier
from internal.queries.forecast import UpsertForecastOutputParams
from internal.queries.reservations import AsyncQuerier as ReservationQuerier
from internal.queries.seller import AsyncQuerier as SellerQuerier
from sqlalchemy.ext.asyncio import AsyncConnection


class AnalyticsProcesser:
    """Refresh a seller's analytics graphs and forecast outputs.

    ``run`` queues ``process_analytics`` on the ``BackgroundTasks`` object
    passed to the constructor. The remaining methods are static coroutines
    that each persist one graph (or the forecast rows) through the queriers.
    """

    background_tasks: BackgroundTasks

    def __init__(self, background_tasks: BackgroundTasks) -> None:
        """Store the task queue used to schedule analytics processing.

        Args:
            background_tasks: task queue that ``run`` adds the job to
        """
        self.background_tasks = background_tasks

    def run(self, seller_id: int) -> None:
        """Queue the analytics refresh for a seller as a background task.

        Args:
            seller_id: seller id
        """
        self.background_tasks.add_task(self.process_analytics, seller_id)

    @staticmethod
    async def _add_weekly_sales_vs_posted_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        graph_type: int,
        graph_label: str,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Fill a graph with "sales" and "posted" series of weekly quantities.

        Shared by the sales-vs-posted and forecast graphs, which store the
        same points under different graph types. Logs an error and stops at
        the first querier call that returns ``None``.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            graph_type: graph type id of the graph to fill
            graph_label: graph name used in the failure log messages
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=graph_type)
        )
        if graph is None:
            logger.error(f"failed to get {graph_label} graph")
            return

        sales_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="sales", sort_index=0
            )
        )
        if sales_series is None:
            logger.error(f"failed to create sales series for {graph_label}")
            return

        posted_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="posted", sort_index=1
            )
        )
        if posted_series is None:
            logger.error(f"failed to create posted series for {graph_label}")
            return

        weekly_points = SellerAnalytics.graph_weekly_sales_vs_posted(
            bundle_rows, reservation_rows
        )
        for i, point in enumerate(weekly_points):
            # Both series share the same x label for a given point.
            day = point.day.strftime("%Y-%m-%d")
            sales_point = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=sales_series.series_id,
                    sort_index=i,
                    x=day,
                    y=Decimal(point.sold_qty),
                )
            )
            if sales_point is None:
                logger.error(
                    f"failed to create point for sales series for {graph_label}"
                )
                return
            posted_point = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=posted_series.series_id,
                    sort_index=i,
                    x=day,
                    y=Decimal(point.posted_qty),
                )
            )
            if posted_point is None:
                logger.error(
                    f"failed to create point for posted series for {graph_label}"
                )
                return

    @staticmethod
    async def add_sales_vs_posted_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add sales vs posted graph (graph type 1).

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        await AnalyticsProcesser._add_weekly_sales_vs_posted_graph(
            analytics_querier,
            seller_id,
            1,
            "sales_vs_posted",
            bundle_rows,
            reservation_rows,
        )

    @staticmethod
    async def add_sell_through_rate_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add sell through rate graph (graph type 2).

        Stores one "sell_rate" series with a "sold" point holding the
        sell-through percentage and an "unsold" point holding its complement
        to 100. Logs an error and stops at the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=2)
        )
        if graph is None:
            logger.error("failed to get sell_through_rate graph")
            return

        sell_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
            )
        )
        if sell_series is None:
            logger.error("failed to create sell series for sell_through_rate graph")
            return

        sell_through_rate = SellerAnalytics.graph_sell_through_rate(
            bundle_rows, reservation_rows
        ).sell_through_percentage
        # NOTE(review): if the percentage is a float, Decimal() keeps its exact
        # binary expansion; confirm the type returned by SellerAnalytics.
        slices = (
            ("sold", sell_through_rate),
            ("unsold", 100 - sell_through_rate),
        )
        for sort_index, (label, value) in enumerate(slices):
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=sell_series.series_id,
                    sort_index=sort_index,
                    x=label,
                    y=Decimal(value),
                )
            )
            if created is None:
                logger.error(
                    f"failed to create {label} point for sell_through_rate graph"
                )
                return

    # NOTE: the misspelling "cateogry" is part of the public name and is kept
    # so existing callers continue to work.
    @staticmethod
    async def add_cateogry_distribution_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        reservation_rows: list[ReservationRow],
        conn: AsyncConnection,
    ) -> None:
        """Add category distribution graph (graph type 3).

        Stores one "categories" series with a point per category: the category
        name on x and the collected quantity on y. Logs an error and stops at
        the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            reservation_rows: formatted reservation rows
            conn: database connection
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=3)
        )
        if graph is None:
            logger.error("failed to get category_distribution graph")
            return

        categories_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="categories", sort_index=0
            )
        )
        if categories_series is None:
            logger.error(
                "failed to create categories series for category_distribution graph"
            )
            return

        category_distribution = SellerAnalytics.graph_category_distribution(
            reservation_rows
        )
        category_querier = CategoryQuerier(conn)
        for i, category in enumerate(category_distribution):
            # The distribution only carries ids; the name is looked up per row.
            category_row = await category_querier.get_category(
                category_id=category.category_id
            )
            if category_row is None:
                logger.error("failed to get category")
                return
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=categories_series.series_id,
                    sort_index=i,
                    x=category_row.category_name,
                    y=Decimal(category.collected_qty),
                )
            )
            if created is None:
                logger.error(
                    "failed to create category point for category_distribution graph"
                )
                return

    @staticmethod
    async def add_time_window_distribution_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add time window distribution graph (graph type 4).

        Stores one "time_windows" series with a point per time window: the
        window formatted as ``HH:MM`` on x and the collected quantity on y.
        Logs an error and stops at the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=4)
        )
        if graph is None:
            logger.error("failed to get time_window_distribution graph")
            return

        time_windows_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="time_windows", sort_index=0
            )
        )
        if time_windows_series is None:
            logger.error("failed to create time_windows series")
            return

        time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
            reservation_rows
        )
        for i, time_window in enumerate(time_windows_distribution):
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=time_windows_series.series_id,
                    sort_index=i,
                    x=time_window.time_window.strftime("%H:%M"),
                    y=Decimal(time_window.collected_qty),
                )
            )
            if created is None:
                logger.error("failed to create time_window point")
                return

    @staticmethod
    async def add_forecast_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add forecast vs posted graph (graph type 5).

        NOTE(review): this currently stores the same weekly sales-vs-posted
        points as graph type 1; no forecast values are plotted. Confirm
        whether that is intended.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        await AnalyticsProcesser._add_weekly_sales_vs_posted_graph(
            analytics_querier,
            seller_id,
            5,
            "forecast",
            bundle_rows,
            reservation_rows,
        )

    @staticmethod
    async def add_forecast_outputs(
        forecast_querier: ForecastQuerier,
        category_querier: CategoryQuerier,
        seller_querier: SellerQuerier,
        seller_id: int,
        conn: AsyncConnection,
    ) -> None:
        """Save forecasts for seller's future bundles to forecast_output table.

        Args:
            forecast_querier: async forecast queries
            category_querier: async category queries
            seller_querier: async seller queries
            seller_id: seller id
            conn: database connection used to read the seller's bundles
        """
        # Check the seller first so a missing seller costs no further queries.
        seller = await seller_querier.get_seller(user_id=seller_id)
        if seller is None:
            logger.error("failed to get seller for forecast")
            return

        history = [
            row
            async for row in forecast_querier.get_forecast_inputs_by_seller(
                seller_id=seller_id
            )
        ]

        # One timestamp for the whole filter, so every bundle is compared
        # against the same instant.
        now = datetime.datetime.now(datetime.UTC)
        future_bundles = [
            bundle
            async for bundle in BundleQuerier(conn).get_sellers_bundles(
                seller_id=seller_id
            )
            if bundle.window_start > now
        ]

        bundle_queries: list[tuple[int, ForecastQuery]] = []
        for bundle in future_bundles:
            categories = [
                cat_id
                async for cat_id in category_querier.get_bundle_categories(
                    bundle_id=bundle.bundle_id
                )
            ]
            details = BundleDetails(
                bundle_id=bundle.bundle_id,
                bundle_date=bundle.window_start.date(),
                window_start=bundle.window_start,
                window_end=bundle.window_end,
                seller_id=bundle.seller_id,
                category_ids=categories,
                latitude=seller.latitude,
                longitude=seller.longitude,
                posted_qty=bundle.total_qty,
            )
            bundle_queries.append((bundle.bundle_id, build_forecast_query(details)))

        forecasts = generate_seller_forecasts(history, bundle_queries)

        for forecast in forecasts:
            await forecast_querier.upsert_forecast_output(
                UpsertForecastOutputParams(
                    bundle_id=forecast.bundle_id,
                    seller_id=forecast.seller_id,
                    window_start=forecast.window_start,
                    predicted_sales=forecast.predicted_sales,
                    posted_qty=forecast.posted_qty,
                    predicted_no_show_prob=forecast.predicted_no_show_prob,
                    confidence=forecast.confidence,
                    rationale=forecast.rationale,
                )
            )

    @staticmethod
    async def process_analytics(seller_id: int) -> None:
        """Background graph processing.

        For every graph type, the seller's existing graph has its series
        deleted, or a new graph is created when none exists. The seller's
        bundles and reservations are then loaded once, each graph is rebuilt
        from them, and the forecast outputs are upserted. Processing stops if
        a graph cannot be created.

        Args:
            seller_id: seller_id
        """
        async for conn in database_manager.get_connection():
            analytics_querier = AnalyticsQuerier(conn)
            async for graph_type in analytics_querier.get_graphs_types():
                graph = await analytics_querier.get_graph(
                    GetGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
                if graph is not None:
                    # Existing graph: drop its series so they can be rebuilt.
                    await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
                    continue
                created = await analytics_querier.create_graph(
                    CreateGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
                if created is None:
                    logger.error("Failed to create analytics graph")
                    return

            seller_bundles = BundleQuerier(conn).get_sellers_bundles(
                seller_id=seller_id
            )
            seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
                seller_id=seller_id
            )
            bundle_rows = [
                BundleRow(
                    bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
                )
                async for bundle in seller_bundles
            ]
            reservation_rows = [
                ReservationRow(
                    bundle_date=reservation.window_start.date(),
                    window_start=reservation.window_start.time(),
                    category_ids=reservation.category_ids,
                    collected_at=reservation.collected_at,
                )
                async for reservation in seller_reservations
            ]

            await AnalyticsProcesser.add_sales_vs_posted_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_sell_through_rate_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_cateogry_distribution_graph(
                analytics_querier, seller_id, reservation_rows, conn
            )
            await AnalyticsProcesser.add_time_window_distribution_graph(
                analytics_querier, seller_id, reservation_rows
            )
            await AnalyticsProcesser.add_forecast_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_forecast_outputs(
                ForecastQuerier(conn),
                CategoryQuerier(conn),
                SellerQuerier(conn),
                seller_id,
                conn,
            )
class
AnalyticsProcesser:
class AnalyticsProcesser:
    """Refresh a seller's analytics graphs and forecast outputs.

    ``run`` queues ``process_analytics`` on the ``BackgroundTasks`` object
    passed to the constructor. The remaining methods are static coroutines
    that each persist one graph (or the forecast rows) through the queriers.
    """

    background_tasks: BackgroundTasks

    def __init__(self, background_tasks: BackgroundTasks) -> None:
        """Store the task queue used to schedule analytics processing.

        Args:
            background_tasks: task queue that ``run`` adds the job to
        """
        self.background_tasks = background_tasks

    def run(self, seller_id: int) -> None:
        """Queue the analytics refresh for a seller as a background task.

        Args:
            seller_id: seller id
        """
        self.background_tasks.add_task(self.process_analytics, seller_id)

    @staticmethod
    async def _add_weekly_sales_vs_posted_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        graph_type: int,
        graph_label: str,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Fill a graph with "sales" and "posted" series of weekly quantities.

        Shared by the sales-vs-posted and forecast graphs, which store the
        same points under different graph types. Logs an error and stops at
        the first querier call that returns ``None``.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            graph_type: graph type id of the graph to fill
            graph_label: graph name used in the failure log messages
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=graph_type)
        )
        if graph is None:
            logger.error(f"failed to get {graph_label} graph")
            return

        sales_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="sales", sort_index=0
            )
        )
        if sales_series is None:
            logger.error(f"failed to create sales series for {graph_label}")
            return

        posted_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="posted", sort_index=1
            )
        )
        if posted_series is None:
            logger.error(f"failed to create posted series for {graph_label}")
            return

        weekly_points = SellerAnalytics.graph_weekly_sales_vs_posted(
            bundle_rows, reservation_rows
        )
        for i, point in enumerate(weekly_points):
            # Both series share the same x label for a given point.
            day = point.day.strftime("%Y-%m-%d")
            sales_point = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=sales_series.series_id,
                    sort_index=i,
                    x=day,
                    y=Decimal(point.sold_qty),
                )
            )
            if sales_point is None:
                logger.error(
                    f"failed to create point for sales series for {graph_label}"
                )
                return
            posted_point = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=posted_series.series_id,
                    sort_index=i,
                    x=day,
                    y=Decimal(point.posted_qty),
                )
            )
            if posted_point is None:
                logger.error(
                    f"failed to create point for posted series for {graph_label}"
                )
                return

    @staticmethod
    async def add_sales_vs_posted_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add sales vs posted graph (graph type 1).

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        await AnalyticsProcesser._add_weekly_sales_vs_posted_graph(
            analytics_querier,
            seller_id,
            1,
            "sales_vs_posted",
            bundle_rows,
            reservation_rows,
        )

    @staticmethod
    async def add_sell_through_rate_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add sell through rate graph (graph type 2).

        Stores one "sell_rate" series with a "sold" point holding the
        sell-through percentage and an "unsold" point holding its complement
        to 100. Logs an error and stops at the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=2)
        )
        if graph is None:
            logger.error("failed to get sell_through_rate graph")
            return

        sell_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
            )
        )
        if sell_series is None:
            logger.error("failed to create sell series for sell_through_rate graph")
            return

        sell_through_rate = SellerAnalytics.graph_sell_through_rate(
            bundle_rows, reservation_rows
        ).sell_through_percentage
        # NOTE(review): if the percentage is a float, Decimal() keeps its exact
        # binary expansion; confirm the type returned by SellerAnalytics.
        slices = (
            ("sold", sell_through_rate),
            ("unsold", 100 - sell_through_rate),
        )
        for sort_index, (label, value) in enumerate(slices):
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=sell_series.series_id,
                    sort_index=sort_index,
                    x=label,
                    y=Decimal(value),
                )
            )
            if created is None:
                logger.error(
                    f"failed to create {label} point for sell_through_rate graph"
                )
                return

    # NOTE: the misspelling "cateogry" is part of the public name and is kept
    # so existing callers continue to work.
    @staticmethod
    async def add_cateogry_distribution_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        reservation_rows: list[ReservationRow],
        conn: AsyncConnection,
    ) -> None:
        """Add category distribution graph (graph type 3).

        Stores one "categories" series with a point per category: the category
        name on x and the collected quantity on y. Logs an error and stops at
        the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            reservation_rows: formatted reservation rows
            conn: database connection
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=3)
        )
        if graph is None:
            logger.error("failed to get category_distribution graph")
            return

        categories_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="categories", sort_index=0
            )
        )
        if categories_series is None:
            logger.error(
                "failed to create categories series for category_distribution graph"
            )
            return

        category_distribution = SellerAnalytics.graph_category_distribution(
            reservation_rows
        )
        category_querier = CategoryQuerier(conn)
        for i, category in enumerate(category_distribution):
            # The distribution only carries ids; the name is looked up per row.
            category_row = await category_querier.get_category(
                category_id=category.category_id
            )
            if category_row is None:
                logger.error("failed to get category")
                return
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=categories_series.series_id,
                    sort_index=i,
                    x=category_row.category_name,
                    y=Decimal(category.collected_qty),
                )
            )
            if created is None:
                logger.error(
                    "failed to create category point for category_distribution graph"
                )
                return

    @staticmethod
    async def add_time_window_distribution_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add time window distribution graph (graph type 4).

        Stores one "time_windows" series with a point per time window: the
        window formatted as ``HH:MM`` on x and the collected quantity on y.
        Logs an error and stops at the first failed querier call.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            reservation_rows: formatted reservation rows
        """
        graph = await analytics_querier.get_graph(
            GetGraphParams(seller_id=seller_id, graph_type=4)
        )
        if graph is None:
            logger.error("failed to get time_window_distribution graph")
            return

        time_windows_series = await analytics_querier.create_graph_series(
            CreateGraphSeriesParams(
                graph_id=graph.graph_id, series_name="time_windows", sort_index=0
            )
        )
        if time_windows_series is None:
            logger.error("failed to create time_windows series")
            return

        time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
            reservation_rows
        )
        for i, time_window in enumerate(time_windows_distribution):
            created = await analytics_querier.create_graph_point(
                CreateGraphPointParams(
                    series_id=time_windows_series.series_id,
                    sort_index=i,
                    x=time_window.time_window.strftime("%H:%M"),
                    y=Decimal(time_window.collected_qty),
                )
            )
            if created is None:
                logger.error("failed to create time_window point")
                return

    @staticmethod
    async def add_forecast_graph(
        analytics_querier: AnalyticsQuerier,
        seller_id: int,
        bundle_rows: list[BundleRow],
        reservation_rows: list[ReservationRow],
    ) -> None:
        """Add forecast vs posted graph (graph type 5).

        NOTE(review): this currently stores the same weekly sales-vs-posted
        points as graph type 1; no forecast values are plotted. Confirm
        whether that is intended.

        Args:
            analytics_querier: async analytics queries
            seller_id: seller id
            bundle_rows: formatted bundle rows
            reservation_rows: formatted reservation rows
        """
        await AnalyticsProcesser._add_weekly_sales_vs_posted_graph(
            analytics_querier,
            seller_id,
            5,
            "forecast",
            bundle_rows,
            reservation_rows,
        )

    @staticmethod
    async def add_forecast_outputs(
        forecast_querier: ForecastQuerier,
        category_querier: CategoryQuerier,
        seller_querier: SellerQuerier,
        seller_id: int,
        conn: AsyncConnection,
    ) -> None:
        """Save forecasts for seller's future bundles to forecast_output table.

        Args:
            forecast_querier: async forecast queries
            category_querier: async category queries
            seller_querier: async seller queries
            seller_id: seller id
            conn: database connection used to read the seller's bundles
        """
        # Check the seller first so a missing seller costs no further queries.
        seller = await seller_querier.get_seller(user_id=seller_id)
        if seller is None:
            logger.error("failed to get seller for forecast")
            return

        history = [
            row
            async for row in forecast_querier.get_forecast_inputs_by_seller(
                seller_id=seller_id
            )
        ]

        # One timestamp for the whole filter, so every bundle is compared
        # against the same instant.
        now = datetime.datetime.now(datetime.UTC)
        future_bundles = [
            bundle
            async for bundle in BundleQuerier(conn).get_sellers_bundles(
                seller_id=seller_id
            )
            if bundle.window_start > now
        ]

        bundle_queries: list[tuple[int, ForecastQuery]] = []
        for bundle in future_bundles:
            categories = [
                cat_id
                async for cat_id in category_querier.get_bundle_categories(
                    bundle_id=bundle.bundle_id
                )
            ]
            details = BundleDetails(
                bundle_id=bundle.bundle_id,
                bundle_date=bundle.window_start.date(),
                window_start=bundle.window_start,
                window_end=bundle.window_end,
                seller_id=bundle.seller_id,
                category_ids=categories,
                latitude=seller.latitude,
                longitude=seller.longitude,
                posted_qty=bundle.total_qty,
            )
            bundle_queries.append((bundle.bundle_id, build_forecast_query(details)))

        forecasts = generate_seller_forecasts(history, bundle_queries)

        for forecast in forecasts:
            await forecast_querier.upsert_forecast_output(
                UpsertForecastOutputParams(
                    bundle_id=forecast.bundle_id,
                    seller_id=forecast.seller_id,
                    window_start=forecast.window_start,
                    predicted_sales=forecast.predicted_sales,
                    posted_qty=forecast.posted_qty,
                    predicted_no_show_prob=forecast.predicted_no_show_prob,
                    confidence=forecast.confidence,
                    rationale=forecast.rationale,
                )
            )

    @staticmethod
    async def process_analytics(seller_id: int) -> None:
        """Background graph processing.

        For every graph type, the seller's existing graph has its series
        deleted, or a new graph is created when none exists. The seller's
        bundles and reservations are then loaded once, each graph is rebuilt
        from them, and the forecast outputs are upserted. Processing stops if
        a graph cannot be created.

        Args:
            seller_id: seller_id
        """
        async for conn in database_manager.get_connection():
            analytics_querier = AnalyticsQuerier(conn)
            async for graph_type in analytics_querier.get_graphs_types():
                graph = await analytics_querier.get_graph(
                    GetGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
                if graph is not None:
                    # Existing graph: drop its series so they can be rebuilt.
                    await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
                    continue
                created = await analytics_querier.create_graph(
                    CreateGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
                if created is None:
                    logger.error("Failed to create analytics graph")
                    return

            seller_bundles = BundleQuerier(conn).get_sellers_bundles(
                seller_id=seller_id
            )
            seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
                seller_id=seller_id
            )
            bundle_rows = [
                BundleRow(
                    bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
                )
                async for bundle in seller_bundles
            ]
            reservation_rows = [
                ReservationRow(
                    bundle_date=reservation.window_start.date(),
                    window_start=reservation.window_start.time(),
                    category_ids=reservation.category_ids,
                    collected_at=reservation.collected_at,
                )
                async for reservation in seller_reservations
            ]

            await AnalyticsProcesser.add_sales_vs_posted_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_sell_through_rate_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_cateogry_distribution_graph(
                analytics_querier, seller_id, reservation_rows, conn
            )
            await AnalyticsProcesser.add_time_window_distribution_graph(
                analytics_querier, seller_id, reservation_rows
            )
            await AnalyticsProcesser.add_forecast_graph(
                analytics_querier, seller_id, bundle_rows, reservation_rows
            )
            await AnalyticsProcesser.add_forecast_outputs(
                ForecastQuerier(conn),
                CategoryQuerier(conn),
                SellerQuerier(conn),
                seller_id,
                conn,
            )
Analytics processing.
AnalyticsProcesser(background_tasks: fastapi.background.BackgroundTasks)
def __init__(self, background_tasks: BackgroundTasks) -> None:
    """Store the task queue used to schedule analytics processing.

    Args:
        background_tasks: task queue that ``run`` adds the job to
    """
    self.background_tasks = background_tasks
Init processing for seller.
def
run(self, seller_id: int) -> None:
def run(self, seller_id: int) -> None:
    """Queue the analytics refresh for a seller as a background task.

    Args:
        seller_id: seller id
    """
    job = self.process_analytics
    self.background_tasks.add_task(job, seller_id)
Starts background analytics task.
Arguments:
- seller_id: seller id
@staticmethod
async def
add_sales_vs_posted_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
@staticmethod
async def add_sales_vs_posted_graph(
    analytics_querier: AnalyticsQuerier,
    seller_id: int,
    bundle_rows: list[BundleRow],
    reservation_rows: list[ReservationRow],
) -> None:
    """Add sales vs posted graph (graph type 1).

    Creates a "sales" and a "posted" series on the seller's graph and stores
    one point per weekly entry in each: the day formatted as ``YYYY-MM-DD``
    on x and the sold / posted quantity on y. Logs an error and stops at the
    first querier call that returns ``None``.

    Args:
        analytics_querier: async analytics queries
        seller_id: seller id
        bundle_rows: formatted bundle rows
        reservation_rows: formatted reservation rows
    """
    graph = await analytics_querier.get_graph(
        GetGraphParams(seller_id=seller_id, graph_type=1)
    )
    if graph is None:
        # logger.error, not logger.exception: no exception is being handled.
        logger.error("failed to get sales_vs_posted graph")
        return

    sales_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="sales", sort_index=0
        )
    )
    if sales_series is None:
        logger.error("failed to create sales series for sales_vs_posted")
        return

    posted_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="posted", sort_index=1
        )
    )
    if posted_series is None:
        logger.error("failed to create posted series for sales_vs_posted")
        return

    weekly_points = SellerAnalytics.graph_weekly_sales_vs_posted(
        bundle_rows, reservation_rows
    )
    for i, point in enumerate(weekly_points):
        # Both series share the same x label for a given point.
        day = point.day.strftime("%Y-%m-%d")
        sales_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=sales_series.series_id,
                sort_index=i,
                x=day,
                y=Decimal(point.sold_qty),
            )
        )
        if sales_point is None:
            logger.error(
                "failed to create point for sales series for sales_vs_posted"
            )
            return
        posted_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=posted_series.series_id,
                sort_index=i,
                x=day,
                y=Decimal(point.posted_qty),
            )
        )
        if posted_point is None:
            logger.error(
                "failed to create point for posted series for sales_vs_posted"
            )
            return
Add sales vs posted graph.
Arguments:
- analytics_querier: async analytics queries
- seller_id: seller id
- bundle_rows: formatted bundle rows
- reservation_rows: formatted reservation rows
@staticmethod
async def
add_sell_through_rate_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
@staticmethod
async def add_sell_through_rate_graph(
    analytics_querier: AnalyticsQuerier,
    seller_id: int,
    bundle_rows: list[BundleRow],
    reservation_rows: list[ReservationRow],
) -> None:
    """Add sell through rate graph.

    Looks up the seller's graph of type 2, creates a single "sell_rate"
    series on it, and stores two points: "sold" with the sell-through
    percentage from ``SellerAnalytics.graph_sell_through_rate`` and "unsold"
    with ``100`` minus that percentage. The first step that returns ``None``
    is logged as an error and ends the function.

    Args:
        analytics_querier: async analytics queries
        seller_id: seller id
        bundle_rows: formatted bundle rows
        reservation_rows: formatted reservation rows
    """
    graph = await analytics_querier.get_graph(
        GetGraphParams(seller_id=seller_id, graph_type=2)
    )
    if graph is None:
        # logger.error rather than logger.exception: no exception is being
        # handled here, so there is no traceback to attach.
        logger.error("failed to get sell_through_rate graph")
        return

    sell_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
        )
    )
    if sell_series is None:
        logger.error("failed to create sell series for sell_through_rate graph")
        return

    sell_through_rate = SellerAnalytics.graph_sell_through_rate(
        bundle_rows, reservation_rows
    ).sell_through_percentage

    sold_point = await analytics_querier.create_graph_point(
        CreateGraphPointParams(
            series_id=sell_series.series_id,
            sort_index=0,
            x="sold",
            y=Decimal(sell_through_rate),
        )
    )
    if sold_point is None:
        logger.error("failed to create sold point for sell_through_rate graph")
        return

    # The unsold share is the complement of the sold percentage.
    unsold_point = await analytics_querier.create_graph_point(
        CreateGraphPointParams(
            series_id=sell_series.series_id,
            sort_index=1,
            x="unsold",
            y=Decimal(100 - sell_through_rate),
        )
    )
    if unsold_point is None:
        logger.error("failed to create unsold point for sell_through_rate graph")
        return
Add sell through rate graph.
Arguments:
- analytics_querier: async analytics queries
- seller_id: seller id
- bundle_rows: formatted bundle rows
- reservation_rows: formatted reservation rows
@staticmethod
async def
add_cateogry_distribution_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, reservation_rows: list[internal.analytics.graphs.ReservationRow], conn: sqlalchemy.ext.asyncio.engine.AsyncConnection) -> None:
@staticmethod
async def add_cateogry_distribution_graph(
    analytics_querier: AnalyticsQuerier,
    seller_id: int,
    reservation_rows: list[ReservationRow],
    conn: AsyncConnection,
) -> None:
    """Add category distribution graph.

    Looks up the seller's graph of type 3, creates a single "categories"
    series on it, and stores one point per entry returned by
    ``SellerAnalytics.graph_category_distribution``: the category name
    (fetched through a ``CategoryQuerier`` on ``conn``) as x and the
    collected quantity as y. The first step that returns ``None`` is logged
    as an error and ends the function.

    Args:
        analytics_querier: async analytics queries
        seller_id: seller id
        reservation_rows: formatted reservation rows
        conn: database connection
    """
    graph = await analytics_querier.get_graph(
        GetGraphParams(seller_id=seller_id, graph_type=3)
    )
    if graph is None:
        # logger.error rather than logger.exception: no exception is being
        # handled here, so there is no traceback to attach.
        logger.error("failed to get category_distribution graph")
        return

    categories_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="categories", sort_index=0
        )
    )
    if categories_series is None:
        logger.error(
            "failed to create categories series for category_distribution graph"
        )
        return

    category_distribution = SellerAnalytics.graph_category_distribution(
        reservation_rows
    )
    category_querier = CategoryQuerier(conn)
    for i, category in enumerate(category_distribution):
        # The distribution only carries the id; the name is looked up per entry.
        category_row = await category_querier.get_category(
            category_id=category.category_id
        )
        if category_row is None:
            logger.error("failed to get category")
            return

        category_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=categories_series.series_id,
                sort_index=i,
                x=category_row.category_name,
                y=Decimal(category.collected_qty),
            )
        )
        if category_point is None:
            logger.error(
                "failed to create category point for category_distribution graph"
            )
            return
Add category distribution graph.
Arguments:
- analytics_querier: async analytics queries
- seller_id: seller id
- reservation_rows: formatted reservation rows
- conn: database connection
@staticmethod
async def
add_time_window_distribution_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
@staticmethod
async def add_time_window_distribution_graph(
    analytics_querier: AnalyticsQuerier,
    seller_id: int,
    reservation_rows: list[ReservationRow],
) -> None:
    """Add time window distribution graph.

    Looks up the seller's graph of type 4, creates a single "time_windows"
    series on it, and stores one point per entry returned by
    ``SellerAnalytics.graph_time_window_distribution``: the window time
    formatted as ``HH:MM`` as x and the collected quantity as y. The first
    step that returns ``None`` is logged as an error and ends the function.

    Args:
        analytics_querier: async analytics queries
        seller_id: seller id
        reservation_rows: formatted reservation rows
    """
    graph = await analytics_querier.get_graph(
        GetGraphParams(seller_id=seller_id, graph_type=4)
    )
    if graph is None:
        # logger.error rather than logger.exception: no exception is being
        # handled here, so there is no traceback to attach.
        logger.error("failed to get time_window_distribution graph")
        return

    time_windows_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="time_windows", sort_index=0
        )
    )
    if time_windows_series is None:
        logger.error("failed to create time_windows series")
        return

    time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
        reservation_rows
    )
    for i, time_window in enumerate(time_windows_distribution):
        window_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=time_windows_series.series_id,
                sort_index=i,
                x=time_window.time_window.strftime("%H:%M"),
                y=Decimal(time_window.collected_qty),
            )
        )
        if window_point is None:
            logger.error("failed to create time_window point")
            return
Add time window distribution graph.
Arguments:
- analytics_querier: async analytics queries
- seller_id: seller id
- reservation_rows: formatted reservation rows
@staticmethod
async def
add_forecast_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
@staticmethod
async def add_forecast_graph(
    analytics_querier: AnalyticsQuerier,
    seller_id: int,
    bundle_rows: list[BundleRow],
    reservation_rows: list[ReservationRow],
) -> None:
    """Add forecast vs posted graph.

    Looks up the seller's graph of type 5, creates a "sales" and a "posted"
    series on it, and stores one point in each series for every entry
    produced by ``SellerAnalytics.graph_weekly_sales_vs_posted``. The first
    step that returns ``None`` is logged as an error and ends the function.

    NOTE(review): the points come from ``graph_weekly_sales_vs_posted``, the
    same source used for the graph_type=1 graph; no forecast data is read in
    this function. Confirm that this is intended.

    Args:
        analytics_querier: async analytics queries
        seller_id: seller id
        bundle_rows: formatted bundle rows
        reservation_rows: formatted reservation rows
    """
    graph = await analytics_querier.get_graph(
        GetGraphParams(seller_id=seller_id, graph_type=5)
    )
    if graph is None:
        # logger.error rather than logger.exception: no exception is being
        # handled here, so there is no traceback to attach.
        logger.error("failed to get forecast graph")
        return

    sales_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="sales", sort_index=0
        )
    )
    if sales_series is None:
        logger.error("failed to create sales series for forecast")
        return

    posted_series = await analytics_querier.create_graph_series(
        CreateGraphSeriesParams(
            graph_id=graph.graph_id, series_name="posted", sort_index=1
        )
    )
    if posted_series is None:
        logger.error("failed to create posted series for forecast")
        return

    weekly_points = SellerAnalytics.graph_weekly_sales_vs_posted(
        bundle_rows, reservation_rows
    )
    for i, point in enumerate(weekly_points):
        # Both series share the same x label for a given point.
        day_label = point.day.strftime("%Y-%m-%d")

        sales_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=sales_series.series_id,
                sort_index=i,
                x=day_label,
                y=Decimal(point.sold_qty),
            )
        )
        if sales_point is None:
            logger.error("failed to create point for sales series for forecast")
            return

        posted_point = await analytics_querier.create_graph_point(
            CreateGraphPointParams(
                series_id=posted_series.series_id,
                sort_index=i,
                x=day_label,
                y=Decimal(point.posted_qty),
            )
        )
        if posted_point is None:
            logger.error("failed to create point for posted series for forecast")
            return
Add forecast vs posted graph.
Arguments:
- analytics_querier: async analytics queries
- seller_id: seller id
- bundle_rows: formatted bundle rows
- reservation_rows: formatted reservation rows
@staticmethod
async def
add_forecast_outputs( forecast_querier: internal.queries.forecast.AsyncQuerier, category_querier: internal.queries.category.AsyncQuerier, seller_querier: internal.queries.seller.AsyncQuerier, seller_id: int, conn: sqlalchemy.ext.asyncio.engine.AsyncConnection) -> None:
@staticmethod
async def add_forecast_outputs(
    forecast_querier: ForecastQuerier,
    category_querier: CategoryQuerier,
    seller_querier: SellerQuerier,
    seller_id: int,
    conn: AsyncConnection,
) -> None:
    """Save forecasts for seller's future bundles to forecast_output table.

    Loads the seller's forecast inputs and the seller record, selects the
    seller's bundles whose ``window_start`` is later than the current UTC
    time, builds a forecast query for each of them, and upserts every
    forecast returned by ``generate_seller_forecasts``. If the seller cannot
    be found, an error is logged and nothing is written.

    Args:
        forecast_querier: async forecast queries (inputs read, outputs upserted)
        category_querier: async category queries (bundle categories)
        seller_querier: async seller queries
        seller_id: seller id
        conn: database connection, used to build a ``BundleQuerier``
    """
    history = [
        row
        async for row in forecast_querier.get_forecast_inputs_by_seller(
            seller_id=seller_id
        )
    ]

    seller = await seller_querier.get_seller(user_id=seller_id)
    if seller is None:
        # logger.error rather than logger.exception: no exception is being
        # handled here, so there is no traceback to attach.
        logger.error("failed to get seller for forecast")
        return

    bundles_querier = BundleQuerier(conn)
    bundles = [
        b async for b in bundles_querier.get_sellers_bundles(seller_id=seller_id)
    ]
    # One reference instant for the whole filter, so every bundle is compared
    # against the same "now" instead of a fresh clock reading per bundle.
    now = datetime.datetime.now(datetime.UTC)
    future_bundles = [b for b in bundles if b.window_start > now]

    bundle_queries: list[tuple[int, ForecastQuery]] = []
    for bundle in future_bundles:
        categories = [
            cat_id
            async for cat_id in category_querier.get_bundle_categories(
                bundle_id=bundle.bundle_id
            )
        ]
        details = BundleDetails(
            bundle_id=bundle.bundle_id,
            bundle_date=bundle.window_start.date(),
            window_start=bundle.window_start,
            window_end=bundle.window_end,
            seller_id=bundle.seller_id,
            category_ids=categories,
            # Location comes from the seller record, not from the bundle.
            latitude=seller.latitude,
            longitude=seller.longitude,
            posted_qty=bundle.total_qty,
        )
        query = build_forecast_query(details)
        bundle_queries.append((bundle.bundle_id, query))

    forecasts = generate_seller_forecasts(history, bundle_queries)

    for forecast in forecasts:
        await forecast_querier.upsert_forecast_output(
            UpsertForecastOutputParams(
                bundle_id=forecast.bundle_id,
                seller_id=forecast.seller_id,
                window_start=forecast.window_start,
                predicted_sales=forecast.predicted_sales,
                posted_qty=forecast.posted_qty,
                predicted_no_show_prob=forecast.predicted_no_show_prob,
                confidence=forecast.confidence,
                rationale=forecast.rationale,
            )
        )
Save forecasts for seller's future bundles to forecast_output table.
@staticmethod
async def
process_analytics(seller_id: int) -> None:
@staticmethod
async def process_analytics(seller_id: int) -> None:
    """Background graph processing.

    For every connection yielded by ``database_manager.get_connection()``:

    1. Walks the graph types from ``get_graphs_types``. If the seller already
       has a graph of that type, its series are deleted; otherwise the graph
       is created. A failed creation is logged and ends the whole task.
    2. Loads the seller's bundles and reservations and converts them to
       ``BundleRow`` / ``ReservationRow`` lists.
    3. Rebuilds each graph (sales vs posted, sell-through rate, category
       distribution, time window distribution, forecast) and then saves the
       forecast outputs, in that order.

    Args:
        seller_id: seller id
    """
    # NOTE(review): get_connection() is consumed as an async iterator;
    # presumably it yields exactly one connection - confirm.
    async for conn in database_manager.get_connection():
        analytics_querier = AnalyticsQuerier(conn)
        graph_types = analytics_querier.get_graphs_types()
        async for graph_type in graph_types:
            if (
                graph := await analytics_querier.get_graph(
                    GetGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
            ) is not None:
                # Existing graph: drop its old series so they can be rebuilt.
                await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
            elif (
                await analytics_querier.create_graph(
                    CreateGraphParams(
                        seller_id=seller_id, graph_type=graph_type.graph_type_id
                    )
                )
                is None
            ):
                # logger.error rather than logger.exception: no exception is
                # being handled here, so there is no traceback to attach.
                logger.error("Failed to create analytics graph")
                return
        seller_bundles = BundleQuerier(conn).get_sellers_bundles(
            seller_id=seller_id
        )
        seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
            seller_id=seller_id
        )
        bundle_rows = [
            BundleRow(
                bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
            )
            async for bundle in seller_bundles
        ]
        reservation_rows = [
            ReservationRow(
                bundle_date=reservation.window_start.date(),
                window_start=reservation.window_start.time(),
                category_ids=reservation.category_ids,
                collected_at=reservation.collected_at,
            )
            async for reservation in seller_reservations
        ]
        await AnalyticsProcesser.add_sales_vs_posted_graph(
            analytics_querier, seller_id, bundle_rows, reservation_rows
        )
        await AnalyticsProcesser.add_sell_through_rate_graph(
            analytics_querier, seller_id, bundle_rows, reservation_rows
        )
        await AnalyticsProcesser.add_cateogry_distribution_graph(
            analytics_querier, seller_id, reservation_rows, conn
        )
        await AnalyticsProcesser.add_time_window_distribution_graph(
            analytics_querier, seller_id, reservation_rows
        )
        await AnalyticsProcesser.add_forecast_graph(
            analytics_querier, seller_id, bundle_rows, reservation_rows
        )
        await AnalyticsProcesser.add_forecast_outputs(
            ForecastQuerier(conn),
            CategoryQuerier(conn),
            SellerQuerier(conn),
            seller_id,
            conn,
        )
Background graph processing.
Arguments:
- seller_id: seller id