From fa7de589c1bd0ae37ebaaa07bae0ed3d68e01720 Mon Sep 17 00:00:00 2001 From: Fringg Date: Mon, 2 Mar 2026 06:09:12 +0300 Subject: [PATCH] feat: add admin campaign chart data endpoint with deposits/spending split - Add get_admin_campaign_chart_data() to PartnerStatsService with daily registrations, revenue trends, period comparison, and top registrations - Add total_deposits_kopeks and total_spending_kopeks as separate aggregates - Add 6 Pydantic schemas for admin chart data response - Add GET /{campaign_id}/chart-data endpoint with campaigns:stats permission - Add partner application endpoints and schemas for campaign detailed stats --- app/cabinet/routes/admin_campaigns.py | 20 + app/cabinet/routes/partner_application.py | 69 ++- app/cabinet/schemas/campaigns.py | 58 +++ app/cabinet/schemas/partners.py | 73 +++ app/services/partner_stats_service.py | 563 ++++++++++++++++++++++ 5 files changed, 782 insertions(+), 1 deletion(-) diff --git a/app/cabinet/routes/admin_campaigns.py b/app/cabinet/routes/admin_campaigns.py index fdf69fb9..19b442a5 100644 --- a/app/cabinet/routes/admin_campaigns.py +++ b/app/cabinet/routes/admin_campaigns.py @@ -29,9 +29,11 @@ from app.database.models import ( Tariff, User, ) +from app.services.partner_stats_service import PartnerStatsService from ..dependencies import get_cabinet_db, require_permission from ..schemas.campaigns import ( + AdminCampaignChartDataResponse, AvailablePartnerItem, CampaignCreateRequest, CampaignDetailResponse, @@ -254,6 +256,24 @@ async def get_campaign( ) +@router.get('/{campaign_id}/chart-data', response_model=AdminCampaignChartDataResponse) +async def get_campaign_chart_data( + campaign_id: int, + admin: User = Depends(require_permission('campaigns:stats')), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get chart data for admin campaign analytics.""" + campaign = await get_campaign_by_id(db, campaign_id) + if not campaign: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail='Campaign not found', + ) + + data = await PartnerStatsService.get_admin_campaign_chart_data(db, campaign_id) + return AdminCampaignChartDataResponse(**data) + + @router.get('/{campaign_id}/stats', response_model=CampaignStatisticsResponse) async def get_campaign_stats( campaign_id: int, diff --git a/app/cabinet/routes/partner_application.py b/app/cabinet/routes/partner_application.py index 8b31db03..94c3d5aa 100644 --- a/app/cabinet/routes/partner_application.py +++ b/app/cabinet/routes/partner_application.py @@ -8,13 +8,20 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database.models import AdvertisingCampaign, User from app.services.partner_application_service import partner_application_service +from app.services.partner_stats_service import PartnerStatsService from ..dependencies import get_cabinet_db, get_current_cabinet_user from ..schemas.partners import ( + CampaignReferralItem, + DailyStatItem, PartnerApplicationInfo, PartnerApplicationRequest, + PartnerCampaignDetailedStats, PartnerCampaignInfo, PartnerStatusResponse, + PeriodChange, + PeriodComparison, + PeriodStats, ) @@ -77,7 +84,14 @@ async def get_partner_status( AdvertisingCampaign.is_active.is_(True), ) ) - for c in result.scalars().all(): + campaign_models = result.scalars().all() + + # Fetch per-campaign stats in one batch + campaign_ids = [c.id for c in campaign_models] + campaign_stats = await PartnerStatsService.get_per_campaign_stats(db, user.id, campaign_ids) + + for c in campaign_models: + stats = campaign_stats.get(c.id, {}) campaigns.append( PartnerCampaignInfo( id=c.id, @@ -89,6 +103,9 @@ async def get_partner_status( subscription_traffic_gb=c.subscription_traffic_gb, deep_link=_get_campaign_deep_link(c.start_parameter), web_link=_get_campaign_web_link(c.start_parameter), + registrations_count=stats.get('registrations_count', 0), + referrals_count=stats.get('referrals_count', 0), + earnings_kopeks=stats.get('earnings_kopeks', 0), ) ) @@ -100,6 +117,56 @@ async def get_partner_status( ) +@router.get('/campaigns/{campaign_id}/stats', response_model=PartnerCampaignDetailedStats) +async def get_campaign_stats( + campaign_id: int, + user: User = Depends(get_current_cabinet_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get detailed stats for a single campaign belonging to the current partner.""" + if not user.is_partner: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail='Partner status required', + ) + + # Verify campaign belongs to this partner + campaign_result = await db.execute( + select(AdvertisingCampaign).where( + AdvertisingCampaign.id == campaign_id, + AdvertisingCampaign.partner_user_id == user.id, + ) + ) + campaign = campaign_result.scalar_one_or_none() + if not campaign: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail='Campaign not found or not assigned to you', + ) + + raw = await PartnerStatsService.get_campaign_detailed_stats(db, user.id, campaign_id) + + return PartnerCampaignDetailedStats( + campaign_id=raw['campaign_id'], + campaign_name=campaign.name, + registrations_count=raw['registrations_count'], + referrals_count=raw['referrals_count'], + earnings_kopeks=raw['earnings_kopeks'], + conversion_rate=raw['conversion_rate'], + earnings_today=raw['earnings_today'], + earnings_week=raw['earnings_week'], + earnings_month=raw['earnings_month'], + daily_stats=[DailyStatItem(**d) for d in raw['daily_stats']], + period_comparison=PeriodComparison( + current=PeriodStats(**raw['period_comparison']['current']), + previous=PeriodStats(**raw['period_comparison']['previous']), + referrals_change=PeriodChange(**raw['period_comparison']['referrals_change']), + earnings_change=PeriodChange(**raw['period_comparison']['earnings_change']), + ), + top_referrals=[CampaignReferralItem(**r) for r in raw['top_referrals']], + ) + + @router.post('/apply', response_model=PartnerApplicationInfo) async def apply_for_partner( request: PartnerApplicationRequest, diff --git a/app/cabinet/schemas/campaigns.py b/app/cabinet/schemas/campaigns.py index 313c8411..1d273473 100644 --- a/app/cabinet/schemas/campaigns.py +++ b/app/cabinet/schemas/campaigns.py @@ -220,3 +220,61 @@ class ServerSquadInfo(BaseModel): squad_uuid: str display_name: str country_code: str | None = None + + +# --- Admin campaign chart data schemas --- + + +class AdminDailyStatItem(BaseModel): + """Daily stat item for admin campaign charts.""" + + date: str + referrals_count: int = 0 # actually registrations, named for frontend compat + earnings_kopeks: int = 0 # actually revenue, named for frontend compat + + +class AdminPeriodStats(BaseModel): + """Period stats for admin campaign comparison.""" + + days: int + referrals_count: int = 0 + earnings_kopeks: int = 0 + + +class AdminPeriodChange(BaseModel): + """Change metrics between periods.""" + + absolute: int = 0 + percent: float = 0.0 + trend: str = 'stable' + + +class AdminPeriodComparison(BaseModel): + """Comparison of current vs previous period.""" + + current: AdminPeriodStats + previous: AdminPeriodStats + referrals_change: AdminPeriodChange + earnings_change: AdminPeriodChange + + +class AdminTopRegistrationItem(BaseModel): + """Top user by spending in a campaign.""" + + id: int + full_name: str + created_at: datetime + has_paid: bool = False + is_active: bool = False + total_earnings_kopeks: int = 0 # actually total spending, named for frontend compat + + +class AdminCampaignChartDataResponse(BaseModel): + """Chart data for admin campaign stats page.""" + + campaign_id: int + total_deposits_kopeks: int = 0 + total_spending_kopeks: int = 0 + daily_stats: list[AdminDailyStatItem] = [] + period_comparison: AdminPeriodComparison + top_registrations: list[AdminTopRegistrationItem] = [] diff --git a/app/cabinet/schemas/partners.py b/app/cabinet/schemas/partners.py index d58346f0..1e49a8f8 100644 --- a/app/cabinet/schemas/partners.py +++ b/app/cabinet/schemas/partners.py @@ -51,6 +51,10 @@ class PartnerCampaignInfo(BaseModel): subscription_traffic_gb: int | None = None deep_link: str | None = None web_link: str | None = None + # Per-campaign statistics + registrations_count: int = 0 + referrals_count: int = 0 + earnings_kopeks: int = 0 class PartnerStatusResponse(BaseModel): @@ -62,6 +66,75 @@ class PartnerStatusResponse(BaseModel): campaigns: list[PartnerCampaignInfo] = [] +# ==================== Campaign detailed stats ==================== + + +class DailyStatItem(BaseModel): + """Single day of campaign stats.""" + + date: str + referrals_count: int = 0 + earnings_kopeks: int = 0 + + +class PeriodStats(BaseModel): + """Stats for a single period.""" + + days: int + referrals_count: int = 0 + earnings_kopeks: int = 0 + + +class PeriodChange(BaseModel): + """Change metrics between periods.""" + + absolute: int = 0 + percent: float = 0.0 + trend: str = 'stable' + + +class PeriodComparison(BaseModel): + """Comparison between current and previous period.""" + + current: PeriodStats + previous: PeriodStats + referrals_change: PeriodChange + earnings_change: PeriodChange + + +class CampaignReferralItem(BaseModel): + """Referral user in campaign stats.""" + + id: int + full_name: str + created_at: datetime + has_paid: bool = False + is_active: bool = False + total_earnings_kopeks: int = 0 + + +class PartnerCampaignDetailedStats(BaseModel): + """Detailed stats for a single campaign.""" + + campaign_id: int + campaign_name: str + # Summary + registrations_count: int = 0 + referrals_count: int = 0 + earnings_kopeks: int = 0 + conversion_rate: float = 0.0 + # Period earnings + earnings_today: int = 0 + earnings_week: int = 0 + earnings_month: int = 0 + # Daily chart (30 days) + daily_stats: list[DailyStatItem] = [] + # Period comparison (this week vs last week) + period_comparison: PeriodComparison + # Top referrals + top_referrals: list[CampaignReferralItem] = [] + + # ==================== Admin-facing ==================== diff --git a/app/services/partner_stats_service.py b/app/services/partner_stats_service.py index a2266bc0..c9fc9f6b 100644 --- a/app/services/partner_stats_service.py +++ b/app/services/partner_stats_service.py @@ -14,12 +14,19 @@ from app.database.models import ( ReferralEarning, Subscription, SubscriptionStatus, + Transaction, + TransactionType, User, ) logger = structlog.get_logger(__name__) +# Constants for campaign detailed stats +DAILY_STATS_DAYS = 30 +TOP_REFERRALS_LIMIT = 5 +PERIOD_COMPARISON_DAYS = 7 + class PartnerStatsService: """Сервис для детальной статистики партнёров.""" @@ -644,6 +651,562 @@ class PartnerStatsService: return result + @classmethod + async def get_campaign_detailed_stats( + cls, + db: AsyncSession, + user_id: int, + campaign_id: int, + ) -> dict[str, Any]: + """Detailed stats for a single campaign owned by the partner.""" + now = datetime.now(UTC) + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + week_ago = now - timedelta(days=PERIOD_COMPARISON_DAYS) + month_ago = now - timedelta(days=DAILY_STATS_DAYS) + + # --- Summary: registrations, referrals, earnings --- + basic = await cls.get_per_campaign_stats(db, user_id, [campaign_id]) + summary = basic.get(campaign_id, {'registrations_count': 0, 'referrals_count': 0, 'earnings_kopeks': 0}) + + # --- Period earnings (today / week / month) --- + period_earnings_result = await db.execute( + select( + func.coalesce( + func.sum(case((ReferralEarning.created_at >= today_start, ReferralEarning.amount_kopeks), else_=0)), + 0, + ).label('today'), + func.coalesce( + func.sum(case((ReferralEarning.created_at >= week_ago, ReferralEarning.amount_kopeks), else_=0)), + 0, + ).label('week'), + func.coalesce( + func.sum(case((ReferralEarning.created_at >= month_ago, ReferralEarning.amount_kopeks), else_=0)), + 0, + ).label('month'), + ).where( + and_( + ReferralEarning.user_id == user_id, + ReferralEarning.campaign_id == campaign_id, + ) + ) + ) + pe_row = period_earnings_result.one() + + # --- Daily stats (DAILY_STATS_DAYS days) --- + start_date = now - timedelta(days=DAILY_STATS_DAYS) + + referrals_by_day = await db.execute( + select( + func.date(User.created_at).label('date'), + func.count(User.id).label('count'), + ) + .join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id) + .where( + and_( + User.referred_by_id == user_id, + AdvertisingCampaignRegistration.campaign_id == campaign_id, + User.created_at >= start_date, + ) + ) + .group_by(func.date(User.created_at)) + ) + referrals_dict = {str(row.date): int(row.count) for row in referrals_by_day.all()} + + earnings_by_day = await db.execute( + select( + func.date(ReferralEarning.created_at).label('date'), + func.sum(ReferralEarning.amount_kopeks).label('earnings'), + ) + .where( + and_( + ReferralEarning.user_id == user_id, + ReferralEarning.campaign_id == campaign_id, + ReferralEarning.created_at >= start_date, + ) + ) + .group_by(func.date(ReferralEarning.created_at)) + ) + earnings_dict = {str(row.date): int(row.earnings or 0) for row in earnings_by_day.all()} + + daily_stats = [] + for i in range(DAILY_STATS_DAYS): + date = (start_date + timedelta(days=i)).date() + date_str = str(date) + daily_stats.append( + { + 'date': date_str, + 'referrals_count': referrals_dict.get(date_str, 0), + 'earnings_kopeks': earnings_dict.get(date_str, 0), + } + ) + + # --- Period comparison (this week vs last week) --- + previous_start = week_ago - timedelta(days=PERIOD_COMPARISON_DAYS) + + current_ref_result = await db.execute( + select(func.count(User.id)) + .join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id) + .where( + and_( + User.referred_by_id == user_id, + AdvertisingCampaignRegistration.campaign_id == campaign_id, + User.created_at >= week_ago, + ) + ) + ) + current_referrals = current_ref_result.scalar() or 0 + + previous_ref_result = await db.execute( + select(func.count(User.id)) + .join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id) + .where( + and_( + User.referred_by_id == user_id, + AdvertisingCampaignRegistration.campaign_id == campaign_id, + User.created_at >= previous_start, + User.created_at < week_ago, + ) + ) + ) + previous_referrals = previous_ref_result.scalar() or 0 + + current_earn = await db.execute( + select(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0)).where( + and_( + ReferralEarning.user_id == user_id, + ReferralEarning.campaign_id == campaign_id, + ReferralEarning.created_at >= week_ago, + ) + ) + ) + current_earnings = int(current_earn.scalar() or 0) + + previous_earn = await db.execute( + select(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0)).where( + and_( + ReferralEarning.user_id == user_id, + ReferralEarning.campaign_id == campaign_id, + ReferralEarning.created_at >= previous_start, + ReferralEarning.created_at < week_ago, + ) + ) + ) + previous_earnings = int(previous_earn.scalar() or 0) + + def _calc_change(current_val: int, previous_val: int) -> dict[str, Any]: + diff = current_val - previous_val + pct = round((diff / previous_val * 100), 2) if previous_val > 0 else (100.0 if current_val > 0 else 0.0) + trend = 'up' if diff > 0 else 'down' if diff < 0 else 'stable' + return {'absolute': diff, 'percent': pct, 'trend': trend} + + period_comparison = { + 'current': { + 'days': PERIOD_COMPARISON_DAYS, + 'referrals_count': current_referrals, + 'earnings_kopeks': current_earnings, + }, + 'previous': { + 'days': PERIOD_COMPARISON_DAYS, + 'referrals_count': previous_referrals, + 'earnings_kopeks': previous_earnings, + }, + 'referrals_change': _calc_change(current_referrals, previous_referrals), + 'earnings_change': _calc_change(current_earnings, previous_earnings), + } + + # --- Top referrals for this campaign --- + top_result = await db.execute( + select( + User.id, + User.username, + User.first_name, + User.last_name, + User.created_at, + User.has_made_first_topup, + func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0).label('total_earnings'), + ) + .join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id) + .outerjoin( + ReferralEarning, + and_( + ReferralEarning.referral_id == User.id, + ReferralEarning.user_id == user_id, + ReferralEarning.campaign_id == campaign_id, + ), + ) + .where( + and_( + User.referred_by_id == user_id, + AdvertisingCampaignRegistration.campaign_id == campaign_id, + ) + ) + .group_by(User.id) + .order_by(desc(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0))) + .limit(TOP_REFERRALS_LIMIT) + ) + top_rows = top_result.all() + + # Check active subscriptions for top referrals + top_user_ids = [row.id for row in top_rows] + active_subs: set[int] = set() + if top_user_ids: + active_result = await db.execute( + select(Subscription.user_id).where( + and_( + Subscription.user_id.in_(top_user_ids), + Subscription.status == SubscriptionStatus.ACTIVE.value, + Subscription.end_date > now, + ) + ) + ) + active_subs = {row.user_id for row in active_result.all()} + + top_referrals = [] + for row in top_rows: + full_name = f'{row.first_name or ""} {row.last_name or ""}'.strip() or row.username or f'User #{row.id}' + top_referrals.append( + { + 'id': row.id, + 'full_name': full_name, + 'created_at': row.created_at, + 'has_paid': row.has_made_first_topup, + 'is_active': row.id in active_subs, + 'total_earnings_kopeks': int(row.total_earnings), + } + ) + + # --- Conversion rate --- + reg_count = summary['registrations_count'] + ref_count = summary['referrals_count'] + conversion_rate = round((ref_count / reg_count * 100), 2) if reg_count > 0 else 0.0 + + return { + 'campaign_id': campaign_id, + 'registrations_count': reg_count, + 'referrals_count': ref_count, + 'earnings_kopeks': summary['earnings_kopeks'], + 'conversion_rate': conversion_rate, + 'earnings_today': int(pe_row.today), + 'earnings_week': int(pe_row.week), + 'earnings_month': int(pe_row.month), + 'daily_stats': daily_stats, + 'period_comparison': period_comparison, + 'top_referrals': top_referrals, + } + + @classmethod + async def get_admin_campaign_chart_data( + cls, + db: AsyncSession, + campaign_id: int, + ) -> dict[str, Any]: + """Chart data for admin campaign analytics (no partner filter). + + Unlike get_campaign_detailed_stats which is partner-isolated, + this method returns ALL registrations and revenue for a campaign. + Revenue is based on actual user transactions (deposits + subscription payments), + not referral earnings. + """ + now = datetime.now(UTC) + start_date = now - timedelta(days=DAILY_STATS_DAYS) + week_ago = now - timedelta(days=PERIOD_COMPARISON_DAYS) + previous_start = week_ago - timedelta(days=PERIOD_COMPARISON_DAYS) + + # Subquery: user_ids registered via this campaign + campaign_user_ids_sq = ( + select(AdvertisingCampaignRegistration.user_id) + .where(AdvertisingCampaignRegistration.campaign_id == campaign_id) + .scalar_subquery() + ) + + # --- Daily registrations (DAILY_STATS_DAYS days) --- + registrations_by_day = await db.execute( + select( + func.date(AdvertisingCampaignRegistration.created_at).label('date'), + func.count(AdvertisingCampaignRegistration.id).label('count'), + ) + .where( + and_( + AdvertisingCampaignRegistration.campaign_id == campaign_id, + AdvertisingCampaignRegistration.created_at >= start_date, + ) + ) + .group_by(func.date(AdvertisingCampaignRegistration.created_at)) + ) + registrations_dict = {str(row.date): int(row.count) for row in registrations_by_day.all()} + + # --- Daily revenue (DAILY_STATS_DAYS days) --- + # Revenue = deposits (positive) + abs(subscription_payments) (stored negative) + revenue_amount_expr = func.coalesce( + func.sum( + case( + (Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks), + ( + Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value, + func.abs(Transaction.amount_kopeks), + ), + else_=0, + ) + ), + 0, + ) + + revenue_by_day = await db.execute( + select( + func.date(Transaction.created_at).label('date'), + revenue_amount_expr.label('revenue'), + ) + .where( + and_( + Transaction.user_id.in_(campaign_user_ids_sq), + Transaction.is_completed.is_(True), + Transaction.created_at >= start_date, + Transaction.type.in_( + [ + TransactionType.DEPOSIT.value, + TransactionType.SUBSCRIPTION_PAYMENT.value, + ] + ), + ) + ) + .group_by(func.date(Transaction.created_at)) + ) + revenue_dict = {str(row.date): int(row.revenue) for row in revenue_by_day.all()} + + # --- Combine into daily_stats --- + daily_stats: list[dict[str, Any]] = [] + for i in range(DAILY_STATS_DAYS): + date = (start_date + timedelta(days=i)).date() + date_str = str(date) + daily_stats.append( + { + 'date': date_str, + 'referrals_count': registrations_dict.get(date_str, 0), + 'earnings_kopeks': revenue_dict.get(date_str, 0), + } + ) + + # --- Period comparison (this week vs last week) --- + # Current period registrations + current_reg_result = await db.execute( + select(func.count(AdvertisingCampaignRegistration.id)).where( + and_( + AdvertisingCampaignRegistration.campaign_id == campaign_id, + AdvertisingCampaignRegistration.created_at >= week_ago, + ) + ) + ) + current_registrations = current_reg_result.scalar() or 0 + + # Previous period registrations + previous_reg_result = await db.execute( + select(func.count(AdvertisingCampaignRegistration.id)).where( + and_( + AdvertisingCampaignRegistration.campaign_id == campaign_id, + AdvertisingCampaignRegistration.created_at >= previous_start, + AdvertisingCampaignRegistration.created_at < week_ago, + ) + ) + ) + previous_registrations = previous_reg_result.scalar() or 0 + + # Current period revenue + current_rev_result = await db.execute( + select(revenue_amount_expr.label('revenue')).where( + and_( + Transaction.user_id.in_(campaign_user_ids_sq), + Transaction.is_completed.is_(True), + Transaction.created_at >= week_ago, + Transaction.type.in_( + [ + TransactionType.DEPOSIT.value, + TransactionType.SUBSCRIPTION_PAYMENT.value, + ] + ), + ) + ) + ) + current_revenue = int(current_rev_result.scalar() or 0) + + # Previous period revenue + previous_rev_result = await db.execute( + select(revenue_amount_expr.label('revenue')).where( + and_( + Transaction.user_id.in_(campaign_user_ids_sq), + Transaction.is_completed.is_(True), + Transaction.created_at >= previous_start, + Transaction.created_at < week_ago, + Transaction.type.in_( + [ + TransactionType.DEPOSIT.value, + TransactionType.SUBSCRIPTION_PAYMENT.value, + ] + ), + ) + ) + ) + previous_revenue = int(previous_rev_result.scalar() or 0) + + def _calc_change(current_val: int, previous_val: int) -> dict[str, Any]: + diff = current_val - previous_val + pct = round((diff / previous_val * 100), 2) if previous_val > 0 else (100.0 if current_val > 0 else 0.0) + trend = 'up' if diff > 0 else 'down' if diff < 0 else 'stable' + return {'absolute': diff, 'percent': pct, 'trend': trend} + + period_comparison = { + 'current': { + 'days': PERIOD_COMPARISON_DAYS, + 'referrals_count': current_registrations, + 'earnings_kopeks': current_revenue, + }, + 'previous': { + 'days': PERIOD_COMPARISON_DAYS, + 'referrals_count': previous_registrations, + 'earnings_kopeks': previous_revenue, + }, + 'referrals_change': _calc_change(current_registrations, previous_registrations), + 'earnings_change': _calc_change(current_revenue, previous_revenue), + } + + # --- Total deposits & spending (separate aggregates) --- + totals_result = await db.execute( + select( + func.coalesce( + func.sum( + case( + (Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks), + else_=0, + ) + ), + 0, + ).label('deposits'), + func.coalesce( + func.sum( + case( + ( + Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value, + func.abs(Transaction.amount_kopeks), + ), + else_=0, + ) + ), + 0, + ).label('spending'), + ).where( + and_( + Transaction.user_id.in_(campaign_user_ids_sq), + Transaction.is_completed.is_(True), + Transaction.type.in_( + [ + TransactionType.DEPOSIT.value, + TransactionType.SUBSCRIPTION_PAYMENT.value, + ] + ), + ) + ) + ) + totals_row = totals_result.one() + total_deposits_kopeks = int(totals_row.deposits) + total_spending_kopeks = int(totals_row.spending) + + # --- Top registrations (top 5 users by spending) --- + top_result = await db.execute( + select( + User.id, + User.username, + User.first_name, + User.last_name, + User.created_at, + User.has_had_paid_subscription, + func.coalesce( + func.sum( + case( + (Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks), + ( + Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value, + func.abs(Transaction.amount_kopeks), + ), + else_=0, + ) + ), + 0, + ).label('total_spending'), + ) + .join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id) + .outerjoin( + Transaction, + and_( + Transaction.user_id == User.id, + Transaction.is_completed.is_(True), + Transaction.type.in_( + [ + TransactionType.DEPOSIT.value, + TransactionType.SUBSCRIPTION_PAYMENT.value, + ] + ), + ), + ) + .where(AdvertisingCampaignRegistration.campaign_id == campaign_id) + .group_by(User.id) + .order_by( + desc( + func.coalesce( + func.sum( + case( + (Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks), + ( + Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value, + func.abs(Transaction.amount_kopeks), + ), + else_=0, + ) + ), + 0, + ) + ) + ) + .limit(TOP_REFERRALS_LIMIT) + ) + top_rows = top_result.all() + + # Batch-check active subscriptions to avoid N+1 + top_user_ids = [row.id for row in top_rows] + active_subs: set[int] = set() + if top_user_ids: + active_result = await db.execute( + select(Subscription.user_id).where( + and_( + Subscription.user_id.in_(top_user_ids), + Subscription.status == SubscriptionStatus.ACTIVE.value, + Subscription.end_date > now, + ) + ) + ) + active_subs = {row.user_id for row in active_result.all()} + + top_registrations: list[dict[str, Any]] = [] + for row in top_rows: + full_name = f'{row.first_name or ""} {row.last_name or ""}'.strip() or row.username or f'User #{row.id}' + top_registrations.append( + { + 'id': row.id, + 'full_name': full_name, + 'created_at': row.created_at, + 'has_paid': row.has_had_paid_subscription, + 'is_active': row.id in active_subs, + 'total_earnings_kopeks': int(row.total_spending), + } + ) + + return { + 'campaign_id': campaign_id, + 'total_deposits_kopeks': total_deposits_kopeks, + 'total_spending_kopeks': total_spending_kopeks, + 'daily_stats': daily_stats, + 'period_comparison': period_comparison, + 'top_registrations': top_registrations, + } + @classmethod async def _get_earnings_for_period( cls,