feat: add admin campaign chart data endpoint with deposits/spending split

- Add get_admin_campaign_chart_data() to PartnerStatsService with daily registrations, revenue trends, period comparison, and top registrations
- Add total_deposits_kopeks and total_spending_kopeks as separate aggregates
- Add 6 Pydantic schemas for admin chart data response
- Add GET /{campaign_id}/chart-data endpoint with campaigns:stats permission
- Add partner application endpoints and schemas for campaign detailed stats
This commit is contained in:
Fringg
2026-03-02 06:09:12 +03:00
parent 69868418e5
commit fa7de589c1
5 changed files with 782 additions and 1 deletions

View File

@@ -29,9 +29,11 @@ from app.database.models import (
Tariff,
User,
)
from app.services.partner_stats_service import PartnerStatsService
from ..dependencies import get_cabinet_db, require_permission
from ..schemas.campaigns import (
AdminCampaignChartDataResponse,
AvailablePartnerItem,
CampaignCreateRequest,
CampaignDetailResponse,
@@ -254,6 +256,24 @@ async def get_campaign(
)
@router.get('/{campaign_id}/chart-data', response_model=AdminCampaignChartDataResponse)
async def get_campaign_chart_data(
campaign_id: int,
admin: User = Depends(require_permission('campaigns:stats')),
db: AsyncSession = Depends(get_cabinet_db),
):
"""Get chart data for admin campaign analytics."""
campaign = await get_campaign_by_id(db, campaign_id)
if not campaign:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Campaign not found',
)
data = await PartnerStatsService.get_admin_campaign_chart_data(db, campaign_id)
return AdminCampaignChartDataResponse(**data)
@router.get('/{campaign_id}/stats', response_model=CampaignStatisticsResponse)
async def get_campaign_stats(
campaign_id: int,

View File

@@ -8,13 +8,20 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.models import AdvertisingCampaign, User
from app.services.partner_application_service import partner_application_service
from app.services.partner_stats_service import PartnerStatsService
from ..dependencies import get_cabinet_db, get_current_cabinet_user
from ..schemas.partners import (
CampaignReferralItem,
DailyStatItem,
PartnerApplicationInfo,
PartnerApplicationRequest,
PartnerCampaignDetailedStats,
PartnerCampaignInfo,
PartnerStatusResponse,
PeriodChange,
PeriodComparison,
PeriodStats,
)
@@ -77,7 +84,14 @@ async def get_partner_status(
AdvertisingCampaign.is_active.is_(True),
)
)
for c in result.scalars().all():
campaign_models = result.scalars().all()
# Fetch per-campaign stats in one batch
campaign_ids = [c.id for c in campaign_models]
campaign_stats = await PartnerStatsService.get_per_campaign_stats(db, user.id, campaign_ids)
for c in campaign_models:
stats = campaign_stats.get(c.id, {})
campaigns.append(
PartnerCampaignInfo(
id=c.id,
@@ -89,6 +103,9 @@ async def get_partner_status(
subscription_traffic_gb=c.subscription_traffic_gb,
deep_link=_get_campaign_deep_link(c.start_parameter),
web_link=_get_campaign_web_link(c.start_parameter),
registrations_count=stats.get('registrations_count', 0),
referrals_count=stats.get('referrals_count', 0),
earnings_kopeks=stats.get('earnings_kopeks', 0),
)
)
@@ -100,6 +117,56 @@ async def get_partner_status(
)
@router.get('/campaigns/{campaign_id}/stats', response_model=PartnerCampaignDetailedStats)
async def get_campaign_stats(
campaign_id: int,
user: User = Depends(get_current_cabinet_user),
db: AsyncSession = Depends(get_cabinet_db),
):
"""Get detailed stats for a single campaign belonging to the current partner."""
if not user.is_partner:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='Partner status required',
)
# Verify campaign belongs to this partner
campaign_result = await db.execute(
select(AdvertisingCampaign).where(
AdvertisingCampaign.id == campaign_id,
AdvertisingCampaign.partner_user_id == user.id,
)
)
campaign = campaign_result.scalar_one_or_none()
if not campaign:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Campaign not found or not assigned to you',
)
raw = await PartnerStatsService.get_campaign_detailed_stats(db, user.id, campaign_id)
return PartnerCampaignDetailedStats(
campaign_id=raw['campaign_id'],
campaign_name=campaign.name,
registrations_count=raw['registrations_count'],
referrals_count=raw['referrals_count'],
earnings_kopeks=raw['earnings_kopeks'],
conversion_rate=raw['conversion_rate'],
earnings_today=raw['earnings_today'],
earnings_week=raw['earnings_week'],
earnings_month=raw['earnings_month'],
daily_stats=[DailyStatItem(**d) for d in raw['daily_stats']],
period_comparison=PeriodComparison(
current=PeriodStats(**raw['period_comparison']['current']),
previous=PeriodStats(**raw['period_comparison']['previous']),
referrals_change=PeriodChange(**raw['period_comparison']['referrals_change']),
earnings_change=PeriodChange(**raw['period_comparison']['earnings_change']),
),
top_referrals=[CampaignReferralItem(**r) for r in raw['top_referrals']],
)
@router.post('/apply', response_model=PartnerApplicationInfo)
async def apply_for_partner(
request: PartnerApplicationRequest,

View File

@@ -220,3 +220,61 @@ class ServerSquadInfo(BaseModel):
squad_uuid: str
display_name: str
country_code: str | None = None
# --- Admin campaign chart data schemas ---
class AdminDailyStatItem(BaseModel):
"""Daily stat item for admin campaign charts."""
date: str
referrals_count: int = 0 # actually registrations, named for frontend compat
earnings_kopeks: int = 0 # actually revenue, named for frontend compat
class AdminPeriodStats(BaseModel):
"""Period stats for admin campaign comparison."""
days: int
referrals_count: int = 0
earnings_kopeks: int = 0
class AdminPeriodChange(BaseModel):
"""Change metrics between periods."""
absolute: int = 0
percent: float = 0.0
trend: str = 'stable'
class AdminPeriodComparison(BaseModel):
"""Comparison of current vs previous period."""
current: AdminPeriodStats
previous: AdminPeriodStats
referrals_change: AdminPeriodChange
earnings_change: AdminPeriodChange
class AdminTopRegistrationItem(BaseModel):
"""Top user by spending in a campaign."""
id: int
full_name: str
created_at: datetime
has_paid: bool = False
is_active: bool = False
total_earnings_kopeks: int = 0 # actually total spending, named for frontend compat
class AdminCampaignChartDataResponse(BaseModel):
"""Chart data for admin campaign stats page."""
campaign_id: int
total_deposits_kopeks: int = 0
total_spending_kopeks: int = 0
daily_stats: list[AdminDailyStatItem] = []
period_comparison: AdminPeriodComparison
top_registrations: list[AdminTopRegistrationItem] = []

View File

@@ -51,6 +51,10 @@ class PartnerCampaignInfo(BaseModel):
subscription_traffic_gb: int | None = None
deep_link: str | None = None
web_link: str | None = None
# Per-campaign statistics
registrations_count: int = 0
referrals_count: int = 0
earnings_kopeks: int = 0
class PartnerStatusResponse(BaseModel):
@@ -62,6 +66,75 @@ class PartnerStatusResponse(BaseModel):
campaigns: list[PartnerCampaignInfo] = []
# ==================== Campaign detailed stats ====================
class DailyStatItem(BaseModel):
"""Single day of campaign stats."""
date: str
referrals_count: int = 0
earnings_kopeks: int = 0
class PeriodStats(BaseModel):
"""Stats for a single period."""
days: int
referrals_count: int = 0
earnings_kopeks: int = 0
class PeriodChange(BaseModel):
"""Change metrics between periods."""
absolute: int = 0
percent: float = 0.0
trend: str = 'stable'
class PeriodComparison(BaseModel):
"""Comparison between current and previous period."""
current: PeriodStats
previous: PeriodStats
referrals_change: PeriodChange
earnings_change: PeriodChange
class CampaignReferralItem(BaseModel):
"""Referral user in campaign stats."""
id: int
full_name: str
created_at: datetime
has_paid: bool = False
is_active: bool = False
total_earnings_kopeks: int = 0
class PartnerCampaignDetailedStats(BaseModel):
"""Detailed stats for a single campaign."""
campaign_id: int
campaign_name: str
# Summary
registrations_count: int = 0
referrals_count: int = 0
earnings_kopeks: int = 0
conversion_rate: float = 0.0
# Period earnings
earnings_today: int = 0
earnings_week: int = 0
earnings_month: int = 0
# Daily chart (30 days)
daily_stats: list[DailyStatItem] = []
# Period comparison (this week vs last week)
period_comparison: PeriodComparison
# Top referrals
top_referrals: list[CampaignReferralItem] = []
# ==================== Admin-facing ====================

View File

@@ -14,12 +14,19 @@ from app.database.models import (
ReferralEarning,
Subscription,
SubscriptionStatus,
Transaction,
TransactionType,
User,
)
logger = structlog.get_logger(__name__)
# Constants for campaign detailed stats
DAILY_STATS_DAYS = 30
TOP_REFERRALS_LIMIT = 5
PERIOD_COMPARISON_DAYS = 7
class PartnerStatsService:
"""Сервис для детальной статистики партнёров."""
@@ -644,6 +651,562 @@ class PartnerStatsService:
return result
@classmethod
async def get_campaign_detailed_stats(
cls,
db: AsyncSession,
user_id: int,
campaign_id: int,
) -> dict[str, Any]:
"""Detailed stats for a single campaign owned by the partner."""
now = datetime.now(UTC)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_ago = now - timedelta(days=PERIOD_COMPARISON_DAYS)
month_ago = now - timedelta(days=DAILY_STATS_DAYS)
# --- Summary: registrations, referrals, earnings ---
basic = await cls.get_per_campaign_stats(db, user_id, [campaign_id])
summary = basic.get(campaign_id, {'registrations_count': 0, 'referrals_count': 0, 'earnings_kopeks': 0})
# --- Period earnings (today / week / month) ---
period_earnings_result = await db.execute(
select(
func.coalesce(
func.sum(case((ReferralEarning.created_at >= today_start, ReferralEarning.amount_kopeks), else_=0)),
0,
).label('today'),
func.coalesce(
func.sum(case((ReferralEarning.created_at >= week_ago, ReferralEarning.amount_kopeks), else_=0)),
0,
).label('week'),
func.coalesce(
func.sum(case((ReferralEarning.created_at >= month_ago, ReferralEarning.amount_kopeks), else_=0)),
0,
).label('month'),
).where(
and_(
ReferralEarning.user_id == user_id,
ReferralEarning.campaign_id == campaign_id,
)
)
)
pe_row = period_earnings_result.one()
# --- Daily stats (DAILY_STATS_DAYS days) ---
start_date = now - timedelta(days=DAILY_STATS_DAYS)
referrals_by_day = await db.execute(
select(
func.date(User.created_at).label('date'),
func.count(User.id).label('count'),
)
.join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id)
.where(
and_(
User.referred_by_id == user_id,
AdvertisingCampaignRegistration.campaign_id == campaign_id,
User.created_at >= start_date,
)
)
.group_by(func.date(User.created_at))
)
referrals_dict = {str(row.date): int(row.count) for row in referrals_by_day.all()}
earnings_by_day = await db.execute(
select(
func.date(ReferralEarning.created_at).label('date'),
func.sum(ReferralEarning.amount_kopeks).label('earnings'),
)
.where(
and_(
ReferralEarning.user_id == user_id,
ReferralEarning.campaign_id == campaign_id,
ReferralEarning.created_at >= start_date,
)
)
.group_by(func.date(ReferralEarning.created_at))
)
earnings_dict = {str(row.date): int(row.earnings or 0) for row in earnings_by_day.all()}
daily_stats = []
for i in range(DAILY_STATS_DAYS):
date = (start_date + timedelta(days=i)).date()
date_str = str(date)
daily_stats.append(
{
'date': date_str,
'referrals_count': referrals_dict.get(date_str, 0),
'earnings_kopeks': earnings_dict.get(date_str, 0),
}
)
# --- Period comparison (this week vs last week) ---
previous_start = week_ago - timedelta(days=PERIOD_COMPARISON_DAYS)
current_ref_result = await db.execute(
select(func.count(User.id))
.join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id)
.where(
and_(
User.referred_by_id == user_id,
AdvertisingCampaignRegistration.campaign_id == campaign_id,
User.created_at >= week_ago,
)
)
)
current_referrals = current_ref_result.scalar() or 0
previous_ref_result = await db.execute(
select(func.count(User.id))
.join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id)
.where(
and_(
User.referred_by_id == user_id,
AdvertisingCampaignRegistration.campaign_id == campaign_id,
User.created_at >= previous_start,
User.created_at < week_ago,
)
)
)
previous_referrals = previous_ref_result.scalar() or 0
current_earn = await db.execute(
select(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0)).where(
and_(
ReferralEarning.user_id == user_id,
ReferralEarning.campaign_id == campaign_id,
ReferralEarning.created_at >= week_ago,
)
)
)
current_earnings = int(current_earn.scalar() or 0)
previous_earn = await db.execute(
select(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0)).where(
and_(
ReferralEarning.user_id == user_id,
ReferralEarning.campaign_id == campaign_id,
ReferralEarning.created_at >= previous_start,
ReferralEarning.created_at < week_ago,
)
)
)
previous_earnings = int(previous_earn.scalar() or 0)
def _calc_change(current_val: int, previous_val: int) -> dict[str, Any]:
diff = current_val - previous_val
pct = round((diff / previous_val * 100), 2) if previous_val > 0 else (100.0 if current_val > 0 else 0.0)
trend = 'up' if diff > 0 else 'down' if diff < 0 else 'stable'
return {'absolute': diff, 'percent': pct, 'trend': trend}
period_comparison = {
'current': {
'days': PERIOD_COMPARISON_DAYS,
'referrals_count': current_referrals,
'earnings_kopeks': current_earnings,
},
'previous': {
'days': PERIOD_COMPARISON_DAYS,
'referrals_count': previous_referrals,
'earnings_kopeks': previous_earnings,
},
'referrals_change': _calc_change(current_referrals, previous_referrals),
'earnings_change': _calc_change(current_earnings, previous_earnings),
}
# --- Top referrals for this campaign ---
top_result = await db.execute(
select(
User.id,
User.username,
User.first_name,
User.last_name,
User.created_at,
User.has_made_first_topup,
func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0).label('total_earnings'),
)
.join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id)
.outerjoin(
ReferralEarning,
and_(
ReferralEarning.referral_id == User.id,
ReferralEarning.user_id == user_id,
ReferralEarning.campaign_id == campaign_id,
),
)
.where(
and_(
User.referred_by_id == user_id,
AdvertisingCampaignRegistration.campaign_id == campaign_id,
)
)
.group_by(User.id)
.order_by(desc(func.coalesce(func.sum(ReferralEarning.amount_kopeks), 0)))
.limit(TOP_REFERRALS_LIMIT)
)
top_rows = top_result.all()
# Check active subscriptions for top referrals
top_user_ids = [row.id for row in top_rows]
active_subs: set[int] = set()
if top_user_ids:
active_result = await db.execute(
select(Subscription.user_id).where(
and_(
Subscription.user_id.in_(top_user_ids),
Subscription.status == SubscriptionStatus.ACTIVE.value,
Subscription.end_date > now,
)
)
)
active_subs = {row.user_id for row in active_result.all()}
top_referrals = []
for row in top_rows:
full_name = f'{row.first_name or ""} {row.last_name or ""}'.strip() or row.username or f'User #{row.id}'
top_referrals.append(
{
'id': row.id,
'full_name': full_name,
'created_at': row.created_at,
'has_paid': row.has_made_first_topup,
'is_active': row.id in active_subs,
'total_earnings_kopeks': int(row.total_earnings),
}
)
# --- Conversion rate ---
reg_count = summary['registrations_count']
ref_count = summary['referrals_count']
conversion_rate = round((ref_count / reg_count * 100), 2) if reg_count > 0 else 0.0
return {
'campaign_id': campaign_id,
'registrations_count': reg_count,
'referrals_count': ref_count,
'earnings_kopeks': summary['earnings_kopeks'],
'conversion_rate': conversion_rate,
'earnings_today': int(pe_row.today),
'earnings_week': int(pe_row.week),
'earnings_month': int(pe_row.month),
'daily_stats': daily_stats,
'period_comparison': period_comparison,
'top_referrals': top_referrals,
}
@classmethod
async def get_admin_campaign_chart_data(
cls,
db: AsyncSession,
campaign_id: int,
) -> dict[str, Any]:
"""Chart data for admin campaign analytics (no partner filter).
Unlike get_campaign_detailed_stats which is partner-isolated,
this method returns ALL registrations and revenue for a campaign.
Revenue is based on actual user transactions (deposits + subscription payments),
not referral earnings.
"""
now = datetime.now(UTC)
start_date = now - timedelta(days=DAILY_STATS_DAYS)
week_ago = now - timedelta(days=PERIOD_COMPARISON_DAYS)
previous_start = week_ago - timedelta(days=PERIOD_COMPARISON_DAYS)
# Subquery: user_ids registered via this campaign
campaign_user_ids_sq = (
select(AdvertisingCampaignRegistration.user_id)
.where(AdvertisingCampaignRegistration.campaign_id == campaign_id)
.scalar_subquery()
)
# --- Daily registrations (DAILY_STATS_DAYS days) ---
registrations_by_day = await db.execute(
select(
func.date(AdvertisingCampaignRegistration.created_at).label('date'),
func.count(AdvertisingCampaignRegistration.id).label('count'),
)
.where(
and_(
AdvertisingCampaignRegistration.campaign_id == campaign_id,
AdvertisingCampaignRegistration.created_at >= start_date,
)
)
.group_by(func.date(AdvertisingCampaignRegistration.created_at))
)
registrations_dict = {str(row.date): int(row.count) for row in registrations_by_day.all()}
# --- Daily revenue (DAILY_STATS_DAYS days) ---
# Revenue = deposits (positive) + abs(subscription_payments) (stored negative)
revenue_amount_expr = func.coalesce(
func.sum(
case(
(Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks),
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
func.abs(Transaction.amount_kopeks),
),
else_=0,
)
),
0,
)
revenue_by_day = await db.execute(
select(
func.date(Transaction.created_at).label('date'),
revenue_amount_expr.label('revenue'),
)
.where(
and_(
Transaction.user_id.in_(campaign_user_ids_sq),
Transaction.is_completed.is_(True),
Transaction.created_at >= start_date,
Transaction.type.in_(
[
TransactionType.DEPOSIT.value,
TransactionType.SUBSCRIPTION_PAYMENT.value,
]
),
)
)
.group_by(func.date(Transaction.created_at))
)
revenue_dict = {str(row.date): int(row.revenue) for row in revenue_by_day.all()}
# --- Combine into daily_stats ---
daily_stats: list[dict[str, Any]] = []
for i in range(DAILY_STATS_DAYS):
date = (start_date + timedelta(days=i)).date()
date_str = str(date)
daily_stats.append(
{
'date': date_str,
'referrals_count': registrations_dict.get(date_str, 0),
'earnings_kopeks': revenue_dict.get(date_str, 0),
}
)
# --- Period comparison (this week vs last week) ---
# Current period registrations
current_reg_result = await db.execute(
select(func.count(AdvertisingCampaignRegistration.id)).where(
and_(
AdvertisingCampaignRegistration.campaign_id == campaign_id,
AdvertisingCampaignRegistration.created_at >= week_ago,
)
)
)
current_registrations = current_reg_result.scalar() or 0
# Previous period registrations
previous_reg_result = await db.execute(
select(func.count(AdvertisingCampaignRegistration.id)).where(
and_(
AdvertisingCampaignRegistration.campaign_id == campaign_id,
AdvertisingCampaignRegistration.created_at >= previous_start,
AdvertisingCampaignRegistration.created_at < week_ago,
)
)
)
previous_registrations = previous_reg_result.scalar() or 0
# Current period revenue
current_rev_result = await db.execute(
select(revenue_amount_expr.label('revenue')).where(
and_(
Transaction.user_id.in_(campaign_user_ids_sq),
Transaction.is_completed.is_(True),
Transaction.created_at >= week_ago,
Transaction.type.in_(
[
TransactionType.DEPOSIT.value,
TransactionType.SUBSCRIPTION_PAYMENT.value,
]
),
)
)
)
current_revenue = int(current_rev_result.scalar() or 0)
# Previous period revenue
previous_rev_result = await db.execute(
select(revenue_amount_expr.label('revenue')).where(
and_(
Transaction.user_id.in_(campaign_user_ids_sq),
Transaction.is_completed.is_(True),
Transaction.created_at >= previous_start,
Transaction.created_at < week_ago,
Transaction.type.in_(
[
TransactionType.DEPOSIT.value,
TransactionType.SUBSCRIPTION_PAYMENT.value,
]
),
)
)
)
previous_revenue = int(previous_rev_result.scalar() or 0)
def _calc_change(current_val: int, previous_val: int) -> dict[str, Any]:
diff = current_val - previous_val
pct = round((diff / previous_val * 100), 2) if previous_val > 0 else (100.0 if current_val > 0 else 0.0)
trend = 'up' if diff > 0 else 'down' if diff < 0 else 'stable'
return {'absolute': diff, 'percent': pct, 'trend': trend}
period_comparison = {
'current': {
'days': PERIOD_COMPARISON_DAYS,
'referrals_count': current_registrations,
'earnings_kopeks': current_revenue,
},
'previous': {
'days': PERIOD_COMPARISON_DAYS,
'referrals_count': previous_registrations,
'earnings_kopeks': previous_revenue,
},
'referrals_change': _calc_change(current_registrations, previous_registrations),
'earnings_change': _calc_change(current_revenue, previous_revenue),
}
# --- Total deposits & spending (separate aggregates) ---
totals_result = await db.execute(
select(
func.coalesce(
func.sum(
case(
(Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks),
else_=0,
)
),
0,
).label('deposits'),
func.coalesce(
func.sum(
case(
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
func.abs(Transaction.amount_kopeks),
),
else_=0,
)
),
0,
).label('spending'),
).where(
and_(
Transaction.user_id.in_(campaign_user_ids_sq),
Transaction.is_completed.is_(True),
Transaction.type.in_(
[
TransactionType.DEPOSIT.value,
TransactionType.SUBSCRIPTION_PAYMENT.value,
]
),
)
)
)
totals_row = totals_result.one()
total_deposits_kopeks = int(totals_row.deposits)
total_spending_kopeks = int(totals_row.spending)
# --- Top registrations (top 5 users by spending) ---
top_result = await db.execute(
select(
User.id,
User.username,
User.first_name,
User.last_name,
User.created_at,
User.has_had_paid_subscription,
func.coalesce(
func.sum(
case(
(Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks),
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
func.abs(Transaction.amount_kopeks),
),
else_=0,
)
),
0,
).label('total_spending'),
)
.join(AdvertisingCampaignRegistration, AdvertisingCampaignRegistration.user_id == User.id)
.outerjoin(
Transaction,
and_(
Transaction.user_id == User.id,
Transaction.is_completed.is_(True),
Transaction.type.in_(
[
TransactionType.DEPOSIT.value,
TransactionType.SUBSCRIPTION_PAYMENT.value,
]
),
),
)
.where(AdvertisingCampaignRegistration.campaign_id == campaign_id)
.group_by(User.id)
.order_by(
desc(
func.coalesce(
func.sum(
case(
(Transaction.type == TransactionType.DEPOSIT.value, Transaction.amount_kopeks),
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
func.abs(Transaction.amount_kopeks),
),
else_=0,
)
),
0,
)
)
)
.limit(TOP_REFERRALS_LIMIT)
)
top_rows = top_result.all()
# Batch-check active subscriptions to avoid N+1
top_user_ids = [row.id for row in top_rows]
active_subs: set[int] = set()
if top_user_ids:
active_result = await db.execute(
select(Subscription.user_id).where(
and_(
Subscription.user_id.in_(top_user_ids),
Subscription.status == SubscriptionStatus.ACTIVE.value,
Subscription.end_date > now,
)
)
)
active_subs = {row.user_id for row in active_result.all()}
top_registrations: list[dict[str, Any]] = []
for row in top_rows:
full_name = f'{row.first_name or ""} {row.last_name or ""}'.strip() or row.username or f'User #{row.id}'
top_registrations.append(
{
'id': row.id,
'full_name': full_name,
'created_at': row.created_at,
'has_paid': row.has_had_paid_subscription,
'is_active': row.id in active_subs,
'total_earnings_kopeks': int(row.total_spending),
}
)
return {
'campaign_id': campaign_id,
'total_deposits_kopeks': total_deposits_kopeks,
'total_spending_kopeks': total_spending_kopeks,
'daily_stats': daily_stats,
'period_comparison': period_comparison,
'top_registrations': top_registrations,
}
@classmethod
async def _get_earnings_for_period(
cls,