internal.analytics.processing

Graph refreshing background processing.

  1"""Graph refreshing background processing."""
  2
  3import datetime
  4from decimal import Decimal
  5
  6from fastapi import BackgroundTasks
  7from internal.analytics.forecast import ForecastQuery, generate_seller_forecasts
  8from internal.analytics.forecast_info import BundleDetails, build_forecast_query
  9from internal.analytics.graphs import BundleRow, ReservationRow, SellerAnalytics
 10from internal.database.manager import database_manager
 11from internal.logger.logger import logger
 12from internal.queries.analytics import AsyncQuerier as AnalyticsQuerier
 13from internal.queries.analytics import (
 14    CreateGraphParams,
 15    CreateGraphPointParams,
 16    CreateGraphSeriesParams,
 17    GetGraphParams,
 18)
 19from internal.queries.bundle import AsyncQuerier as BundleQuerier
 20from internal.queries.category import AsyncQuerier as CategoryQuerier
 21from internal.queries.forecast import AsyncQuerier as ForecastQuerier
 22from internal.queries.forecast import UpsertForecastOutputParams
 23from internal.queries.reservations import AsyncQuerier as ReservationQuerier
 24from internal.queries.seller import AsyncQuerier as SellerQuerier
 25from sqlalchemy.ext.asyncio import AsyncConnection
 26
 27
 28class AnalyticsProcesser:
 29    """Analytics processing."""
 30
 31    background_tasks: BackgroundTasks
 32
 33    def __init__(self, background_tasks: BackgroundTasks) -> None:
 34        """Init processing for seller."""
 35        self.background_tasks = background_tasks
 36
 37    def run(self, seller_id: int) -> None:
 38        """Starts background analytics task.
 39
 40        Args:
 41            seller_id: seller id
 42        """
 43        self.background_tasks.add_task(self.process_analytics, seller_id)
 44
 45    @staticmethod
 46    async def add_sales_vs_posted_graph(
 47        analytics_querier: AnalyticsQuerier,
 48        seller_id: int,
 49        bundle_rows: list[BundleRow],
 50        reservation_rows: list[ReservationRow],
 51    ) -> None:
 52        """Add seles vs posted graph.
 53
 54        Args:
 55            analytics_querier: async analytics queries
 56            seller_id: seller id
 57            bundle_rows: formatted bundle rows
 58            reservation_rows: formatted reservation rows
 59        """
 60        if (
 61            graph := await analytics_querier.get_graph(
 62                GetGraphParams(seller_id=seller_id, graph_type=1)
 63            )
 64        ) is None:
 65            logger.exception("failed to get seles_vs_posted graph")
 66            return
 67        if (
 68            sales_series := await analytics_querier.create_graph_series(
 69                CreateGraphSeriesParams(
 70                    graph_id=graph.graph_id, series_name="sales", sort_index=0
 71                )
 72            )
 73        ) is None:
 74            logger.exception("failed to create sales series for sales_vs_posted")
 75            return
 76        if (
 77            posted_series := await analytics_querier.create_graph_series(
 78                CreateGraphSeriesParams(
 79                    graph_id=graph.graph_id, series_name="posted", sort_index=1
 80                )
 81            )
 82        ) is None:
 83            logger.exception("failed to create posted series for sales_vs_posted")
 84            return
 85        for i, point in enumerate(
 86            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
 87        ):
 88            if (
 89                await analytics_querier.create_graph_point(
 90                    CreateGraphPointParams(
 91                        series_id=sales_series.series_id,
 92                        sort_index=i,
 93                        x=point.day.strftime("%Y-%m-%d"),
 94                        y=Decimal(point.sold_qty),
 95                    )
 96                )
 97                is None
 98            ):
 99                logger.exception(
100                    "failed to create point for sales series for sales_vs_posted"
101                )
102                return
103            if (
104                await analytics_querier.create_graph_point(
105                    CreateGraphPointParams(
106                        series_id=posted_series.series_id,
107                        sort_index=i,
108                        x=point.day.strftime("%Y-%m-%d"),
109                        y=Decimal(point.posted_qty),
110                    )
111                )
112                is None
113            ):
114                logger.exception(
115                    "failed to create point for sales series for sales_vs_posted"
116                )
117                return
118
119    @staticmethod
120    async def add_sell_through_rate_graph(
121        analytics_querier: AnalyticsQuerier,
122        seller_id: int,
123        bundle_rows: list[BundleRow],
124        reservation_rows: list[ReservationRow],
125    ) -> None:
126        """Add sell through rate graph.
127
128        Args:
129            analytics_querier: async analytics queries
130            seller_id: seller id
131            bundle_rows: formatted bundle rows
132            reservation_rows: formatted reservation rows
133        """
134        if (
135            graph := await analytics_querier.get_graph(
136                GetGraphParams(seller_id=seller_id, graph_type=2)
137            )
138        ) is None:
139            logger.exception("failed to get sell_through_rate graph")
140            return
141        if (
142            sell_series := await analytics_querier.create_graph_series(
143                CreateGraphSeriesParams(
144                    graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
145                )
146            )
147        ) is None:
148            logger.exception("failed to create sell series for sell_through_rate graph")
149            return
150        sell_through_rate = SellerAnalytics.graph_sell_through_rate(
151            bundle_rows, reservation_rows
152        ).sell_through_percentage
153        if (
154            await analytics_querier.create_graph_point(
155                CreateGraphPointParams(
156                    series_id=sell_series.series_id,
157                    sort_index=0,
158                    x="sold",
159                    y=Decimal(sell_through_rate),
160                )
161            )
162        ) is None:
163            logger.exception("failed to create sold point for sell_through_rate graph")
164            return
165        if (
166            await analytics_querier.create_graph_point(
167                CreateGraphPointParams(
168                    series_id=sell_series.series_id,
169                    sort_index=1,
170                    x="unsold",
171                    y=Decimal(100 - sell_through_rate),
172                )
173            )
174        ) is None:
175            logger.exception(
176                "failed to create unsold point for sell_through_rate graph"
177            )
178            return
179
180    @staticmethod
181    async def add_cateogry_distribution_graph(
182        analytics_querier: AnalyticsQuerier,
183        seller_id: int,
184        reservation_rows: list[ReservationRow],
185        conn: AsyncConnection,
186    ) -> None:
187        """Add category distribution graph.
188
189        Args:
190            analytics_querier: async analytics queries
191            seller_id: seller id
192            reservation_rows: formatted reservation rows
193            conn: database connection
194        """
195        if (
196            graph := await analytics_querier.get_graph(
197                GetGraphParams(seller_id=seller_id, graph_type=3)
198            )
199        ) is None:
200            logger.exception("failed to get category_distribution graph")
201            return
202        if (
203            categories_series := await analytics_querier.create_graph_series(
204                CreateGraphSeriesParams(
205                    graph_id=graph.graph_id, series_name="categories", sort_index=0
206                )
207            )
208        ) is None:
209            logger.exception(
210                "failed to create categories series for category_distribution graph"
211            )
212            return
213        category_distribution = SellerAnalytics.graph_category_distribution(
214            reservation_rows
215        )
216        category_querier = CategoryQuerier(conn)
217        for i, category in enumerate(category_distribution):
218            if (
219                category_row := await category_querier.get_category(
220                    category_id=category.category_id
221                )
222            ) is None:
223                logger.exception("failed to get category")
224                return
225            if (
226                await analytics_querier.create_graph_point(
227                    CreateGraphPointParams(
228                        series_id=categories_series.series_id,
229                        sort_index=i,
230                        x=category_row.category_name,
231                        y=Decimal(category.collected_qty),
232                    )
233                )
234            ) is None:
235                logger.exception(
236                    "failed to create category point for category_distribution graph"
237                )
238                return
239
240    @staticmethod
241    async def add_time_window_distribution_graph(
242        analytics_querier: AnalyticsQuerier,
243        seller_id: int,
244        reservation_rows: list[ReservationRow],
245    ) -> None:
246        """Add time window distribution graph.
247
248        Args:
249            analytics_querier: async analytics queries
250            seller_id: seller id
251            reservation_rows: formatted reservation rows
252        """
253        if (
254            graph := await analytics_querier.get_graph(
255                GetGraphParams(seller_id=seller_id, graph_type=4)
256            )
257        ) is None:
258            logger.exception("failed to get time_window_distribution graph")
259            return
260        if (
261            time_windows_series := await analytics_querier.create_graph_series(
262                CreateGraphSeriesParams(
263                    graph_id=graph.graph_id, series_name="time_windows", sort_index=0
264                )
265            )
266        ) is None:
267            logger.exception("failed to create time_windows series")
268            return
269        time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
270            reservation_rows
271        )
272        for i, time_window in enumerate(time_windows_distribution):
273            if (
274                await analytics_querier.create_graph_point(
275                    CreateGraphPointParams(
276                        series_id=time_windows_series.series_id,
277                        sort_index=i,
278                        x=time_window.time_window.strftime("%H:%M"),
279                        y=Decimal(time_window.collected_qty),
280                    )
281                )
282            ) is None:
283                logger.exception("failed to create time_window point")
284                return
285
286    @staticmethod
287    async def add_forecast_graph(
288        analytics_querier: AnalyticsQuerier,
289        seller_id: int,
290        bundle_rows: list[BundleRow],
291        reservation_rows: list[ReservationRow],
292    ) -> None:
293        """Add forecast vs posted graph.
294
295        Args:
296            analytics_querier: async analytics queries
297            seller_id: seller id
298            bundle_rows: formatted bundle rows
299            reservation_rows: formatted reservation rows
300        """
301        if (
302            graph := await analytics_querier.get_graph(
303                GetGraphParams(seller_id=seller_id, graph_type=5)
304            )
305        ) is None:
306            logger.exception("failed to get forecast graph")
307            return
308        if (
309            sales_series := await analytics_querier.create_graph_series(
310                CreateGraphSeriesParams(
311                    graph_id=graph.graph_id, series_name="sales", sort_index=0
312                )
313            )
314        ) is None:
315            logger.exception("failed to create sales series for forecast")
316            return
317        if (
318            posted_series := await analytics_querier.create_graph_series(
319                CreateGraphSeriesParams(
320                    graph_id=graph.graph_id, series_name="posted", sort_index=1
321                )
322            )
323        ) is None:
324            logger.exception("failed to create posted series for forecast")
325            return
326        for i, point in enumerate(
327            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
328        ):
329            if (
330                await analytics_querier.create_graph_point(
331                    CreateGraphPointParams(
332                        series_id=sales_series.series_id,
333                        sort_index=i,
334                        x=point.day.strftime("%Y-%m-%d"),
335                        y=Decimal(point.sold_qty),
336                    )
337                )
338                is None
339            ):
340                logger.exception("failed to create point for sales series for forecast")
341                return
342            if (
343                await analytics_querier.create_graph_point(
344                    CreateGraphPointParams(
345                        series_id=posted_series.series_id,
346                        sort_index=i,
347                        x=point.day.strftime("%Y-%m-%d"),
348                        y=Decimal(point.posted_qty),
349                    )
350                )
351                is None
352            ):
353                logger.exception(
354                    "failed to create point for posted series for forecast"
355                )
356                return
357
358    @staticmethod
359    async def add_forecast_outputs(
360        forecast_querier: ForecastQuerier,
361        category_querier: CategoryQuerier,
362        seller_querier: SellerQuerier,
363        seller_id: int,
364        conn: AsyncConnection,
365    ) -> None:
366        """Save forecasts for seller's future bundles to forecast_output table."""
367        history = [
368            row
369            async for row in forecast_querier.get_forecast_inputs_by_seller(
370                seller_id=seller_id
371            )
372        ]
373
374        seller = await seller_querier.get_seller(user_id=seller_id)
375        if seller is None:
376            logger.exception("failed to get seller for forecast")
377            return
378
379        bundles_querier = BundleQuerier(conn)
380        bundles = [
381            b async for b in bundles_querier.get_sellers_bundles(seller_id=seller_id)
382        ]
383        future_bundles = [
384            b for b in bundles if b.window_start > datetime.datetime.now(datetime.UTC)
385        ]
386
387        bundle_queries: list[tuple[int, ForecastQuery]] = []
388        for bundle in future_bundles:
389            categories = [
390                cat_id
391                async for cat_id in category_querier.get_bundle_categories(
392                    bundle_id=bundle.bundle_id
393                )
394            ]
395            details = BundleDetails(
396                bundle_id=bundle.bundle_id,
397                bundle_date=bundle.window_start.date(),
398                window_start=bundle.window_start,
399                window_end=bundle.window_end,
400                seller_id=bundle.seller_id,
401                category_ids=categories,
402                latitude=seller.latitude,
403                longitude=seller.longitude,
404                posted_qty=bundle.total_qty,
405            )
406            query = build_forecast_query(details)
407            bundle_queries.append((bundle.bundle_id, query))
408
409        forecasts = generate_seller_forecasts(history, bundle_queries)
410
411        for forecast in forecasts:
412            await forecast_querier.upsert_forecast_output(
413                UpsertForecastOutputParams(
414                    bundle_id=forecast.bundle_id,
415                    seller_id=forecast.seller_id,
416                    window_start=forecast.window_start,
417                    predicted_sales=forecast.predicted_sales,
418                    posted_qty=forecast.posted_qty,
419                    predicted_no_show_prob=forecast.predicted_no_show_prob,
420                    confidence=forecast.confidence,
421                    rationale=forecast.rationale,
422                )
423            )
424
425    @staticmethod
426    async def process_analytics(seller_id: int) -> None:
427        """Background graph processing.
428
429        Args:
430            seller_id: seller_id
431
432        """
433        async for conn in database_manager.get_connection():
434            analytics_querier = AnalyticsQuerier(conn)
435            graph_types = analytics_querier.get_graphs_types()
436            async for graph_type in graph_types:
437                if (
438                    graph := await analytics_querier.get_graph(
439                        GetGraphParams(
440                            seller_id=seller_id, graph_type=graph_type.graph_type_id
441                        )
442                    )
443                ) is not None:
444                    await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
445                elif (
446                    await analytics_querier.create_graph(
447                        CreateGraphParams(
448                            seller_id=seller_id, graph_type=graph_type.graph_type_id
449                        )
450                    )
451                    is None
452                ):
453                    logger.exception("Failed to create analytics graph")
454                    return
455            seller_bundles = BundleQuerier(conn).get_sellers_bundles(
456                seller_id=seller_id
457            )
458            seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
459                seller_id=seller_id
460            )
461            bundle_rows = [
462                BundleRow(
463                    bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
464                )
465                async for bundle in seller_bundles
466            ]
467            reservation_rows = [
468                ReservationRow(
469                    bundle_date=reservation.window_start.date(),
470                    window_start=reservation.window_start.time(),
471                    category_ids=reservation.category_ids,
472                    collected_at=reservation.collected_at,
473                )
474                async for reservation in seller_reservations
475            ]
476            await AnalyticsProcesser.add_sales_vs_posted_graph(
477                analytics_querier, seller_id, bundle_rows, reservation_rows
478            )
479            await AnalyticsProcesser.add_sell_through_rate_graph(
480                analytics_querier, seller_id, bundle_rows, reservation_rows
481            )
482            await AnalyticsProcesser.add_cateogry_distribution_graph(
483                analytics_querier, seller_id, reservation_rows, conn
484            )
485            await AnalyticsProcesser.add_time_window_distribution_graph(
486                analytics_querier, seller_id, reservation_rows
487            )
488            await AnalyticsProcesser.add_forecast_graph(
489                analytics_querier, seller_id, bundle_rows, reservation_rows
490            )
491            await AnalyticsProcesser.add_forecast_outputs(
492                ForecastQuerier(conn),
493                CategoryQuerier(conn),
494                SellerQuerier(conn),
495                seller_id,
496                conn,
497            )
class AnalyticsProcesser:
 29class AnalyticsProcesser:
 30    """Analytics processing."""
 31
 32    background_tasks: BackgroundTasks
 33
 34    def __init__(self, background_tasks: BackgroundTasks) -> None:
 35        """Init processing for seller."""
 36        self.background_tasks = background_tasks
 37
 38    def run(self, seller_id: int) -> None:
 39        """Starts background analytics task.
 40
 41        Args:
 42            seller_id: seller id
 43        """
 44        self.background_tasks.add_task(self.process_analytics, seller_id)
 45
 46    @staticmethod
 47    async def add_sales_vs_posted_graph(
 48        analytics_querier: AnalyticsQuerier,
 49        seller_id: int,
 50        bundle_rows: list[BundleRow],
 51        reservation_rows: list[ReservationRow],
 52    ) -> None:
 53        """Add seles vs posted graph.
 54
 55        Args:
 56            analytics_querier: async analytics queries
 57            seller_id: seller id
 58            bundle_rows: formatted bundle rows
 59            reservation_rows: formatted reservation rows
 60        """
 61        if (
 62            graph := await analytics_querier.get_graph(
 63                GetGraphParams(seller_id=seller_id, graph_type=1)
 64            )
 65        ) is None:
 66            logger.exception("failed to get seles_vs_posted graph")
 67            return
 68        if (
 69            sales_series := await analytics_querier.create_graph_series(
 70                CreateGraphSeriesParams(
 71                    graph_id=graph.graph_id, series_name="sales", sort_index=0
 72                )
 73            )
 74        ) is None:
 75            logger.exception("failed to create sales series for sales_vs_posted")
 76            return
 77        if (
 78            posted_series := await analytics_querier.create_graph_series(
 79                CreateGraphSeriesParams(
 80                    graph_id=graph.graph_id, series_name="posted", sort_index=1
 81                )
 82            )
 83        ) is None:
 84            logger.exception("failed to create posted series for sales_vs_posted")
 85            return
 86        for i, point in enumerate(
 87            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
 88        ):
 89            if (
 90                await analytics_querier.create_graph_point(
 91                    CreateGraphPointParams(
 92                        series_id=sales_series.series_id,
 93                        sort_index=i,
 94                        x=point.day.strftime("%Y-%m-%d"),
 95                        y=Decimal(point.sold_qty),
 96                    )
 97                )
 98                is None
 99            ):
100                logger.exception(
101                    "failed to create point for sales series for sales_vs_posted"
102                )
103                return
104            if (
105                await analytics_querier.create_graph_point(
106                    CreateGraphPointParams(
107                        series_id=posted_series.series_id,
108                        sort_index=i,
109                        x=point.day.strftime("%Y-%m-%d"),
110                        y=Decimal(point.posted_qty),
111                    )
112                )
113                is None
114            ):
115                logger.exception(
116                    "failed to create point for sales series for sales_vs_posted"
117                )
118                return
119
120    @staticmethod
121    async def add_sell_through_rate_graph(
122        analytics_querier: AnalyticsQuerier,
123        seller_id: int,
124        bundle_rows: list[BundleRow],
125        reservation_rows: list[ReservationRow],
126    ) -> None:
127        """Add sell through rate graph.
128
129        Args:
130            analytics_querier: async analytics queries
131            seller_id: seller id
132            bundle_rows: formatted bundle rows
133            reservation_rows: formatted reservation rows
134        """
135        if (
136            graph := await analytics_querier.get_graph(
137                GetGraphParams(seller_id=seller_id, graph_type=2)
138            )
139        ) is None:
140            logger.exception("failed to get sell_through_rate graph")
141            return
142        if (
143            sell_series := await analytics_querier.create_graph_series(
144                CreateGraphSeriesParams(
145                    graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
146                )
147            )
148        ) is None:
149            logger.exception("failed to create sell series for sell_through_rate graph")
150            return
151        sell_through_rate = SellerAnalytics.graph_sell_through_rate(
152            bundle_rows, reservation_rows
153        ).sell_through_percentage
154        if (
155            await analytics_querier.create_graph_point(
156                CreateGraphPointParams(
157                    series_id=sell_series.series_id,
158                    sort_index=0,
159                    x="sold",
160                    y=Decimal(sell_through_rate),
161                )
162            )
163        ) is None:
164            logger.exception("failed to create sold point for sell_through_rate graph")
165            return
166        if (
167            await analytics_querier.create_graph_point(
168                CreateGraphPointParams(
169                    series_id=sell_series.series_id,
170                    sort_index=1,
171                    x="unsold",
172                    y=Decimal(100 - sell_through_rate),
173                )
174            )
175        ) is None:
176            logger.exception(
177                "failed to create unsold point for sell_through_rate graph"
178            )
179            return
180
181    @staticmethod
182    async def add_cateogry_distribution_graph(
183        analytics_querier: AnalyticsQuerier,
184        seller_id: int,
185        reservation_rows: list[ReservationRow],
186        conn: AsyncConnection,
187    ) -> None:
188        """Add category distribution graph.
189
190        Args:
191            analytics_querier: async analytics queries
192            seller_id: seller id
193            reservation_rows: formatted reservation rows
194            conn: database connection
195        """
196        if (
197            graph := await analytics_querier.get_graph(
198                GetGraphParams(seller_id=seller_id, graph_type=3)
199            )
200        ) is None:
201            logger.exception("failed to get category_distribution graph")
202            return
203        if (
204            categories_series := await analytics_querier.create_graph_series(
205                CreateGraphSeriesParams(
206                    graph_id=graph.graph_id, series_name="categories", sort_index=0
207                )
208            )
209        ) is None:
210            logger.exception(
211                "failed to create categories series for category_distribution graph"
212            )
213            return
214        category_distribution = SellerAnalytics.graph_category_distribution(
215            reservation_rows
216        )
217        category_querier = CategoryQuerier(conn)
218        for i, category in enumerate(category_distribution):
219            if (
220                category_row := await category_querier.get_category(
221                    category_id=category.category_id
222                )
223            ) is None:
224                logger.exception("failed to get category")
225                return
226            if (
227                await analytics_querier.create_graph_point(
228                    CreateGraphPointParams(
229                        series_id=categories_series.series_id,
230                        sort_index=i,
231                        x=category_row.category_name,
232                        y=Decimal(category.collected_qty),
233                    )
234                )
235            ) is None:
236                logger.exception(
237                    "failed to create category point for category_distribution graph"
238                )
239                return
240
241    @staticmethod
242    async def add_time_window_distribution_graph(
243        analytics_querier: AnalyticsQuerier,
244        seller_id: int,
245        reservation_rows: list[ReservationRow],
246    ) -> None:
247        """Add time window distribution graph.
248
249        Args:
250            analytics_querier: async analytics queries
251            seller_id: seller id
252            reservation_rows: formatted reservation rows
253        """
254        if (
255            graph := await analytics_querier.get_graph(
256                GetGraphParams(seller_id=seller_id, graph_type=4)
257            )
258        ) is None:
259            logger.exception("failed to get time_window_distribution graph")
260            return
261        if (
262            time_windows_series := await analytics_querier.create_graph_series(
263                CreateGraphSeriesParams(
264                    graph_id=graph.graph_id, series_name="time_windows", sort_index=0
265                )
266            )
267        ) is None:
268            logger.exception("failed to create time_windows series")
269            return
270        time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
271            reservation_rows
272        )
273        for i, time_window in enumerate(time_windows_distribution):
274            if (
275                await analytics_querier.create_graph_point(
276                    CreateGraphPointParams(
277                        series_id=time_windows_series.series_id,
278                        sort_index=i,
279                        x=time_window.time_window.strftime("%H:%M"),
280                        y=Decimal(time_window.collected_qty),
281                    )
282                )
283            ) is None:
284                logger.exception("failed to create time_window point")
285                return
286
287    @staticmethod
288    async def add_forecast_graph(
289        analytics_querier: AnalyticsQuerier,
290        seller_id: int,
291        bundle_rows: list[BundleRow],
292        reservation_rows: list[ReservationRow],
293    ) -> None:
294        """Add forecast vs posted graph.
295
296        Args:
297            analytics_querier: async analytics queries
298            seller_id: seller id
299            bundle_rows: formatted bundle rows
300            reservation_rows: formatted reservation rows
301        """
302        if (
303            graph := await analytics_querier.get_graph(
304                GetGraphParams(seller_id=seller_id, graph_type=5)
305            )
306        ) is None:
307            logger.exception("failed to get forecast graph")
308            return
309        if (
310            sales_series := await analytics_querier.create_graph_series(
311                CreateGraphSeriesParams(
312                    graph_id=graph.graph_id, series_name="sales", sort_index=0
313                )
314            )
315        ) is None:
316            logger.exception("failed to create sales series for forecast")
317            return
318        if (
319            posted_series := await analytics_querier.create_graph_series(
320                CreateGraphSeriesParams(
321                    graph_id=graph.graph_id, series_name="posted", sort_index=1
322                )
323            )
324        ) is None:
325            logger.exception("failed to create posted series for forecast")
326            return
327        for i, point in enumerate(
328            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
329        ):
330            if (
331                await analytics_querier.create_graph_point(
332                    CreateGraphPointParams(
333                        series_id=sales_series.series_id,
334                        sort_index=i,
335                        x=point.day.strftime("%Y-%m-%d"),
336                        y=Decimal(point.sold_qty),
337                    )
338                )
339                is None
340            ):
341                logger.exception("failed to create point for sales series for forecast")
342                return
343            if (
344                await analytics_querier.create_graph_point(
345                    CreateGraphPointParams(
346                        series_id=posted_series.series_id,
347                        sort_index=i,
348                        x=point.day.strftime("%Y-%m-%d"),
349                        y=Decimal(point.posted_qty),
350                    )
351                )
352                is None
353            ):
354                logger.exception(
355                    "failed to create point for posted series for forecast"
356                )
357                return
358
359    @staticmethod
360    async def add_forecast_outputs(
361        forecast_querier: ForecastQuerier,
362        category_querier: CategoryQuerier,
363        seller_querier: SellerQuerier,
364        seller_id: int,
365        conn: AsyncConnection,
366    ) -> None:
367        """Save forecasts for seller's future bundles to forecast_output table."""
368        history = [
369            row
370            async for row in forecast_querier.get_forecast_inputs_by_seller(
371                seller_id=seller_id
372            )
373        ]
374
375        seller = await seller_querier.get_seller(user_id=seller_id)
376        if seller is None:
377            logger.exception("failed to get seller for forecast")
378            return
379
380        bundles_querier = BundleQuerier(conn)
381        bundles = [
382            b async for b in bundles_querier.get_sellers_bundles(seller_id=seller_id)
383        ]
384        future_bundles = [
385            b for b in bundles if b.window_start > datetime.datetime.now(datetime.UTC)
386        ]
387
388        bundle_queries: list[tuple[int, ForecastQuery]] = []
389        for bundle in future_bundles:
390            categories = [
391                cat_id
392                async for cat_id in category_querier.get_bundle_categories(
393                    bundle_id=bundle.bundle_id
394                )
395            ]
396            details = BundleDetails(
397                bundle_id=bundle.bundle_id,
398                bundle_date=bundle.window_start.date(),
399                window_start=bundle.window_start,
400                window_end=bundle.window_end,
401                seller_id=bundle.seller_id,
402                category_ids=categories,
403                latitude=seller.latitude,
404                longitude=seller.longitude,
405                posted_qty=bundle.total_qty,
406            )
407            query = build_forecast_query(details)
408            bundle_queries.append((bundle.bundle_id, query))
409
410        forecasts = generate_seller_forecasts(history, bundle_queries)
411
412        for forecast in forecasts:
413            await forecast_querier.upsert_forecast_output(
414                UpsertForecastOutputParams(
415                    bundle_id=forecast.bundle_id,
416                    seller_id=forecast.seller_id,
417                    window_start=forecast.window_start,
418                    predicted_sales=forecast.predicted_sales,
419                    posted_qty=forecast.posted_qty,
420                    predicted_no_show_prob=forecast.predicted_no_show_prob,
421                    confidence=forecast.confidence,
422                    rationale=forecast.rationale,
423                )
424            )
425
426    @staticmethod
427    async def process_analytics(seller_id: int) -> None:
428        """Background graph processing.
429
430        Args:
431            seller_id: seller_id
432
433        """
434        async for conn in database_manager.get_connection():
435            analytics_querier = AnalyticsQuerier(conn)
436            graph_types = analytics_querier.get_graphs_types()
437            async for graph_type in graph_types:
438                if (
439                    graph := await analytics_querier.get_graph(
440                        GetGraphParams(
441                            seller_id=seller_id, graph_type=graph_type.graph_type_id
442                        )
443                    )
444                ) is not None:
445                    await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
446                elif (
447                    await analytics_querier.create_graph(
448                        CreateGraphParams(
449                            seller_id=seller_id, graph_type=graph_type.graph_type_id
450                        )
451                    )
452                    is None
453                ):
454                    logger.exception("Failed to create analytics graph")
455                    return
456            seller_bundles = BundleQuerier(conn).get_sellers_bundles(
457                seller_id=seller_id
458            )
459            seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
460                seller_id=seller_id
461            )
462            bundle_rows = [
463                BundleRow(
464                    bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
465                )
466                async for bundle in seller_bundles
467            ]
468            reservation_rows = [
469                ReservationRow(
470                    bundle_date=reservation.window_start.date(),
471                    window_start=reservation.window_start.time(),
472                    category_ids=reservation.category_ids,
473                    collected_at=reservation.collected_at,
474                )
475                async for reservation in seller_reservations
476            ]
477            await AnalyticsProcesser.add_sales_vs_posted_graph(
478                analytics_querier, seller_id, bundle_rows, reservation_rows
479            )
480            await AnalyticsProcesser.add_sell_through_rate_graph(
481                analytics_querier, seller_id, bundle_rows, reservation_rows
482            )
483            await AnalyticsProcesser.add_cateogry_distribution_graph(
484                analytics_querier, seller_id, reservation_rows, conn
485            )
486            await AnalyticsProcesser.add_time_window_distribution_graph(
487                analytics_querier, seller_id, reservation_rows
488            )
489            await AnalyticsProcesser.add_forecast_graph(
490                analytics_querier, seller_id, bundle_rows, reservation_rows
491            )
492            await AnalyticsProcesser.add_forecast_outputs(
493                ForecastQuerier(conn),
494                CategoryQuerier(conn),
495                SellerQuerier(conn),
496                seller_id,
497                conn,
498            )

Analytics processing.

AnalyticsProcesser(background_tasks: fastapi.background.BackgroundTasks)
    def __init__(self, background_tasks: BackgroundTasks) -> None:
        """Store the background task queue used to schedule processing.

        Args:
            background_tasks: task queue that ``run`` adds the processing job to
        """
        self.background_tasks = background_tasks

Init processing for seller.

background_tasks: fastapi.background.BackgroundTasks
def run(self, seller_id: int) -> None:
38    def run(self, seller_id: int) -> None:
39        """Starts background analytics task.
40
41        Args:
42            seller_id: seller id
43        """
44        self.background_tasks.add_task(self.process_analytics, seller_id)

Starts background analytics task.

Arguments:
  • seller_id: seller id
@staticmethod
async def add_sales_vs_posted_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
 46    @staticmethod
 47    async def add_sales_vs_posted_graph(
 48        analytics_querier: AnalyticsQuerier,
 49        seller_id: int,
 50        bundle_rows: list[BundleRow],
 51        reservation_rows: list[ReservationRow],
 52    ) -> None:
 53        """Add seles vs posted graph.
 54
 55        Args:
 56            analytics_querier: async analytics queries
 57            seller_id: seller id
 58            bundle_rows: formatted bundle rows
 59            reservation_rows: formatted reservation rows
 60        """
 61        if (
 62            graph := await analytics_querier.get_graph(
 63                GetGraphParams(seller_id=seller_id, graph_type=1)
 64            )
 65        ) is None:
 66            logger.exception("failed to get seles_vs_posted graph")
 67            return
 68        if (
 69            sales_series := await analytics_querier.create_graph_series(
 70                CreateGraphSeriesParams(
 71                    graph_id=graph.graph_id, series_name="sales", sort_index=0
 72                )
 73            )
 74        ) is None:
 75            logger.exception("failed to create sales series for sales_vs_posted")
 76            return
 77        if (
 78            posted_series := await analytics_querier.create_graph_series(
 79                CreateGraphSeriesParams(
 80                    graph_id=graph.graph_id, series_name="posted", sort_index=1
 81                )
 82            )
 83        ) is None:
 84            logger.exception("failed to create posted series for sales_vs_posted")
 85            return
 86        for i, point in enumerate(
 87            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
 88        ):
 89            if (
 90                await analytics_querier.create_graph_point(
 91                    CreateGraphPointParams(
 92                        series_id=sales_series.series_id,
 93                        sort_index=i,
 94                        x=point.day.strftime("%Y-%m-%d"),
 95                        y=Decimal(point.sold_qty),
 96                    )
 97                )
 98                is None
 99            ):
100                logger.exception(
101                    "failed to create point for sales series for sales_vs_posted"
102                )
103                return
104            if (
105                await analytics_querier.create_graph_point(
106                    CreateGraphPointParams(
107                        series_id=posted_series.series_id,
108                        sort_index=i,
109                        x=point.day.strftime("%Y-%m-%d"),
110                        y=Decimal(point.posted_qty),
111                    )
112                )
113                is None
114            ):
115                logger.exception(
116                    "failed to create point for sales series for sales_vs_posted"
117                )
118                return

Add sales vs posted graph.

Arguments:
  • analytics_querier: async analytics queries
  • seller_id: seller id
  • bundle_rows: formatted bundle rows
  • reservation_rows: formatted reservation rows
@staticmethod
async def add_sell_through_rate_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
120    @staticmethod
121    async def add_sell_through_rate_graph(
122        analytics_querier: AnalyticsQuerier,
123        seller_id: int,
124        bundle_rows: list[BundleRow],
125        reservation_rows: list[ReservationRow],
126    ) -> None:
127        """Add sell through rate graph.
128
129        Args:
130            analytics_querier: async analytics queries
131            seller_id: seller id
132            bundle_rows: formatted bundle rows
133            reservation_rows: formatted reservation rows
134        """
135        if (
136            graph := await analytics_querier.get_graph(
137                GetGraphParams(seller_id=seller_id, graph_type=2)
138            )
139        ) is None:
140            logger.exception("failed to get sell_through_rate graph")
141            return
142        if (
143            sell_series := await analytics_querier.create_graph_series(
144                CreateGraphSeriesParams(
145                    graph_id=graph.graph_id, series_name="sell_rate", sort_index=0
146                )
147            )
148        ) is None:
149            logger.exception("failed to create sell series for sell_through_rate graph")
150            return
151        sell_through_rate = SellerAnalytics.graph_sell_through_rate(
152            bundle_rows, reservation_rows
153        ).sell_through_percentage
154        if (
155            await analytics_querier.create_graph_point(
156                CreateGraphPointParams(
157                    series_id=sell_series.series_id,
158                    sort_index=0,
159                    x="sold",
160                    y=Decimal(sell_through_rate),
161                )
162            )
163        ) is None:
164            logger.exception("failed to create sold point for sell_through_rate graph")
165            return
166        if (
167            await analytics_querier.create_graph_point(
168                CreateGraphPointParams(
169                    series_id=sell_series.series_id,
170                    sort_index=1,
171                    x="unsold",
172                    y=Decimal(100 - sell_through_rate),
173                )
174            )
175        ) is None:
176            logger.exception(
177                "failed to create unsold point for sell_through_rate graph"
178            )
179            return

Add sell through rate graph.

Arguments:
  • analytics_querier: async analytics queries
  • seller_id: seller id
  • bundle_rows: formatted bundle rows
  • reservation_rows: formatted reservation rows
@staticmethod
async def add_cateogry_distribution_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, reservation_rows: list[internal.analytics.graphs.ReservationRow], conn: sqlalchemy.ext.asyncio.engine.AsyncConnection) -> None:
181    @staticmethod
182    async def add_cateogry_distribution_graph(
183        analytics_querier: AnalyticsQuerier,
184        seller_id: int,
185        reservation_rows: list[ReservationRow],
186        conn: AsyncConnection,
187    ) -> None:
188        """Add category distribution graph.
189
190        Args:
191            analytics_querier: async analytics queries
192            seller_id: seller id
193            reservation_rows: formatted reservation rows
194            conn: database connection
195        """
196        if (
197            graph := await analytics_querier.get_graph(
198                GetGraphParams(seller_id=seller_id, graph_type=3)
199            )
200        ) is None:
201            logger.exception("failed to get category_distribution graph")
202            return
203        if (
204            categories_series := await analytics_querier.create_graph_series(
205                CreateGraphSeriesParams(
206                    graph_id=graph.graph_id, series_name="categories", sort_index=0
207                )
208            )
209        ) is None:
210            logger.exception(
211                "failed to create categories series for category_distribution graph"
212            )
213            return
214        category_distribution = SellerAnalytics.graph_category_distribution(
215            reservation_rows
216        )
217        category_querier = CategoryQuerier(conn)
218        for i, category in enumerate(category_distribution):
219            if (
220                category_row := await category_querier.get_category(
221                    category_id=category.category_id
222                )
223            ) is None:
224                logger.exception("failed to get category")
225                return
226            if (
227                await analytics_querier.create_graph_point(
228                    CreateGraphPointParams(
229                        series_id=categories_series.series_id,
230                        sort_index=i,
231                        x=category_row.category_name,
232                        y=Decimal(category.collected_qty),
233                    )
234                )
235            ) is None:
236                logger.exception(
237                    "failed to create category point for category_distribution graph"
238                )
239                return

Add category distribution graph.

Arguments:
  • analytics_querier: async analytics queries
  • seller_id: seller id
  • reservation_rows: formatted reservation rows
  • conn: database connection
@staticmethod
async def add_time_window_distribution_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
241    @staticmethod
242    async def add_time_window_distribution_graph(
243        analytics_querier: AnalyticsQuerier,
244        seller_id: int,
245        reservation_rows: list[ReservationRow],
246    ) -> None:
247        """Add time window distribution graph.
248
249        Args:
250            analytics_querier: async analytics queries
251            seller_id: seller id
252            reservation_rows: formatted reservation rows
253        """
254        if (
255            graph := await analytics_querier.get_graph(
256                GetGraphParams(seller_id=seller_id, graph_type=4)
257            )
258        ) is None:
259            logger.exception("failed to get time_window_distribution graph")
260            return
261        if (
262            time_windows_series := await analytics_querier.create_graph_series(
263                CreateGraphSeriesParams(
264                    graph_id=graph.graph_id, series_name="time_windows", sort_index=0
265                )
266            )
267        ) is None:
268            logger.exception("failed to create time_windows series")
269            return
270        time_windows_distribution = SellerAnalytics.graph_time_window_distribution(
271            reservation_rows
272        )
273        for i, time_window in enumerate(time_windows_distribution):
274            if (
275                await analytics_querier.create_graph_point(
276                    CreateGraphPointParams(
277                        series_id=time_windows_series.series_id,
278                        sort_index=i,
279                        x=time_window.time_window.strftime("%H:%M"),
280                        y=Decimal(time_window.collected_qty),
281                    )
282                )
283            ) is None:
284                logger.exception("failed to create time_window point")
285                return

Add time window distribution graph.

Arguments:
  • analytics_querier: async analytics queries
  • seller_id: seller id
  • reservation_rows: formatted reservation rows
@staticmethod
async def add_forecast_graph( analytics_querier: internal.queries.analytics.AsyncQuerier, seller_id: int, bundle_rows: list[internal.analytics.graphs.BundleRow], reservation_rows: list[internal.analytics.graphs.ReservationRow]) -> None:
287    @staticmethod
288    async def add_forecast_graph(
289        analytics_querier: AnalyticsQuerier,
290        seller_id: int,
291        bundle_rows: list[BundleRow],
292        reservation_rows: list[ReservationRow],
293    ) -> None:
294        """Add forecast vs posted graph.
295
296        Args:
297            analytics_querier: async analytics queries
298            seller_id: seller id
299            bundle_rows: formatted bundle rows
300            reservation_rows: formatted reservation rows
301        """
302        if (
303            graph := await analytics_querier.get_graph(
304                GetGraphParams(seller_id=seller_id, graph_type=5)
305            )
306        ) is None:
307            logger.exception("failed to get forecast graph")
308            return
309        if (
310            sales_series := await analytics_querier.create_graph_series(
311                CreateGraphSeriesParams(
312                    graph_id=graph.graph_id, series_name="sales", sort_index=0
313                )
314            )
315        ) is None:
316            logger.exception("failed to create sales series for forecast")
317            return
318        if (
319            posted_series := await analytics_querier.create_graph_series(
320                CreateGraphSeriesParams(
321                    graph_id=graph.graph_id, series_name="posted", sort_index=1
322                )
323            )
324        ) is None:
325            logger.exception("failed to create posted series for forecast")
326            return
327        for i, point in enumerate(
328            SellerAnalytics.graph_weekly_sales_vs_posted(bundle_rows, reservation_rows)
329        ):
330            if (
331                await analytics_querier.create_graph_point(
332                    CreateGraphPointParams(
333                        series_id=sales_series.series_id,
334                        sort_index=i,
335                        x=point.day.strftime("%Y-%m-%d"),
336                        y=Decimal(point.sold_qty),
337                    )
338                )
339                is None
340            ):
341                logger.exception("failed to create point for sales series for forecast")
342                return
343            if (
344                await analytics_querier.create_graph_point(
345                    CreateGraphPointParams(
346                        series_id=posted_series.series_id,
347                        sort_index=i,
348                        x=point.day.strftime("%Y-%m-%d"),
349                        y=Decimal(point.posted_qty),
350                    )
351                )
352                is None
353            ):
354                logger.exception(
355                    "failed to create point for posted series for forecast"
356                )
357                return

Add forecast vs posted graph.

Arguments:
  • analytics_querier: async analytics queries
  • seller_id: seller id
  • bundle_rows: formatted bundle rows
  • reservation_rows: formatted reservation rows
@staticmethod
async def add_forecast_outputs( forecast_querier: internal.queries.forecast.AsyncQuerier, category_querier: internal.queries.category.AsyncQuerier, seller_querier: internal.queries.seller.AsyncQuerier, seller_id: int, conn: sqlalchemy.ext.asyncio.engine.AsyncConnection) -> None:
359    @staticmethod
360    async def add_forecast_outputs(
361        forecast_querier: ForecastQuerier,
362        category_querier: CategoryQuerier,
363        seller_querier: SellerQuerier,
364        seller_id: int,
365        conn: AsyncConnection,
366    ) -> None:
367        """Save forecasts for seller's future bundles to forecast_output table."""
368        history = [
369            row
370            async for row in forecast_querier.get_forecast_inputs_by_seller(
371                seller_id=seller_id
372            )
373        ]
374
375        seller = await seller_querier.get_seller(user_id=seller_id)
376        if seller is None:
377            logger.exception("failed to get seller for forecast")
378            return
379
380        bundles_querier = BundleQuerier(conn)
381        bundles = [
382            b async for b in bundles_querier.get_sellers_bundles(seller_id=seller_id)
383        ]
384        future_bundles = [
385            b for b in bundles if b.window_start > datetime.datetime.now(datetime.UTC)
386        ]
387
388        bundle_queries: list[tuple[int, ForecastQuery]] = []
389        for bundle in future_bundles:
390            categories = [
391                cat_id
392                async for cat_id in category_querier.get_bundle_categories(
393                    bundle_id=bundle.bundle_id
394                )
395            ]
396            details = BundleDetails(
397                bundle_id=bundle.bundle_id,
398                bundle_date=bundle.window_start.date(),
399                window_start=bundle.window_start,
400                window_end=bundle.window_end,
401                seller_id=bundle.seller_id,
402                category_ids=categories,
403                latitude=seller.latitude,
404                longitude=seller.longitude,
405                posted_qty=bundle.total_qty,
406            )
407            query = build_forecast_query(details)
408            bundle_queries.append((bundle.bundle_id, query))
409
410        forecasts = generate_seller_forecasts(history, bundle_queries)
411
412        for forecast in forecasts:
413            await forecast_querier.upsert_forecast_output(
414                UpsertForecastOutputParams(
415                    bundle_id=forecast.bundle_id,
416                    seller_id=forecast.seller_id,
417                    window_start=forecast.window_start,
418                    predicted_sales=forecast.predicted_sales,
419                    posted_qty=forecast.posted_qty,
420                    predicted_no_show_prob=forecast.predicted_no_show_prob,
421                    confidence=forecast.confidence,
422                    rationale=forecast.rationale,
423                )
424            )

Save forecasts for seller's future bundles to forecast_output table.

@staticmethod
async def process_analytics(seller_id: int) -> None:
426    @staticmethod
427    async def process_analytics(seller_id: int) -> None:
428        """Background graph processing.
429
430        Args:
431            seller_id: seller_id
432
433        """
434        async for conn in database_manager.get_connection():
435            analytics_querier = AnalyticsQuerier(conn)
436            graph_types = analytics_querier.get_graphs_types()
437            async for graph_type in graph_types:
438                if (
439                    graph := await analytics_querier.get_graph(
440                        GetGraphParams(
441                            seller_id=seller_id, graph_type=graph_type.graph_type_id
442                        )
443                    )
444                ) is not None:
445                    await analytics_querier.delete_graph_series(graph_id=graph.graph_id)
446                elif (
447                    await analytics_querier.create_graph(
448                        CreateGraphParams(
449                            seller_id=seller_id, graph_type=graph_type.graph_type_id
450                        )
451                    )
452                    is None
453                ):
454                    logger.exception("Failed to create analytics graph")
455                    return
456            seller_bundles = BundleQuerier(conn).get_sellers_bundles(
457                seller_id=seller_id
458            )
459            seller_reservations = ReservationQuerier(conn).get_seller_reservations_full(
460                seller_id=seller_id
461            )
462            bundle_rows = [
463                BundleRow(
464                    bundle_date=bundle.window_start.date(), total_qty=bundle.total_qty
465                )
466                async for bundle in seller_bundles
467            ]
468            reservation_rows = [
469                ReservationRow(
470                    bundle_date=reservation.window_start.date(),
471                    window_start=reservation.window_start.time(),
472                    category_ids=reservation.category_ids,
473                    collected_at=reservation.collected_at,
474                )
475                async for reservation in seller_reservations
476            ]
477            await AnalyticsProcesser.add_sales_vs_posted_graph(
478                analytics_querier, seller_id, bundle_rows, reservation_rows
479            )
480            await AnalyticsProcesser.add_sell_through_rate_graph(
481                analytics_querier, seller_id, bundle_rows, reservation_rows
482            )
483            await AnalyticsProcesser.add_cateogry_distribution_graph(
484                analytics_querier, seller_id, reservation_rows, conn
485            )
486            await AnalyticsProcesser.add_time_window_distribution_graph(
487                analytics_querier, seller_id, reservation_rows
488            )
489            await AnalyticsProcesser.add_forecast_graph(
490                analytics_querier, seller_id, bundle_rows, reservation_rows
491            )
492            await AnalyticsProcesser.add_forecast_outputs(
493                ForecastQuerier(conn),
494                CategoryQuerier(conn),
495                SellerQuerier(conn),
496                seller_id,
497                conn,
498            )

Background graph processing.

Arguments:
  • seller_id: seller id