diff --git a/app/cabinet/routes/__init__.py b/app/cabinet/routes/__init__.py index 29e3211f..0c7850c7 100644 --- a/app/cabinet/routes/__init__.py +++ b/app/cabinet/routes/__init__.py @@ -27,6 +27,7 @@ from .admin_broadcasts import router as admin_broadcasts_router from .admin_promocodes import router as admin_promocodes_router from .admin_promocodes import promo_groups_router as admin_promo_groups_router from .admin_campaigns import router as admin_campaigns_router +from .admin_users import router as admin_users_router from .media import router as media_router # Main cabinet router @@ -63,5 +64,6 @@ router.include_router(admin_broadcasts_router) router.include_router(admin_promocodes_router) router.include_router(admin_promo_groups_router) router.include_router(admin_campaigns_router) +router.include_router(admin_users_router) __all__ = ["router"] diff --git a/app/cabinet/routes/admin_users.py b/app/cabinet/routes/admin_users.py new file mode 100644 index 00000000..1313d623 --- /dev/null +++ b/app/cabinet/routes/admin_users.py @@ -0,0 +1,1574 @@ +"""Admin routes for managing users in cabinet.""" + +import logging +from datetime import datetime, timedelta +from typing import List, Optional + +from fastapi import APIRouter, Depends, HTTPException, status, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, func, and_, or_, Integer + +from app.database.models import ( + User, + UserStatus, + Subscription, + SubscriptionStatus, + Transaction, + TransactionType, + PromoGroup, + Tariff, +) +from app.database.crud.user import ( + get_user_by_id, + get_user_by_telegram_id, + get_users_list, + get_users_count, + get_users_statistics, + get_users_spending_stats, + add_user_balance, + subtract_user_balance, + update_user, + delete_user as soft_delete_user, + get_referrals, +) +from app.database.crud.subscription import ( + get_subscription_by_user_id, + extend_subscription, +) +from app.database.crud.tariff import get_tariff_by_id + +from ..dependencies import get_cabinet_db, get_current_admin_user +from ..schemas.users import ( + UserListItem, + UsersListResponse, + UserDetailResponse, + UserSubscriptionInfo, + UserPromoGroupInfo, + UserTransactionItem, + UserReferralInfo, + UpdateBalanceRequest, + UpdateBalanceResponse, + UpdateSubscriptionRequest, + UpdateSubscriptionResponse, + UpdateUserStatusRequest, + UpdateUserStatusResponse, + UpdateRestrictionsRequest, + UpdateRestrictionsResponse, + UpdatePromoGroupRequest, + UpdatePromoGroupResponse, + DeleteUserRequest, + DeleteUserResponse, + UsersStatsResponse, + SortByEnum, + UserStatusEnum, + UserAvailableTariffItem, + UserAvailableTariffsResponse, + PeriodPriceInfo, + PanelUserInfo, + SyncFromPanelRequest, + SyncFromPanelResponse, + SyncToPanelRequest, + SyncToPanelResponse, + PanelSyncStatusResponse, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/admin/users", tags=["Cabinet Admin Users"]) + + +def _build_user_list_item(user: User, spending_stats: dict = None) -> UserListItem: + """Build UserListItem from User model.""" + stats = spending_stats or {} + user_stats = stats.get(user.id, {"total_spent": 0, "purchase_count": 0}) + + subscription_status = None + subscription_is_trial = False + subscription_end_date = None + has_subscription = False + + if user.subscription: + has_subscription = True + subscription_status = user.subscription.status + subscription_is_trial = user.subscription.is_trial + subscription_end_date = user.subscription.end_date + + return UserListItem( + id=user.id, + telegram_id=user.telegram_id, + username=user.username, + first_name=user.first_name, + last_name=user.last_name, + full_name=user.full_name, + status=user.status, + balance_kopeks=user.balance_kopeks, + balance_rubles=user.balance_rubles, + created_at=user.created_at, + last_activity=user.last_activity, + has_subscription=has_subscription, + subscription_status=subscription_status, + subscription_is_trial=subscription_is_trial, + subscription_end_date=subscription_end_date, + promo_group_id=user.promo_group_id, + promo_group_name=user.promo_group.name if user.promo_group else None, + total_spent_kopeks=user_stats.get("total_spent", 0), + purchase_count=user_stats.get("purchase_count", 0), + has_restrictions=user.has_restrictions, + restriction_topup=user.restriction_topup, + restriction_subscription=user.restriction_subscription, + ) + + +def _build_subscription_info(subscription: Subscription) -> UserSubscriptionInfo: + """Build UserSubscriptionInfo from Subscription model.""" + days_remaining = 0 + is_active = False + + if subscription.end_date: + delta = subscription.end_date - datetime.utcnow() + days_remaining = max(0, delta.days) + is_active = ( + subscription.status == SubscriptionStatus.ACTIVE.value + and subscription.end_date > datetime.utcnow() + ) + + tariff_name = None + if subscription.tariff_id: + tariff = getattr(subscription, "tariff", None) + if tariff: + tariff_name = tariff.name + + return UserSubscriptionInfo( + id=subscription.id, + status=subscription.status, + is_trial=subscription.is_trial, + start_date=subscription.start_date, + end_date=subscription.end_date, + traffic_limit_gb=subscription.traffic_limit_gb, + traffic_used_gb=subscription.traffic_used_gb or 0.0, + device_limit=subscription.device_limit, + tariff_id=subscription.tariff_id, + tariff_name=tariff_name, + autopay_enabled=subscription.autopay_enabled, + is_active=is_active, + days_remaining=days_remaining, + ) + + +# === List & Search === + +@router.get("", response_model=UsersListResponse) +async def list_users( + offset: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=200), + search: Optional[str] = Query(None, max_length=255), + status: Optional[UserStatusEnum] = Query(None), + sort_by: SortByEnum = Query(SortByEnum.CREATED_AT), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Get paginated list of users with filtering and sorting. + + - **offset**: Pagination offset + - **limit**: Number of users per page (max 200) + - **search**: Search by telegram_id, username, first_name, last_name + - **status**: Filter by user status (active, blocked, deleted) + - **sort_by**: Sort field (created_at, balance, traffic, last_activity, total_spent, purchase_count) + """ + # Convert status enum to model enum + user_status = None + if status: + user_status = UserStatus(status.value) + + # Map sort options + order_by_balance = sort_by == SortByEnum.BALANCE + order_by_traffic = sort_by == SortByEnum.TRAFFIC + order_by_last_activity = sort_by == SortByEnum.LAST_ACTIVITY + order_by_total_spent = sort_by == SortByEnum.TOTAL_SPENT + order_by_purchase_count = sort_by == SortByEnum.PURCHASE_COUNT + + users = await get_users_list( + db=db, + offset=offset, + limit=limit, + search=search, + status=user_status, + order_by_balance=order_by_balance, + order_by_traffic=order_by_traffic, + order_by_last_activity=order_by_last_activity, + order_by_total_spent=order_by_total_spent, + order_by_purchase_count=order_by_purchase_count, + ) + + total = await get_users_count(db=db, status=user_status, search=search) + + # Get spending stats for all users + user_ids = [u.id for u in users] + spending_stats = await get_users_spending_stats(db, user_ids) if user_ids else {} + + items = [_build_user_list_item(u, spending_stats) for u in users] + + return UsersListResponse( + users=items, + total=total, + offset=offset, + limit=limit, + ) + + +@router.get("/stats", response_model=UsersStatsResponse) +async def get_users_stats( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get overall users statistics.""" + stats = await get_users_statistics(db) + + # Get subscription stats + sub_stats_query = select( + func.count(Subscription.id).label("total"), + func.sum( + func.cast( + and_( + Subscription.status == SubscriptionStatus.ACTIVE.value, + Subscription.end_date > datetime.utcnow(), + ), + Integer, + ) + ).label("active"), + func.sum(func.cast(Subscription.is_trial == True, Integer)).label("trial"), + func.sum( + func.cast( + or_( + Subscription.status == SubscriptionStatus.EXPIRED.value, + Subscription.end_date <= datetime.utcnow(), + ), + Integer, + ) + ).label("expired"), + ) + sub_result = await db.execute(sub_stats_query) + sub_row = sub_result.one_or_none() + + users_with_subscription = sub_row.total or 0 if sub_row else 0 + users_with_active = sub_row.active or 0 if sub_row else 0 + users_with_trial = sub_row.trial or 0 if sub_row else 0 + users_with_expired = sub_row.expired or 0 if sub_row else 0 + + # Get balance stats + balance_query = select( + func.sum(User.balance_kopeks).label("total"), + func.avg(User.balance_kopeks).label("avg"), + ).where(User.status == UserStatus.ACTIVE.value) + balance_result = await db.execute(balance_query) + balance_row = balance_result.one_or_none() + total_balance = balance_row.total or 0 if balance_row else 0 + avg_balance = int(balance_row.avg or 0) if balance_row else 0 + + # Get activity stats + now = datetime.utcnow() + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + week_ago = now - timedelta(days=7) + month_ago = now - timedelta(days=30) + + active_today_q = select(func.count(User.id)).where( + User.last_activity >= today_start, + User.status == UserStatus.ACTIVE.value, + ) + active_week_q = select(func.count(User.id)).where( + User.last_activity >= week_ago, + User.status == UserStatus.ACTIVE.value, + ) + active_month_q = select(func.count(User.id)).where( + User.last_activity >= month_ago, + User.status == UserStatus.ACTIVE.value, + ) + + active_today = (await db.execute(active_today_q)).scalar() or 0 + active_week = (await db.execute(active_week_q)).scalar() or 0 + active_month = (await db.execute(active_month_q)).scalar() or 0 + + # Count deleted users + deleted_q = select(func.count(User.id)).where(User.status == UserStatus.DELETED.value) + deleted_count = (await db.execute(deleted_q)).scalar() or 0 + + return UsersStatsResponse( + total_users=stats["total_users"], + active_users=stats["active_users"], + blocked_users=stats["blocked_users"], + deleted_users=deleted_count, + new_today=stats["new_today"], + new_week=stats["new_week"], + new_month=stats["new_month"], + users_with_subscription=users_with_subscription, + users_with_active_subscription=users_with_active, + users_with_trial=users_with_trial, + users_with_expired_subscription=users_with_expired, + total_balance_kopeks=total_balance, + total_balance_rubles=total_balance / 100, + avg_balance_kopeks=avg_balance, + active_today=active_today, + active_week=active_week, + active_month=active_month, + ) + + +# === User Detail === + +@router.get("/{user_id}", response_model=UserDetailResponse) +async def get_user_detail( + user_id: int, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get detailed user information by ID.""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + # Get spending stats + spending_stats = await get_users_spending_stats(db, [user.id]) + user_stats = spending_stats.get(user.id, {"total_spent": 0, "purchase_count": 0}) + + # Build subscription info + subscription_info = None + if user.subscription: + subscription_info = _build_subscription_info(user.subscription) + + # Build promo group info + promo_group_info = None + if user.promo_group: + promo_group_info = UserPromoGroupInfo( + id=user.promo_group.id, + name=user.promo_group.name, + is_default=user.promo_group.is_default, + ) + + # Get referrals count + referrals = await get_referrals(db, user.id) + referrals_count = len(referrals) + + # Calculate total referral earnings + referral_earnings_q = select(func.sum(Transaction.amount_kopeks)).where( + Transaction.user_id == user.id, + Transaction.type == TransactionType.REFERRAL_REWARD.value, + Transaction.is_completed == True, + ) + referral_earnings = (await db.execute(referral_earnings_q)).scalar() or 0 + + # Get referrer info + referred_by_username = None + if user.referrer: + referred_by_username = user.referrer.username or user.referrer.full_name + + referral_info = UserReferralInfo( + referral_code=user.referral_code or "", + referrals_count=referrals_count, + total_earnings_kopeks=referral_earnings, + commission_percent=user.referral_commission_percent, + referred_by_id=user.referred_by_id, + referred_by_username=referred_by_username, + ) + + # Get recent transactions + transactions_q = ( + select(Transaction) + .where(Transaction.user_id == user.id) + .order_by(Transaction.created_at.desc()) + .limit(20) + ) + transactions_result = await db.execute(transactions_q) + transactions = transactions_result.scalars().all() + + recent_transactions = [ + UserTransactionItem( + id=t.id, + type=t.type, + amount_kopeks=t.amount_kopeks, + amount_rubles=t.amount_kopeks / 100, + description=t.description, + payment_method=t.payment_method, + is_completed=t.is_completed, + created_at=t.created_at, + ) + for t in transactions + ] + + return UserDetailResponse( + id=user.id, + telegram_id=user.telegram_id, + username=user.username, + first_name=user.first_name, + last_name=user.last_name, + full_name=user.full_name, + status=user.status, + language=user.language, + balance_kopeks=user.balance_kopeks, + balance_rubles=user.balance_rubles, + email=user.email, + email_verified=user.email_verified, + created_at=user.created_at, + updated_at=user.updated_at, + last_activity=user.last_activity, + cabinet_last_login=user.cabinet_last_login, + subscription=subscription_info, + promo_group=promo_group_info, + referral=referral_info, + total_spent_kopeks=user_stats.get("total_spent", 0), + purchase_count=user_stats.get("purchase_count", 0), + used_promocodes=user.used_promocodes, + has_had_paid_subscription=user.has_had_paid_subscription, + lifetime_used_traffic_bytes=user.lifetime_used_traffic_bytes or 0, + restriction_topup=user.restriction_topup, + restriction_subscription=user.restriction_subscription, + restriction_reason=user.restriction_reason, + promo_offer_discount_percent=user.promo_offer_discount_percent, + promo_offer_discount_source=user.promo_offer_discount_source, + promo_offer_discount_expires_at=user.promo_offer_discount_expires_at, + recent_transactions=recent_transactions, + ) + + +@router.get("/by-telegram/{telegram_id}", response_model=UserDetailResponse) +async def get_user_by_telegram( + telegram_id: int, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get user by Telegram ID.""" + user = await get_user_by_telegram_id(db, telegram_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + return await get_user_detail(user.id, admin, db) + + +# === Balance Management === + +@router.post("/{user_id}/balance", response_model=UpdateBalanceResponse) +async def update_user_balance( + user_id: int, + request: UpdateBalanceRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Update user balance. + + - Positive amount: adds to balance + - Negative amount: subtracts from balance + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + old_balance = user.balance_kopeks + + if request.amount_kopeks >= 0: + # Add balance + success = await add_user_balance( + db=db, + user=user, + amount_kopeks=request.amount_kopeks, + description=request.description, + create_transaction=request.create_transaction, + transaction_type=TransactionType.DEPOSIT, + ) + else: + # Subtract balance + amount_to_subtract = abs(request.amount_kopeks) + if user.balance_kopeks < amount_to_subtract: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Insufficient balance. Current: {user.balance_kopeks}, requested: {amount_to_subtract}", + ) + success = await subtract_user_balance( + db=db, + user=user, + amount_kopeks=amount_to_subtract, + description=request.description, + create_transaction=request.create_transaction, + ) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to update balance", + ) + + # Refresh user + await db.refresh(user) + + logger.info( + f"Admin {admin.id} updated balance for user {user_id}: " + f"{old_balance} -> {user.balance_kopeks} ({request.amount_kopeks:+d})" + ) + + return UpdateBalanceResponse( + success=True, + old_balance_kopeks=old_balance, + new_balance_kopeks=user.balance_kopeks, + message=f"Balance updated: {old_balance/100:.2f}₽ -> {user.balance_kopeks/100:.2f}₽", + ) + + +# === Subscription Management === + +@router.post("/{user_id}/subscription", response_model=UpdateSubscriptionResponse) +async def update_user_subscription( + user_id: int, + request: UpdateSubscriptionRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Update user subscription. + + Actions: + - **extend**: Extend subscription by X days + - **set_end_date**: Set specific end date + - **change_tariff**: Change subscription tariff + - **set_traffic**: Set traffic limit and/or used traffic + - **toggle_autopay**: Enable/disable autopay + - **cancel**: Cancel subscription (set status to expired) + - **activate**: Activate subscription + - **create**: Create new subscription if not exists + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + subscription = user.subscription + + if request.action == "create": + # Create new subscription + if subscription: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="User already has a subscription", + ) + + from app.database.crud.subscription import create_paid_subscription + + days = request.days or 30 + is_trial = request.is_trial or False + traffic_limit = request.traffic_limit_gb or 100 + device_limit = request.device_limit or 1 + + new_sub = await create_paid_subscription( + db=db, + user_id=user.id, + duration_days=days, + traffic_limit_gb=traffic_limit, + device_limit=device_limit, + is_trial=is_trial, + tariff_id=request.tariff_id, + ) + + logger.info(f"Admin {admin.id} created subscription for user {user_id}") + + return UpdateSubscriptionResponse( + success=True, + message=f"Subscription created for {days} days", + subscription=_build_subscription_info(new_sub), + ) + + if not subscription: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User has no subscription", + ) + + if request.action == "extend": + if not request.days: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Days parameter is required for extend action", + ) + + old_end = subscription.end_date + await extend_subscription(db, subscription, request.days) + await db.refresh(subscription) + + logger.info( + f"Admin {admin.id} extended subscription for user {user_id} by {request.days} days" + ) + + return UpdateSubscriptionResponse( + success=True, + message=f"Subscription extended by {request.days} days", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "set_end_date": + if not request.end_date: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="end_date parameter is required", + ) + + subscription.end_date = request.end_date + if request.end_date > datetime.utcnow(): + subscription.status = SubscriptionStatus.ACTIVE.value + else: + subscription.status = SubscriptionStatus.EXPIRED.value + + await db.commit() + await db.refresh(subscription) + + logger.info(f"Admin {admin.id} set end_date for user {user_id} subscription") + + return UpdateSubscriptionResponse( + success=True, + message=f"Subscription end date set to {request.end_date.isoformat()}", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "change_tariff": + if request.tariff_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="tariff_id parameter is required", + ) + + tariff = await get_tariff_by_id(db, request.tariff_id) + if not tariff: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Tariff not found", + ) + + subscription.tariff_id = request.tariff_id + subscription.traffic_limit_gb = tariff.traffic_limit_gb + subscription.device_limit = tariff.device_limit + await db.commit() + await db.refresh(subscription) + + logger.info( + f"Admin {admin.id} changed tariff for user {user_id} to {tariff.name}" + ) + + return UpdateSubscriptionResponse( + success=True, + message=f"Tariff changed to {tariff.name}", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "set_traffic": + if request.traffic_limit_gb is not None: + subscription.traffic_limit_gb = request.traffic_limit_gb + + if request.traffic_used_gb is not None: + subscription.traffic_used_gb = request.traffic_used_gb + + await db.commit() + await db.refresh(subscription) + + logger.info(f"Admin {admin.id} updated traffic for user {user_id}") + + return UpdateSubscriptionResponse( + success=True, + message="Traffic settings updated", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "toggle_autopay": + if request.autopay_enabled is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="autopay_enabled parameter is required", + ) + + subscription.autopay_enabled = request.autopay_enabled + await db.commit() + await db.refresh(subscription) + + state = "enabled" if request.autopay_enabled else "disabled" + logger.info(f"Admin {admin.id} {state} autopay for user {user_id}") + + return UpdateSubscriptionResponse( + success=True, + message=f"Autopay {state}", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "cancel": + subscription.status = SubscriptionStatus.EXPIRED.value + subscription.end_date = datetime.utcnow() + await db.commit() + await db.refresh(subscription) + + logger.info(f"Admin {admin.id} cancelled subscription for user {user_id}") + + return UpdateSubscriptionResponse( + success=True, + message="Subscription cancelled", + subscription=_build_subscription_info(subscription), + ) + + elif request.action == "activate": + subscription.status = SubscriptionStatus.ACTIVE.value + if subscription.end_date and subscription.end_date <= datetime.utcnow(): + # Extend by 30 days if expired + subscription.end_date = datetime.utcnow() + timedelta(days=30) + await db.commit() + await db.refresh(subscription) + + logger.info(f"Admin {admin.id} activated subscription for user {user_id}") + + return UpdateSubscriptionResponse( + success=True, + message="Subscription activated", + subscription=_build_subscription_info(subscription), + ) + + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unknown action: {request.action}", + ) + + +# === Available Tariffs === + +@router.get("/{user_id}/available-tariffs", response_model=UserAvailableTariffsResponse) +async def get_user_available_tariffs( + user_id: int, + include_inactive: bool = Query(False, description="Include inactive tariffs"), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Get list of tariffs available for a specific user. + + Takes into account user's promo group to determine which tariffs are accessible. + Shows all tariffs with availability flag. + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + # Get all tariffs + from app.database.crud.tariff import get_all_tariffs + + tariffs = await get_all_tariffs(db, include_inactive=include_inactive) + + # Get current subscription tariff + current_tariff_id = None + current_tariff_name = None + if user.subscription and user.subscription.tariff_id: + current_tariff_id = user.subscription.tariff_id + if user.subscription.tariff: + current_tariff_name = user.subscription.tariff.name + + # Build tariff items + tariff_items = [] + for tariff in tariffs: + # Check if available for user's promo group + is_available = tariff.is_available_for_promo_group(user.promo_group_id) + requires_promo_group = bool(tariff.allowed_promo_groups) + + # Build period prices + period_prices = [] + if tariff.period_prices: + for days_str, price_kopeks in sorted(tariff.period_prices.items(), key=lambda x: int(x[0])): + days = int(days_str) + period_prices.append(PeriodPriceInfo( + days=days, + price_kopeks=price_kopeks, + price_rubles=price_kopeks / 100, + )) + + tariff_items.append(UserAvailableTariffItem( + id=tariff.id, + name=tariff.name, + description=tariff.description, + is_active=tariff.is_active, + is_trial_available=tariff.is_trial_available, + traffic_limit_gb=tariff.traffic_limit_gb, + device_limit=tariff.device_limit, + tier_level=tariff.tier_level, + display_order=tariff.display_order, + period_prices=period_prices, + is_daily=tariff.is_daily, + daily_price_kopeks=tariff.daily_price_kopeks, + custom_days_enabled=tariff.custom_days_enabled, + price_per_day_kopeks=tariff.price_per_day_kopeks, + min_days=tariff.min_days, + max_days=tariff.max_days, + is_available=is_available, + requires_promo_group=requires_promo_group, + )) + + # Sort by display_order, then by tier_level + tariff_items.sort(key=lambda t: (t.display_order, t.tier_level)) + + return UserAvailableTariffsResponse( + user_id=user.id, + promo_group_id=user.promo_group_id, + promo_group_name=user.promo_group.name if user.promo_group else None, + tariffs=tariff_items, + total=len(tariff_items), + current_tariff_id=current_tariff_id, + current_tariff_name=current_tariff_name, + ) + + +# === Status Management === + +@router.post("/{user_id}/status", response_model=UpdateUserStatusResponse) +async def update_user_status( + user_id: int, + request: UpdateUserStatusRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Update user status (active, blocked, deleted).""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + old_status = user.status + new_status = request.status.value + + if old_status == new_status: + return UpdateUserStatusResponse( + success=True, + old_status=old_status, + new_status=new_status, + message="Status unchanged", + ) + + user.status = new_status + user.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(user) + + action = f"{old_status} -> {new_status}" + if request.reason: + action += f" (reason: {request.reason})" + + logger.info(f"Admin {admin.id} changed status for user {user_id}: {action}") + + return UpdateUserStatusResponse( + success=True, + old_status=old_status, + new_status=new_status, + message=f"Status changed from {old_status} to {new_status}", + ) + + +@router.post("/{user_id}/block", response_model=UpdateUserStatusResponse) +async def block_user( + user_id: int, + reason: Optional[str] = None, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Block a user (shortcut for status update).""" + request = UpdateUserStatusRequest(status=UserStatusEnum.BLOCKED, reason=reason) + return await update_user_status(user_id, request, admin, db) + + +@router.post("/{user_id}/unblock", response_model=UpdateUserStatusResponse) +async def unblock_user( + user_id: int, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Unblock a user (shortcut for status update).""" + request = UpdateUserStatusRequest(status=UserStatusEnum.ACTIVE) + return await update_user_status(user_id, request, admin, db) + + +# === Restrictions Management === + +@router.post("/{user_id}/restrictions", response_model=UpdateRestrictionsResponse) +async def update_user_restrictions( + user_id: int, + request: UpdateRestrictionsRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Update user restrictions (topup, subscription).""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + if request.restriction_topup is not None: + user.restriction_topup = request.restriction_topup + + if request.restriction_subscription is not None: + user.restriction_subscription = request.restriction_subscription + + if request.restriction_reason is not None: + user.restriction_reason = request.restriction_reason + + user.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(user) + + logger.info( + f"Admin {admin.id} updated restrictions for user {user_id}: " + f"topup={user.restriction_topup}, subscription={user.restriction_subscription}" + ) + + return UpdateRestrictionsResponse( + success=True, + restriction_topup=user.restriction_topup, + restriction_subscription=user.restriction_subscription, + restriction_reason=user.restriction_reason, + message="Restrictions updated", + ) + + +# === Promo Group Management === + +@router.post("/{user_id}/promo-group", response_model=UpdatePromoGroupResponse) +async def update_user_promo_group( + user_id: int, + request: UpdatePromoGroupRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Update user promo group.""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + old_promo_group_id = user.promo_group_id + new_promo_group_id = request.promo_group_id + promo_group_name = None + + if new_promo_group_id is not None: + # Verify promo group exists + result = await db.execute( + select(PromoGroup).where(PromoGroup.id == new_promo_group_id) + ) + promo_group = result.scalar_one_or_none() + if not promo_group: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Promo group not found", + ) + promo_group_name = promo_group.name + + user.promo_group_id = new_promo_group_id + user.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(user) + + logger.info( + f"Admin {admin.id} changed promo group for user {user_id}: " + f"{old_promo_group_id} -> {new_promo_group_id}" + ) + + return UpdatePromoGroupResponse( + success=True, + old_promo_group_id=old_promo_group_id, + new_promo_group_id=new_promo_group_id, + promo_group_name=promo_group_name, + message="Promo group updated", + ) + + +# === Delete User === + +@router.delete("/{user_id}", response_model=DeleteUserResponse) +async def delete_user( + user_id: int, + request: DeleteUserRequest = DeleteUserRequest(), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Delete a user. + + - **soft_delete=True**: Mark user as deleted (default) + - **soft_delete=False**: Permanently delete from database + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + if request.soft_delete: + await soft_delete_user(db, user) + action = "soft deleted" + else: + # Hard delete + await db.delete(user) + await db.commit() + action = "permanently deleted" + + reason_text = f" (reason: {request.reason})" if request.reason else "" + logger.info(f"Admin {admin.id} {action} user {user_id}{reason_text}") + + return DeleteUserResponse( + success=True, + message=f"User {action} successfully", + ) + + +# === User Referrals === + +@router.get("/{user_id}/referrals", response_model=UsersListResponse) +async def get_user_referrals( + user_id: int, + offset: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=200), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get list of users referred by this user.""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + referrals = await get_referrals(db, user.id) + + # Apply pagination manually + total = len(referrals) + referrals = referrals[offset : offset + limit] + + # Get spending stats + user_ids = [r.id for r in referrals] + spending_stats = await get_users_spending_stats(db, user_ids) if user_ids else {} + + items = [_build_user_list_item(r, spending_stats) for r in referrals] + + return UsersListResponse( + users=items, + total=total, + offset=offset, + limit=limit, + ) + + +# === User Transactions === + +@router.get("/{user_id}/transactions") +async def get_user_transactions( + user_id: int, + offset: int = Query(0, ge=0), + limit: int = Query(50, ge=1, le=200), + transaction_type: Optional[str] = Query(None), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """Get user transactions.""" + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + query = select(Transaction).where(Transaction.user_id == user.id) + + if transaction_type: + query = query.where(Transaction.type == transaction_type) + + # Get total count + count_query = select(func.count(Transaction.id)).where(Transaction.user_id == user.id) + if transaction_type: + count_query = count_query.where(Transaction.type == transaction_type) + total = (await db.execute(count_query)).scalar() or 0 + + # Get transactions + query = query.order_by(Transaction.created_at.desc()).offset(offset).limit(limit) + result = await db.execute(query) + transactions = result.scalars().all() + + items = [ + UserTransactionItem( + id=t.id, + type=t.type, + amount_kopeks=t.amount_kopeks, + amount_rubles=t.amount_kopeks / 100, + description=t.description, + payment_method=t.payment_method, + is_completed=t.is_completed, + created_at=t.created_at, + ) + for t in transactions + ] + + return { + "transactions": items, + "total": total, + "offset": offset, + "limit": limit, + } + + +# === Panel Sync === + +@router.get("/{user_id}/sync/status", response_model=PanelSyncStatusResponse) +async def get_user_sync_status( + user_id: int, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Get sync status between bot and panel for a user. + + Shows differences between bot data and panel data. + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + # Bot data + bot_sub_status = None + bot_sub_end_date = None + bot_traffic_limit = 0 + bot_traffic_used = 0.0 + + if user.subscription: + bot_sub_status = user.subscription.status + bot_sub_end_date = user.subscription.end_date + bot_traffic_limit = user.subscription.traffic_limit_gb + bot_traffic_used = user.subscription.traffic_used_gb or 0.0 + + # Panel data + panel_found = False + panel_status = None + panel_expire_at = None + panel_traffic_limit = 0.0 + panel_traffic_used = 0.0 + differences = [] + + try: + from app.services.remnawave_service import RemnaWaveService + + service = RemnaWaveService() + if service.is_configured: + async with service.get_api_client() as api: + panel_users = await api.get_user_by_telegram_id(user.telegram_id) + if panel_users: + panel_user = panel_users[0] + panel_found = True + panel_status = panel_user.status.value if panel_user.status else None + panel_expire_at = panel_user.expire_at + panel_traffic_limit = panel_user.traffic_limit_bytes / (1024**3) if panel_user.traffic_limit_bytes else 0 + panel_traffic_used = panel_user.used_traffic_bytes / (1024**3) if panel_user.used_traffic_bytes else 0 + + # Check differences + if bot_sub_status and panel_status: + bot_active = bot_sub_status in ("active", "trial") + panel_active = panel_status.upper() == "ACTIVE" + if bot_active != panel_active: + differences.append(f"Status: bot={bot_sub_status}, panel={panel_status}") + + if bot_sub_end_date and panel_expire_at: + diff_seconds = abs((bot_sub_end_date - panel_expire_at).total_seconds()) + if diff_seconds > 3600: # More than 1 hour difference + differences.append(f"End date differs by {diff_seconds/3600:.1f} hours") + + if abs(bot_traffic_limit - panel_traffic_limit) > 1: + differences.append(f"Traffic limit: bot={bot_traffic_limit}GB, panel={panel_traffic_limit:.1f}GB") + + if abs(bot_traffic_used - panel_traffic_used) > 0.5: + differences.append(f"Traffic used: bot={bot_traffic_used:.2f}GB, panel={panel_traffic_used:.2f}GB") + + except Exception as e: + logger.warning(f"Failed to get panel data for user {user_id}: {e}") + differences.append(f"Error fetching panel data: {str(e)}") + + return PanelSyncStatusResponse( + user_id=user.id, + telegram_id=user.telegram_id, + remnawave_uuid=user.remnawave_uuid, + last_sync=user.last_remnawave_sync, + bot_subscription_status=bot_sub_status, + bot_subscription_end_date=bot_sub_end_date, + bot_traffic_limit_gb=bot_traffic_limit, + bot_traffic_used_gb=bot_traffic_used, + panel_found=panel_found, + panel_status=panel_status, + panel_expire_at=panel_expire_at, + panel_traffic_limit_gb=panel_traffic_limit, + panel_traffic_used_gb=panel_traffic_used, + has_differences=len(differences) > 0, + differences=differences, + ) + + +@router.post("/{user_id}/sync/from-panel", response_model=SyncFromPanelResponse) +async def sync_user_from_panel( + user_id: int, + request: SyncFromPanelRequest = SyncFromPanelRequest(), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Sync user data FROM panel TO bot. + + Fetches user data from Remnawave panel and updates local database. + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + try: + from app.services.remnawave_service import RemnaWaveService, RemnaWaveConfigurationError + + service = RemnaWaveService() + if not service.is_configured: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=service.configuration_error or "Remnawave API not configured", + ) + + changes = {} + errors = [] + panel_info = None + + async with service.get_api_client() as api: + # Find user in panel + panel_users = await api.get_user_by_telegram_id(user.telegram_id) + + if not panel_users: + return SyncFromPanelResponse( + success=False, + message="User not found in panel", + errors=["No user with this telegram_id found in Remnawave panel"], + ) + + panel_user = panel_users[0] + + # Build panel info + active_squads = [] + if hasattr(panel_user, 'active_internal_squads') and panel_user.active_internal_squads: + for squad in panel_user.active_internal_squads: + if hasattr(squad, 'uuid'): + active_squads.append(squad.uuid) + elif isinstance(squad, str): + active_squads.append(squad) + + panel_info = PanelUserInfo( + uuid=panel_user.uuid, + short_uuid=panel_user.short_uuid, + username=panel_user.username, + status=panel_user.status.value if panel_user.status else None, + expire_at=panel_user.expire_at, + traffic_limit_gb=panel_user.traffic_limit_bytes / (1024**3) if panel_user.traffic_limit_bytes else 0, + traffic_used_gb=panel_user.used_traffic_bytes / (1024**3) if panel_user.used_traffic_bytes else 0, + device_limit=panel_user.hwid_device_limit or 1, + subscription_url=panel_user.subscription_url, + active_squads=active_squads, + ) + + # Update remnawave_uuid if different + if user.remnawave_uuid != panel_user.uuid: + changes["remnawave_uuid"] = {"old": user.remnawave_uuid, "new": panel_user.uuid} + user.remnawave_uuid = panel_user.uuid + + # Update subscription if requested + if request.update_subscription and user.subscription: + sub = user.subscription + + # Update end date + if panel_user.expire_at: + if sub.end_date != panel_user.expire_at: + changes["end_date"] = {"old": sub.end_date.isoformat() if sub.end_date else None, "new": panel_user.expire_at.isoformat()} + sub.end_date = panel_user.expire_at + + # Update status + panel_status_str = panel_user.status.value if panel_user.status else "DISABLED" + now = datetime.utcnow() + if panel_status_str == "ACTIVE" and panel_user.expire_at and panel_user.expire_at > now: + new_status = SubscriptionStatus.ACTIVE.value + elif panel_user.expire_at and panel_user.expire_at <= now: + new_status = SubscriptionStatus.EXPIRED.value + else: + new_status = SubscriptionStatus.DISABLED.value + + if sub.status != new_status: + changes["status"] = {"old": sub.status, "new": new_status} + sub.status = new_status + + # Update traffic limit + panel_traffic_limit = int(panel_user.traffic_limit_bytes / (1024**3)) if panel_user.traffic_limit_bytes else 0 + if sub.traffic_limit_gb != panel_traffic_limit: + changes["traffic_limit_gb"] = {"old": sub.traffic_limit_gb, "new": panel_traffic_limit} + sub.traffic_limit_gb = panel_traffic_limit + + # Update device limit + panel_device_limit = panel_user.hwid_device_limit or 1 + if sub.device_limit != panel_device_limit: + changes["device_limit"] = {"old": sub.device_limit, "new": panel_device_limit} + sub.device_limit = panel_device_limit + + # Update connected squads + if active_squads and sub.connected_squads != active_squads: + changes["connected_squads"] = {"old": sub.connected_squads, "new": active_squads} + sub.connected_squads = active_squads + + # Update subscription URL + if panel_user.subscription_url and sub.subscription_url != panel_user.subscription_url: + changes["subscription_url"] = {"old": sub.subscription_url, "new": panel_user.subscription_url} + sub.subscription_url = panel_user.subscription_url + + # Update short UUID + if panel_user.short_uuid and sub.remnawave_short_uuid != panel_user.short_uuid: + changes["remnawave_short_uuid"] = {"old": sub.remnawave_short_uuid, "new": panel_user.short_uuid} + sub.remnawave_short_uuid = panel_user.short_uuid + + # Update traffic usage if requested + if request.update_traffic and user.subscription: + panel_traffic_used = panel_user.used_traffic_bytes / (1024**3) if panel_user.used_traffic_bytes else 0 + if abs((user.subscription.traffic_used_gb or 0) - panel_traffic_used) > 0.01: + changes["traffic_used_gb"] = {"old": user.subscription.traffic_used_gb, "new": panel_traffic_used} + user.subscription.traffic_used_gb = panel_traffic_used + + # Create subscription if missing but user exists in panel + if request.create_if_missing and not user.subscription and panel_user.expire_at: + from app.database.crud.subscription import create_paid_subscription + + panel_traffic_limit = int(panel_user.traffic_limit_bytes / (1024**3)) if panel_user.traffic_limit_bytes else 100 + days_remaining = max(1, (panel_user.expire_at - datetime.utcnow()).days) + + new_sub = await create_paid_subscription( + db=db, + user_id=user.id, + duration_days=days_remaining, + traffic_limit_gb=panel_traffic_limit, + device_limit=panel_user.hwid_device_limit or 1, + connected_squads=active_squads, + ) + new_sub.remnawave_short_uuid = panel_user.short_uuid + new_sub.subscription_url = panel_user.subscription_url + changes["subscription_created"] = True + + # Update last sync time + user.last_remnawave_sync = datetime.utcnow() + user.updated_at = datetime.utcnow() + + await db.commit() + + logger.info(f"Admin {admin.id} synced user {user_id} from panel. Changes: {list(changes.keys())}") + + return SyncFromPanelResponse( + success=True, + message=f"Synced {len(changes)} changes from panel" if changes else "No changes needed", + panel_user=panel_info, + changes=changes, + errors=errors, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error syncing user {user_id} from panel: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Sync error: {str(e)}", + ) + + +@router.post("/{user_id}/sync/to-panel", response_model=SyncToPanelResponse) +async def sync_user_to_panel( + user_id: int, + request: SyncToPanelRequest = SyncToPanelRequest(), + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +): + """ + Sync user data FROM bot TO panel. + + Sends user/subscription data to Remnawave panel, creating or updating as needed. + """ + user = await get_user_by_id(db, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + if not user.subscription: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="User has no subscription to sync", + ) + + try: + from app.services.remnawave_service import RemnaWaveService + from app.external.remnawave_api import UserStatus as PanelUserStatus, TrafficLimitStrategy + from app.config import settings + from app.utils.subscription_utils import resolve_hwid_device_limit_for_payload + + service = RemnaWaveService() + if not service.is_configured: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=service.configuration_error or "Remnawave API not configured", + ) + + sub = user.subscription + changes = {} + errors = [] + action = "no_changes" + panel_uuid = user.remnawave_uuid + + # Prepare data for panel + is_active = ( + sub.status in (SubscriptionStatus.ACTIVE.value, SubscriptionStatus.TRIAL.value) + and sub.end_date + and sub.end_date > datetime.utcnow() + ) + panel_status = PanelUserStatus.ACTIVE if is_active else PanelUserStatus.DISABLED + + # Ensure expire_at is in future for panel + expire_at = sub.end_date + if expire_at and expire_at <= datetime.utcnow(): + expire_at = datetime.utcnow() + timedelta(minutes=1) + + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + description = settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + hwid_limit = resolve_hwid_device_limit_for_payload(sub) + traffic_limit_bytes = sub.traffic_limit_gb * (1024**3) if sub.traffic_limit_gb > 0 else 0 + + async with service.get_api_client() as api: + # Try to find existing user in panel + if not panel_uuid: + existing_users = await api.get_user_by_telegram_id(user.telegram_id) + if existing_users: + panel_uuid = existing_users[0].uuid + user.remnawave_uuid = panel_uuid + changes["remnawave_uuid_discovered"] = panel_uuid + + if panel_uuid: + # Update existing user + update_kwargs = {"uuid": panel_uuid} + + if request.update_status: + update_kwargs["status"] = panel_status + changes["status"] = panel_status.value + + if request.update_expire_date and expire_at: + update_kwargs["expire_at"] = expire_at + changes["expire_at"] = expire_at.isoformat() + + if request.update_traffic_limit: + update_kwargs["traffic_limit_bytes"] = traffic_limit_bytes + update_kwargs["traffic_limit_strategy"] = TrafficLimitStrategy.MONTH + changes["traffic_limit_gb"] = sub.traffic_limit_gb + + if request.update_squads and sub.connected_squads: + update_kwargs["active_internal_squads"] = sub.connected_squads + changes["connected_squads"] = sub.connected_squads + + update_kwargs["description"] = description + if hwid_limit is not None: + update_kwargs["hwid_device_limit"] = hwid_limit + changes["device_limit"] = hwid_limit + + try: + await api.update_user(**update_kwargs) + action = "updated" + except Exception as update_error: + if hasattr(update_error, 'status_code') and update_error.status_code == 404: + # User not found in panel, create new + panel_uuid = None + else: + raise + + if not panel_uuid and request.create_if_missing: + # Create new user in panel + create_kwargs = { + "username": username, + "expire_at": expire_at or (datetime.utcnow() + timedelta(days=30)), + "status": panel_status, + "traffic_limit_bytes": traffic_limit_bytes, + "traffic_limit_strategy": TrafficLimitStrategy.MONTH, + "telegram_id": user.telegram_id, + "description": description, + "active_internal_squads": sub.connected_squads or [], + } + + if hwid_limit is not None: + create_kwargs["hwid_device_limit"] = hwid_limit + + new_panel_user = await api.create_user(**create_kwargs) + panel_uuid = new_panel_user.uuid + user.remnawave_uuid = new_panel_user.uuid + sub.remnawave_short_uuid = new_panel_user.short_uuid + sub.subscription_url = new_panel_user.subscription_url + + changes["created_in_panel"] = True + changes["panel_uuid"] = panel_uuid + changes["short_uuid"] = new_panel_user.short_uuid + action = "created" + + # Update last sync time + user.last_remnawave_sync = datetime.utcnow() + user.updated_at = datetime.utcnow() + + await db.commit() + + logger.info(f"Admin {admin.id} synced user {user_id} to panel. Action: {action}") + + return SyncToPanelResponse( + success=True, + message=f"User {action} in panel" if action != "no_changes" else "No changes needed", + action=action, + panel_uuid=panel_uuid, + changes=changes, + errors=errors, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error syncing user {user_id} to panel: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Sync error: {str(e)}", + )