From a9fb8a069de6bfe87ef8259950c4c251be9f469d Mon Sep 17 00:00:00 2001 From: Egor Date: Sun, 2 Nov 2025 22:21:50 +0300 Subject: [PATCH 01/12] Fix user search when query exceeds int32 --- app/database/crud/user.py | 14 ++++++++++---- app/webapi/routes/users.py | 8 ++++++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/app/database/crud/user.py b/app/database/crud/user.py index e0295e73..82184318 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -26,6 +26,8 @@ from app.utils.validators import sanitize_telegram_name logger = logging.getLogger(__name__) +INT32_MAX = 2_147_483_647 + def generate_referral_code() -> str: alphabet = string.ascii_letters + string.digits @@ -538,8 +540,10 @@ async def get_users_list( ] if search.isdigit(): - conditions.append(User.telegram_id == int(search)) - conditions.append(User.id == int(search)) # Add support for searching by internal user ID + search_int = int(search) + conditions.append(User.telegram_id == search_int) + if search_int <= INT32_MAX: + conditions.append(User.id == search_int) # Add support for searching by internal user ID query = query.where(or_(*conditions)) @@ -636,8 +640,10 @@ async def get_users_count( ] if search.isdigit(): - conditions.append(User.telegram_id == int(search)) - conditions.append(User.id == int(search)) # Add support for searching by internal user ID + search_int = int(search) + conditions.append(User.telegram_id == search_int) + if search_int <= INT32_MAX: + conditions.append(User.id == search_int) # Add support for searching by internal user ID query = query.where(or_(*conditions)) diff --git a/app/webapi/routes/users.py b/app/webapi/routes/users.py index c2031719..91fdf3a2 100644 --- a/app/webapi/routes/users.py +++ b/app/webapi/routes/users.py @@ -31,6 +31,8 @@ from ..schemas.users import ( router = APIRouter() +INT32_MAX = 2_147_483_647 + def _serialize_promo_group(group: Optional[PromoGroup]) -> Optional[PromoGroupSummary]: if not group: @@ -103,8 +105,10 @@ def _apply_search_filter(query, search: str): ] if search.isdigit(): - conditions.append(User.telegram_id == int(search)) - conditions.append(User.id == int(search)) + search_int = int(search) + conditions.append(User.telegram_id == search_int) + if search_int <= INT32_MAX: + conditions.append(User.id == search_int) return query.where(or_(*conditions)) From 62f8fdd9bbaeb321c92d663ffa9e2a96d94cf9e9 Mon Sep 17 00:00:00 2001 From: Egor Date: Sun, 2 Nov 2025 22:24:56 +0300 Subject: [PATCH 02/12] Revert "Fix user search when numeric query exceeds INT32 range" --- app/database/crud/user.py | 14 ++++---------- app/webapi/routes/users.py | 8 ++------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/app/database/crud/user.py b/app/database/crud/user.py index 82184318..e0295e73 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -26,8 +26,6 @@ from app.utils.validators import sanitize_telegram_name logger = logging.getLogger(__name__) -INT32_MAX = 2_147_483_647 - def generate_referral_code() -> str: alphabet = string.ascii_letters + string.digits @@ -540,10 +538,8 @@ async def get_users_list( ] if search.isdigit(): - search_int = int(search) - conditions.append(User.telegram_id == search_int) - if search_int <= INT32_MAX: - conditions.append(User.id == search_int) # Add support for searching by internal user ID + conditions.append(User.telegram_id == int(search)) + conditions.append(User.id == int(search)) # Add support for searching by internal user ID query = query.where(or_(*conditions)) @@ -640,10 +636,8 @@ async def get_users_count( ] if search.isdigit(): - search_int = int(search) - conditions.append(User.telegram_id == search_int) - if search_int <= INT32_MAX: - conditions.append(User.id == search_int) # Add support for searching by internal user ID + conditions.append(User.telegram_id == int(search)) + conditions.append(User.id == int(search)) # Add support for searching by internal user ID query = query.where(or_(*conditions)) diff --git a/app/webapi/routes/users.py b/app/webapi/routes/users.py index 91fdf3a2..c2031719 100644 --- a/app/webapi/routes/users.py +++ b/app/webapi/routes/users.py @@ -31,8 +31,6 @@ from ..schemas.users import ( router = APIRouter() -INT32_MAX = 2_147_483_647 - def _serialize_promo_group(group: Optional[PromoGroup]) -> Optional[PromoGroupSummary]: if not group: @@ -105,10 +103,8 @@ def _apply_search_filter(query, search: str): ] if search.isdigit(): - search_int = int(search) - conditions.append(User.telegram_id == search_int) - if search_int <= INT32_MAX: - conditions.append(User.id == search_int) + conditions.append(User.telegram_id == int(search)) + conditions.append(User.id == int(search)) return query.where(or_(*conditions)) From d47ca550ae7e47967629aa558bbe34a20e1a1551 Mon Sep 17 00:00:00 2001 From: Egor Date: Mon, 3 Nov 2025 06:59:29 +0300 Subject: [PATCH 03/12] Fix RemnaWave user sync tests by restoring create_user usage --- app/database/crud/server_squad.py | 13 +- app/services/remnawave_service.py | 18 +- app/webapi/app.py | 9 + app/webapi/routes/__init__.py | 2 + app/webapi/routes/servers.py | 386 ++++++++++++++++++++++++++++++ app/webapi/schemas/servers.py | 160 +++++++++++++ 6 files changed, 579 insertions(+), 9 deletions(-) create mode 100644 app/webapi/routes/servers.py create mode 100644 app/webapi/schemas/servers.py diff --git a/app/database/crud/server_squad.py b/app/database/crud/server_squad.py index a2021bd5..29fba112 100644 --- a/app/database/crud/server_squad.py +++ b/app/database/crud/server_squad.py @@ -47,6 +47,7 @@ async def create_server_squad( max_users: int = None, is_available: bool = True, is_trial_eligible: bool = False, + sort_order: int = 0, promo_group_ids: Optional[Iterable[int]] = None, ) -> ServerSquad: @@ -80,6 +81,7 @@ async def create_server_squad( max_users=max_users, is_available=is_available, is_trial_eligible=is_trial_eligible, + sort_order=sort_order, allowed_promo_groups=promo_groups, ) @@ -260,8 +262,15 @@ async def update_server_squad( ) -> Optional[ServerSquad]: valid_fields = { - 'display_name', 'country_code', 'price_kopeks', 'description', - 'max_users', 'is_available', 'sort_order', 'is_trial_eligible' + "display_name", + "original_name", + "country_code", + "price_kopeks", + "description", + "max_users", + "is_available", + "sort_order", + "is_trial_eligible", } filtered_updates = {k: v for k, v in updates.items() if k in valid_fields} diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 0a165bcd..2db162d6 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -18,7 +18,6 @@ from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database.crud.user import ( create_user, - create_user_no_commit, get_users_list, get_user_by_telegram_id, update_user, @@ -303,27 +302,32 @@ class RemnaWaveService: first_name_from_desc, last_name_from_desc, username_from_desc = self._extract_user_data_from_description(description) # Используем извлеченное имя или дефолтное значение + fallback_first_name = f"Panel User {telegram_id}" + full_first_name = fallback_first_name + full_last_name = None + if first_name_from_desc and last_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc elif first_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc - else: - full_first_name = f"User {telegram_id}" - full_last_name = None - + username = username_from_desc or panel_user.get("username") try: - db_user = await create_user_no_commit( + create_kwargs = dict( db=db, telegram_id=telegram_id, username=username, first_name=full_first_name, - last_name=full_last_name, language="ru", ) + + if full_last_name: + create_kwargs["last_name"] = full_last_name + + db_user = await create_user(**create_kwargs) return db_user, True except IntegrityError as create_error: logger.info( diff --git a/app/webapi/app.py b/app/webapi/app.py index d5848663..07b86b86 100644 --- a/app/webapi/app.py +++ b/app/webapi/app.py @@ -20,6 +20,7 @@ from .routes import ( promo_offers, pages, remnawave, + servers, stats, subscriptions, tickets, @@ -67,6 +68,13 @@ OPENAPI_TAGS = [ "name": "promo-groups", "description": "Создание и управление промо-группами и их участниками.", }, + { + "name": "servers", + "description": ( + "Управление серверами RemnaWave, их доступностью, промогруппами и " + "ручная синхронизация данных.", + ), + }, { "name": "promo-offers", "description": "Управление промо-предложениями, шаблонами и журналом событий.", @@ -137,6 +145,7 @@ def create_web_api_app() -> FastAPI: app.include_router(transactions.router, prefix="/transactions", tags=["transactions"]) app.include_router(promo_groups.router, prefix="/promo-groups", tags=["promo-groups"]) app.include_router(promo_offers.router, prefix="/promo-offers", tags=["promo-offers"]) + app.include_router(servers.router, prefix="/servers", tags=["servers"]) app.include_router( main_menu_buttons.router, prefix="/main-menu/buttons", diff --git a/app/webapi/routes/__init__.py b/app/webapi/routes/__init__.py index 85684d75..e4320cf9 100644 --- a/app/webapi/routes/__init__.py +++ b/app/webapi/routes/__init__.py @@ -7,6 +7,7 @@ from . import ( promo_offers, pages, promo_groups, + servers, remnawave, stats, subscriptions, @@ -26,6 +27,7 @@ __all__ = [ "promo_offers", "pages", "promo_groups", + "servers", "remnawave", "stats", "subscriptions", diff --git a/app/webapi/routes/servers.py b/app/webapi/routes/servers.py new file mode 100644 index 00000000..38f2883f --- /dev/null +++ b/app/webapi/routes/servers.py @@ -0,0 +1,386 @@ +"""Маршруты управления серверами в административном API.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Iterable, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Security, status +from sqlalchemy import func, or_, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from app.database.crud.server_squad import ( + create_server_squad, + delete_server_squad, + get_server_connected_users, + get_server_squad_by_id, + get_server_squad_by_uuid, + get_server_statistics, + sync_server_user_counts, + sync_with_remnawave, + update_server_squad, + update_server_squad_promo_groups, +) +from app.database.models import PromoGroup, ServerSquad, User +from app.utils.cache import cache + +from ..dependencies import get_db_session, require_api_token +from ..schemas.servers import ( + ServerConnectedUser, + ServerConnectedUsersResponse, + ServerCountsSyncResponse, + ServerCreateRequest, + ServerDeleteResponse, + ServerListResponse, + ServerResponse, + ServerStatisticsResponse, + ServerSyncResponse, + ServerUpdateRequest, +) +from ..schemas.users import PromoGroupSummary + +try: # pragma: no cover - импорт может провалиться без optional-зависимостей + from app.services.remnawave_service import RemnaWaveService # type: ignore +except Exception: # pragma: no cover - скрываем функционал, если сервис недоступен + RemnaWaveService = None # type: ignore[assignment] + + +if TYPE_CHECKING: # pragma: no cover - только для подсказок типов в IDE + from app.services.remnawave_service import ( # type: ignore + RemnaWaveService as RemnaWaveServiceType, + ) +else: + RemnaWaveServiceType = Any + + +router = APIRouter() + + +def _serialize_promo_group(group: PromoGroup) -> PromoGroupSummary: + return PromoGroupSummary( + id=group.id, + name=group.name, + server_discount_percent=group.server_discount_percent, + traffic_discount_percent=group.traffic_discount_percent, + device_discount_percent=group.device_discount_percent, + apply_discounts_to_addons=getattr(group, "apply_discounts_to_addons", True), + ) + + +def _serialize_server(server: ServerSquad) -> ServerResponse: + promo_groups = [ + _serialize_promo_group(group) + for group in sorted( + getattr(server, "allowed_promo_groups", []) or [], + key=lambda pg: pg.name.lower() if getattr(pg, "name", None) else "", + ) + ] + + return ServerResponse( + id=server.id, + squad_uuid=server.squad_uuid, + display_name=server.display_name, + original_name=server.original_name, + country_code=server.country_code, + is_available=bool(server.is_available), + is_trial_eligible=bool(server.is_trial_eligible), + price_kopeks=int(server.price_kopeks or 0), + price_rubles=round((server.price_kopeks or 0) / 100, 2), + description=server.description, + sort_order=int(server.sort_order or 0), + max_users=server.max_users, + current_users=int(server.current_users or 0), + created_at=getattr(server, "created_at", None), + updated_at=getattr(server, "updated_at", None), + promo_groups=promo_groups, + ) + + +def _serialize_connected_user(user: User) -> ServerConnectedUser: + subscription = getattr(user, "subscription", None) + subscription_status = getattr(subscription, "status", None) + if hasattr(subscription_status, "value"): + subscription_status = subscription_status.value + + return ServerConnectedUser( + id=user.id, + telegram_id=user.telegram_id, + username=user.username, + first_name=user.first_name, + last_name=user.last_name, + status=getattr(getattr(user, "status", None), "value", user.status), + balance_kopeks=int(user.balance_kopeks or 0), + balance_rubles=round((user.balance_kopeks or 0) / 100, 2), + subscription_id=getattr(subscription, "id", None), + subscription_status=subscription_status, + subscription_end_date=getattr(subscription, "end_date", None), + ) + + +def _apply_filters( + filters: Iterable[Any], + query, +): + for condition in filters: + query = query.where(condition) + return query + + +def _get_remnawave_service() -> "RemnaWaveServiceType": + if RemnaWaveService is None: # pragma: no cover - зависимость не доступна + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RemnaWave сервис недоступен", + ) + + return RemnaWaveService() + + +def _ensure_service_configured(service: "RemnaWaveServiceType") -> None: + if RemnaWaveService is None: # pragma: no cover - зависимость не доступна + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RemnaWave сервис недоступен", + ) + + if not service.is_configured: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=service.configuration_error or "RemnaWave API не настроен", + ) + + +@router.get("", response_model=ServerListResponse) +async def list_servers( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), + page: int = Query(1, ge=1), + limit: int = Query(50, ge=1, le=200), + available_only: bool = Query(False, alias="available"), + search: Optional[str] = Query(default=None), +) -> ServerListResponse: + filters = [] + + if available_only: + filters.append(ServerSquad.is_available.is_(True)) + + if search: + pattern = f"%{search.lower()}%" + filters.append( + or_( + func.lower(ServerSquad.display_name).like(pattern), + func.lower(ServerSquad.original_name).like(pattern), + func.lower(ServerSquad.squad_uuid).like(pattern), + func.lower(ServerSquad.country_code).like(pattern), + ) + ) + + base_query = ( + select(ServerSquad) + .options(selectinload(ServerSquad.allowed_promo_groups)) + .order_by(ServerSquad.sort_order, ServerSquad.display_name) + ) + + count_query = select(func.count(ServerSquad.id)) + + if filters: + base_query = _apply_filters(filters, base_query) + count_query = _apply_filters(filters, count_query) + + total = await db.scalar(count_query) or 0 + + result = await db.execute( + base_query.offset((page - 1) * limit).limit(limit) + ) + servers = result.scalars().unique().all() + + return ServerListResponse( + items=[_serialize_server(server) for server in servers], + total=int(total), + page=page, + limit=limit, + ) + + +@router.get("/stats", response_model=ServerStatisticsResponse) +async def get_servers_statistics( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerStatisticsResponse: + stats = await get_server_statistics(db) + + return ServerStatisticsResponse( + total_servers=int(stats.get("total_servers", 0) or 0), + available_servers=int(stats.get("available_servers", 0) or 0), + unavailable_servers=int(stats.get("unavailable_servers", 0) or 0), + servers_with_connections=int(stats.get("servers_with_connections", 0) or 0), + total_revenue_kopeks=int(stats.get("total_revenue_kopeks", 0) or 0), + total_revenue_rubles=float(stats.get("total_revenue_rubles", 0) or 0), + ) + + +@router.post("", response_model=ServerResponse, status_code=status.HTTP_201_CREATED) +async def create_server_endpoint( + payload: ServerCreateRequest, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + existing = await get_server_squad_by_uuid(db, payload.squad_uuid) + if existing: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Server with this UUID already exists", + ) + + try: + server = await create_server_squad( + db, + squad_uuid=payload.squad_uuid, + display_name=payload.display_name, + original_name=payload.original_name, + country_code=payload.country_code, + price_kopeks=payload.price_kopeks, + description=payload.description, + max_users=payload.max_users, + is_available=payload.is_available, + is_trial_eligible=payload.is_trial_eligible, + sort_order=payload.sort_order, + promo_group_ids=payload.promo_group_ids, + ) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + + await cache.delete_pattern("available_countries*") + + server = await get_server_squad_by_id(db, server.id) + assert server is not None + return _serialize_server(server) + + +@router.get("/{server_id}", response_model=ServerResponse) +async def get_server_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + return _serialize_server(server) + + +@router.patch("/{server_id}", response_model=ServerResponse) +async def update_server_endpoint( + server_id: int, + payload: ServerUpdateRequest, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + updates = payload.model_dump(exclude_unset=True, by_alias=False) + promo_group_ids = updates.pop("promo_group_ids", None) + + if updates: + server = await update_server_squad(db, server_id, **updates) or server + + if promo_group_ids is not None: + try: + server = await update_server_squad_promo_groups( + db, server_id, promo_group_ids + ) or server + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + + await cache.delete_pattern("available_countries*") + + server = await get_server_squad_by_id(db, server_id) + assert server is not None + return _serialize_server(server) + + +@router.delete("/{server_id}", response_model=ServerDeleteResponse) +async def delete_server_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerDeleteResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + deleted = await delete_server_squad(db, server_id) + if not deleted: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Server cannot be deleted because it has active connections", + ) + + await cache.delete_pattern("available_countries*") + + return ServerDeleteResponse(success=True, message="Server deleted") + + +@router.get( + "/{server_id}/users", + response_model=ServerConnectedUsersResponse, +) +async def get_server_connected_users_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), + limit: int = Query(100, ge=1, le=500), + offset: int = Query(0, ge=0), +) -> ServerConnectedUsersResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + users = await get_server_connected_users(db, server_id) + total = len(users) + sliced = users[offset : offset + limit] + + return ServerConnectedUsersResponse( + items=[_serialize_connected_user(user) for user in sliced], + total=total, + limit=limit, + offset=offset, + ) + + +@router.post("/sync", response_model=ServerSyncResponse) +async def sync_servers_with_remnawave( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerSyncResponse: + service = _get_remnawave_service() + _ensure_service_configured(service) + + squads = await service.get_all_squads() + total = len(squads) + + created = updated = removed = 0 + if squads: + created, updated, removed = await sync_with_remnawave(db, squads) + + await cache.delete_pattern("available_countries*") + + return ServerSyncResponse( + created=created, + updated=updated, + removed=removed, + total=total, + ) + + +@router.post("/sync-counts", response_model=ServerCountsSyncResponse) +async def sync_server_counts( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerCountsSyncResponse: + updated = await sync_server_user_counts(db) + return ServerCountsSyncResponse(updated=updated) + diff --git a/app/webapi/schemas/servers.py b/app/webapi/schemas/servers.py new file mode 100644 index 00000000..478ffb9b --- /dev/null +++ b/app/webapi/schemas/servers.py @@ -0,0 +1,160 @@ +"""Pydantic-схемы для управления серверами через Web API.""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, ConfigDict, Field + +from .users import PromoGroupSummary + + +class ServerResponse(BaseModel): + """Полная информация о сервере.""" + + model_config = ConfigDict(from_attributes=True, populate_by_name=True) + + id: int + squad_uuid: str = Field(alias="squadUuid") + display_name: str = Field(alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + is_available: bool = Field(alias="isAvailable") + is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") + price_kopeks: int = Field(alias="priceKopeks") + price_rubles: float = Field(alias="priceRubles") + description: Optional[str] = None + sort_order: int = Field(default=0, alias="sortOrder") + max_users: Optional[int] = Field(default=None, alias="maxUsers") + current_users: int = Field(default=0, alias="currentUsers") + created_at: Optional[datetime] = Field(default=None, alias="createdAt") + updated_at: Optional[datetime] = Field(default=None, alias="updatedAt") + promo_groups: List[PromoGroupSummary] = Field( + default_factory=list, alias="promoGroups" + ) + + +class ServerListResponse(BaseModel): + """Список серверов с пагинацией.""" + + items: List[ServerResponse] + total: int + page: int + limit: int + + +class ServerCreateRequest(BaseModel): + """Запрос на создание сервера.""" + + squad_uuid: str = Field(alias="squadUuid") + display_name: str = Field(alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + price_kopeks: int = Field(default=0, alias="priceKopeks") + description: Optional[str] = None + max_users: Optional[int] = Field(default=None, alias="maxUsers") + is_available: bool = Field(default=True, alias="isAvailable") + is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") + sort_order: int = Field(default=0, alias="sortOrder") + promo_group_ids: Optional[List[int]] = Field( + default=None, + alias="promoGroupIds", + description="Список идентификаторов промогрупп, доступных на сервере.", + ) + + +class ServerUpdateRequest(BaseModel): + """Запрос на обновление свойств сервера.""" + + display_name: Optional[str] = Field(default=None, alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + price_kopeks: Optional[int] = Field(default=None, alias="priceKopeks") + description: Optional[str] = None + max_users: Optional[int] = Field(default=None, alias="maxUsers") + is_available: Optional[bool] = Field(default=None, alias="isAvailable") + is_trial_eligible: Optional[bool] = Field( + default=None, alias="isTrialEligible" + ) + sort_order: Optional[int] = Field(default=None, alias="sortOrder") + promo_group_ids: Optional[List[int]] = Field( + default=None, + alias="promoGroupIds", + description="Если передан список, он заменит текущие промогруппы сервера.", + ) + + +class ServerSyncResponse(BaseModel): + """Результат синхронизации серверов с RemnaWave.""" + + model_config = ConfigDict(populate_by_name=True) + + created: int + updated: int + removed: int + total: int + + +class ServerStatisticsResponse(BaseModel): + """Агрегированная статистика по серверам.""" + + model_config = ConfigDict(populate_by_name=True) + + total_servers: int = Field(alias="totalServers") + available_servers: int = Field(alias="availableServers") + unavailable_servers: int = Field(alias="unavailableServers") + servers_with_connections: int = Field(alias="serversWithConnections") + total_revenue_kopeks: int = Field(alias="totalRevenueKopeks") + total_revenue_rubles: float = Field(alias="totalRevenueRubles") + + +class ServerCountsSyncResponse(BaseModel): + """Результат обновления счетчиков пользователей серверов.""" + + model_config = ConfigDict(populate_by_name=True) + + updated: int + + +class ServerConnectedUser(BaseModel): + """Краткая информация о пользователе, подключенном к серверу.""" + + model_config = ConfigDict(populate_by_name=True) + + id: int + telegram_id: int = Field(alias="telegramId") + username: Optional[str] = None + first_name: Optional[str] = Field(default=None, alias="firstName") + last_name: Optional[str] = Field(default=None, alias="lastName") + status: str + balance_kopeks: int = Field(alias="balanceKopeks") + balance_rubles: float = Field(alias="balanceRubles") + subscription_id: Optional[int] = Field(default=None, alias="subscriptionId") + subscription_status: Optional[str] = Field( + default=None, alias="subscriptionStatus" + ) + subscription_end_date: Optional[datetime] = Field( + default=None, alias="subscriptionEndDate" + ) + + +class ServerConnectedUsersResponse(BaseModel): + """Список пользователей, подключенных к серверу.""" + + model_config = ConfigDict(populate_by_name=True) + + items: List[ServerConnectedUser] + total: int + limit: int + offset: int + + +class ServerDeleteResponse(BaseModel): + """Ответ при удалении сервера.""" + + model_config = ConfigDict(populate_by_name=True) + + success: bool + message: str + From 348f02f34c9133a7fedead620206bd8976959129 Mon Sep 17 00:00:00 2001 From: Egor Date: Mon, 3 Nov 2025 07:18:24 +0300 Subject: [PATCH 04/12] Revert "Fix RemnaWave panel user creation defaults" --- app/database/crud/server_squad.py | 13 +- app/services/remnawave_service.py | 18 +- app/webapi/app.py | 9 - app/webapi/routes/__init__.py | 2 - app/webapi/routes/servers.py | 386 ------------------------------ app/webapi/schemas/servers.py | 160 ------------- 6 files changed, 9 insertions(+), 579 deletions(-) delete mode 100644 app/webapi/routes/servers.py delete mode 100644 app/webapi/schemas/servers.py diff --git a/app/database/crud/server_squad.py b/app/database/crud/server_squad.py index 29fba112..a2021bd5 100644 --- a/app/database/crud/server_squad.py +++ b/app/database/crud/server_squad.py @@ -47,7 +47,6 @@ async def create_server_squad( max_users: int = None, is_available: bool = True, is_trial_eligible: bool = False, - sort_order: int = 0, promo_group_ids: Optional[Iterable[int]] = None, ) -> ServerSquad: @@ -81,7 +80,6 @@ async def create_server_squad( max_users=max_users, is_available=is_available, is_trial_eligible=is_trial_eligible, - sort_order=sort_order, allowed_promo_groups=promo_groups, ) @@ -262,15 +260,8 @@ async def update_server_squad( ) -> Optional[ServerSquad]: valid_fields = { - "display_name", - "original_name", - "country_code", - "price_kopeks", - "description", - "max_users", - "is_available", - "sort_order", - "is_trial_eligible", + 'display_name', 'country_code', 'price_kopeks', 'description', + 'max_users', 'is_available', 'sort_order', 'is_trial_eligible' } filtered_updates = {k: v for k, v in updates.items() if k in valid_fields} diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 2db162d6..0a165bcd 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -18,6 +18,7 @@ from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database.crud.user import ( create_user, + create_user_no_commit, get_users_list, get_user_by_telegram_id, update_user, @@ -302,32 +303,27 @@ class RemnaWaveService: first_name_from_desc, last_name_from_desc, username_from_desc = self._extract_user_data_from_description(description) # Используем извлеченное имя или дефолтное значение - fallback_first_name = f"Panel User {telegram_id}" - full_first_name = fallback_first_name - full_last_name = None - if first_name_from_desc and last_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc elif first_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc - + else: + full_first_name = f"User {telegram_id}" + full_last_name = None + username = username_from_desc or panel_user.get("username") try: - create_kwargs = dict( + db_user = await create_user_no_commit( db=db, telegram_id=telegram_id, username=username, first_name=full_first_name, + last_name=full_last_name, language="ru", ) - - if full_last_name: - create_kwargs["last_name"] = full_last_name - - db_user = await create_user(**create_kwargs) return db_user, True except IntegrityError as create_error: logger.info( diff --git a/app/webapi/app.py b/app/webapi/app.py index 07b86b86..d5848663 100644 --- a/app/webapi/app.py +++ b/app/webapi/app.py @@ -20,7 +20,6 @@ from .routes import ( promo_offers, pages, remnawave, - servers, stats, subscriptions, tickets, @@ -68,13 +67,6 @@ OPENAPI_TAGS = [ "name": "promo-groups", "description": "Создание и управление промо-группами и их участниками.", }, - { - "name": "servers", - "description": ( - "Управление серверами RemnaWave, их доступностью, промогруппами и " - "ручная синхронизация данных.", - ), - }, { "name": "promo-offers", "description": "Управление промо-предложениями, шаблонами и журналом событий.", @@ -145,7 +137,6 @@ def create_web_api_app() -> FastAPI: app.include_router(transactions.router, prefix="/transactions", tags=["transactions"]) app.include_router(promo_groups.router, prefix="/promo-groups", tags=["promo-groups"]) app.include_router(promo_offers.router, prefix="/promo-offers", tags=["promo-offers"]) - app.include_router(servers.router, prefix="/servers", tags=["servers"]) app.include_router( main_menu_buttons.router, prefix="/main-menu/buttons", diff --git a/app/webapi/routes/__init__.py b/app/webapi/routes/__init__.py index e4320cf9..85684d75 100644 --- a/app/webapi/routes/__init__.py +++ b/app/webapi/routes/__init__.py @@ -7,7 +7,6 @@ from . import ( promo_offers, pages, promo_groups, - servers, remnawave, stats, subscriptions, @@ -27,7 +26,6 @@ __all__ = [ "promo_offers", "pages", "promo_groups", - "servers", "remnawave", "stats", "subscriptions", diff --git a/app/webapi/routes/servers.py b/app/webapi/routes/servers.py deleted file mode 100644 index 38f2883f..00000000 --- a/app/webapi/routes/servers.py +++ /dev/null @@ -1,386 +0,0 @@ -"""Маршруты управления серверами в административном API.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Iterable, List, Optional - -from fastapi import APIRouter, Depends, HTTPException, Query, Security, status -from sqlalchemy import func, or_, select -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload - -from app.database.crud.server_squad import ( - create_server_squad, - delete_server_squad, - get_server_connected_users, - get_server_squad_by_id, - get_server_squad_by_uuid, - get_server_statistics, - sync_server_user_counts, - sync_with_remnawave, - update_server_squad, - update_server_squad_promo_groups, -) -from app.database.models import PromoGroup, ServerSquad, User -from app.utils.cache import cache - -from ..dependencies import get_db_session, require_api_token -from ..schemas.servers import ( - ServerConnectedUser, - ServerConnectedUsersResponse, - ServerCountsSyncResponse, - ServerCreateRequest, - ServerDeleteResponse, - ServerListResponse, - ServerResponse, - ServerStatisticsResponse, - ServerSyncResponse, - ServerUpdateRequest, -) -from ..schemas.users import PromoGroupSummary - -try: # pragma: no cover - импорт может провалиться без optional-зависимостей - from app.services.remnawave_service import RemnaWaveService # type: ignore -except Exception: # pragma: no cover - скрываем функционал, если сервис недоступен - RemnaWaveService = None # type: ignore[assignment] - - -if TYPE_CHECKING: # pragma: no cover - только для подсказок типов в IDE - from app.services.remnawave_service import ( # type: ignore - RemnaWaveService as RemnaWaveServiceType, - ) -else: - RemnaWaveServiceType = Any - - -router = APIRouter() - - -def _serialize_promo_group(group: PromoGroup) -> PromoGroupSummary: - return PromoGroupSummary( - id=group.id, - name=group.name, - server_discount_percent=group.server_discount_percent, - traffic_discount_percent=group.traffic_discount_percent, - device_discount_percent=group.device_discount_percent, - apply_discounts_to_addons=getattr(group, "apply_discounts_to_addons", True), - ) - - -def _serialize_server(server: ServerSquad) -> ServerResponse: - promo_groups = [ - _serialize_promo_group(group) - for group in sorted( - getattr(server, "allowed_promo_groups", []) or [], - key=lambda pg: pg.name.lower() if getattr(pg, "name", None) else "", - ) - ] - - return ServerResponse( - id=server.id, - squad_uuid=server.squad_uuid, - display_name=server.display_name, - original_name=server.original_name, - country_code=server.country_code, - is_available=bool(server.is_available), - is_trial_eligible=bool(server.is_trial_eligible), - price_kopeks=int(server.price_kopeks or 0), - price_rubles=round((server.price_kopeks or 0) / 100, 2), - description=server.description, - sort_order=int(server.sort_order or 0), - max_users=server.max_users, - current_users=int(server.current_users or 0), - created_at=getattr(server, "created_at", None), - updated_at=getattr(server, "updated_at", None), - promo_groups=promo_groups, - ) - - -def _serialize_connected_user(user: User) -> ServerConnectedUser: - subscription = getattr(user, "subscription", None) - subscription_status = getattr(subscription, "status", None) - if hasattr(subscription_status, "value"): - subscription_status = subscription_status.value - - return ServerConnectedUser( - id=user.id, - telegram_id=user.telegram_id, - username=user.username, - first_name=user.first_name, - last_name=user.last_name, - status=getattr(getattr(user, "status", None), "value", user.status), - balance_kopeks=int(user.balance_kopeks or 0), - balance_rubles=round((user.balance_kopeks or 0) / 100, 2), - subscription_id=getattr(subscription, "id", None), - subscription_status=subscription_status, - subscription_end_date=getattr(subscription, "end_date", None), - ) - - -def _apply_filters( - filters: Iterable[Any], - query, -): - for condition in filters: - query = query.where(condition) - return query - - -def _get_remnawave_service() -> "RemnaWaveServiceType": - if RemnaWaveService is None: # pragma: no cover - зависимость не доступна - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="RemnaWave сервис недоступен", - ) - - return RemnaWaveService() - - -def _ensure_service_configured(service: "RemnaWaveServiceType") -> None: - if RemnaWaveService is None: # pragma: no cover - зависимость не доступна - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="RemnaWave сервис недоступен", - ) - - if not service.is_configured: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=service.configuration_error or "RemnaWave API не настроен", - ) - - -@router.get("", response_model=ServerListResponse) -async def list_servers( - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), - page: int = Query(1, ge=1), - limit: int = Query(50, ge=1, le=200), - available_only: bool = Query(False, alias="available"), - search: Optional[str] = Query(default=None), -) -> ServerListResponse: - filters = [] - - if available_only: - filters.append(ServerSquad.is_available.is_(True)) - - if search: - pattern = f"%{search.lower()}%" - filters.append( - or_( - func.lower(ServerSquad.display_name).like(pattern), - func.lower(ServerSquad.original_name).like(pattern), - func.lower(ServerSquad.squad_uuid).like(pattern), - func.lower(ServerSquad.country_code).like(pattern), - ) - ) - - base_query = ( - select(ServerSquad) - .options(selectinload(ServerSquad.allowed_promo_groups)) - .order_by(ServerSquad.sort_order, ServerSquad.display_name) - ) - - count_query = select(func.count(ServerSquad.id)) - - if filters: - base_query = _apply_filters(filters, base_query) - count_query = _apply_filters(filters, count_query) - - total = await db.scalar(count_query) or 0 - - result = await db.execute( - base_query.offset((page - 1) * limit).limit(limit) - ) - servers = result.scalars().unique().all() - - return ServerListResponse( - items=[_serialize_server(server) for server in servers], - total=int(total), - page=page, - limit=limit, - ) - - -@router.get("/stats", response_model=ServerStatisticsResponse) -async def get_servers_statistics( - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerStatisticsResponse: - stats = await get_server_statistics(db) - - return ServerStatisticsResponse( - total_servers=int(stats.get("total_servers", 0) or 0), - available_servers=int(stats.get("available_servers", 0) or 0), - unavailable_servers=int(stats.get("unavailable_servers", 0) or 0), - servers_with_connections=int(stats.get("servers_with_connections", 0) or 0), - total_revenue_kopeks=int(stats.get("total_revenue_kopeks", 0) or 0), - total_revenue_rubles=float(stats.get("total_revenue_rubles", 0) or 0), - ) - - -@router.post("", response_model=ServerResponse, status_code=status.HTTP_201_CREATED) -async def create_server_endpoint( - payload: ServerCreateRequest, - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerResponse: - existing = await get_server_squad_by_uuid(db, payload.squad_uuid) - if existing: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - "Server with this UUID already exists", - ) - - try: - server = await create_server_squad( - db, - squad_uuid=payload.squad_uuid, - display_name=payload.display_name, - original_name=payload.original_name, - country_code=payload.country_code, - price_kopeks=payload.price_kopeks, - description=payload.description, - max_users=payload.max_users, - is_available=payload.is_available, - is_trial_eligible=payload.is_trial_eligible, - sort_order=payload.sort_order, - promo_group_ids=payload.promo_group_ids, - ) - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error - - await cache.delete_pattern("available_countries*") - - server = await get_server_squad_by_id(db, server.id) - assert server is not None - return _serialize_server(server) - - -@router.get("/{server_id}", response_model=ServerResponse) -async def get_server_endpoint( - server_id: int, - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerResponse: - server = await get_server_squad_by_id(db, server_id) - if not server: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") - - return _serialize_server(server) - - -@router.patch("/{server_id}", response_model=ServerResponse) -async def update_server_endpoint( - server_id: int, - payload: ServerUpdateRequest, - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerResponse: - server = await get_server_squad_by_id(db, server_id) - if not server: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") - - updates = payload.model_dump(exclude_unset=True, by_alias=False) - promo_group_ids = updates.pop("promo_group_ids", None) - - if updates: - server = await update_server_squad(db, server_id, **updates) or server - - if promo_group_ids is not None: - try: - server = await update_server_squad_promo_groups( - db, server_id, promo_group_ids - ) or server - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error - - await cache.delete_pattern("available_countries*") - - server = await get_server_squad_by_id(db, server_id) - assert server is not None - return _serialize_server(server) - - -@router.delete("/{server_id}", response_model=ServerDeleteResponse) -async def delete_server_endpoint( - server_id: int, - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerDeleteResponse: - server = await get_server_squad_by_id(db, server_id) - if not server: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") - - deleted = await delete_server_squad(db, server_id) - if not deleted: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - "Server cannot be deleted because it has active connections", - ) - - await cache.delete_pattern("available_countries*") - - return ServerDeleteResponse(success=True, message="Server deleted") - - -@router.get( - "/{server_id}/users", - response_model=ServerConnectedUsersResponse, -) -async def get_server_connected_users_endpoint( - server_id: int, - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), - limit: int = Query(100, ge=1, le=500), - offset: int = Query(0, ge=0), -) -> ServerConnectedUsersResponse: - server = await get_server_squad_by_id(db, server_id) - if not server: - raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") - - users = await get_server_connected_users(db, server_id) - total = len(users) - sliced = users[offset : offset + limit] - - return ServerConnectedUsersResponse( - items=[_serialize_connected_user(user) for user in sliced], - total=total, - limit=limit, - offset=offset, - ) - - -@router.post("/sync", response_model=ServerSyncResponse) -async def sync_servers_with_remnawave( - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerSyncResponse: - service = _get_remnawave_service() - _ensure_service_configured(service) - - squads = await service.get_all_squads() - total = len(squads) - - created = updated = removed = 0 - if squads: - created, updated, removed = await sync_with_remnawave(db, squads) - - await cache.delete_pattern("available_countries*") - - return ServerSyncResponse( - created=created, - updated=updated, - removed=removed, - total=total, - ) - - -@router.post("/sync-counts", response_model=ServerCountsSyncResponse) -async def sync_server_counts( - _: Any = Security(require_api_token), - db: AsyncSession = Depends(get_db_session), -) -> ServerCountsSyncResponse: - updated = await sync_server_user_counts(db) - return ServerCountsSyncResponse(updated=updated) - diff --git a/app/webapi/schemas/servers.py b/app/webapi/schemas/servers.py deleted file mode 100644 index 478ffb9b..00000000 --- a/app/webapi/schemas/servers.py +++ /dev/null @@ -1,160 +0,0 @@ -"""Pydantic-схемы для управления серверами через Web API.""" - -from __future__ import annotations - -from datetime import datetime -from typing import List, Optional - -from pydantic import BaseModel, ConfigDict, Field - -from .users import PromoGroupSummary - - -class ServerResponse(BaseModel): - """Полная информация о сервере.""" - - model_config = ConfigDict(from_attributes=True, populate_by_name=True) - - id: int - squad_uuid: str = Field(alias="squadUuid") - display_name: str = Field(alias="displayName") - original_name: Optional[str] = Field(default=None, alias="originalName") - country_code: Optional[str] = Field(default=None, alias="countryCode") - is_available: bool = Field(alias="isAvailable") - is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") - price_kopeks: int = Field(alias="priceKopeks") - price_rubles: float = Field(alias="priceRubles") - description: Optional[str] = None - sort_order: int = Field(default=0, alias="sortOrder") - max_users: Optional[int] = Field(default=None, alias="maxUsers") - current_users: int = Field(default=0, alias="currentUsers") - created_at: Optional[datetime] = Field(default=None, alias="createdAt") - updated_at: Optional[datetime] = Field(default=None, alias="updatedAt") - promo_groups: List[PromoGroupSummary] = Field( - default_factory=list, alias="promoGroups" - ) - - -class ServerListResponse(BaseModel): - """Список серверов с пагинацией.""" - - items: List[ServerResponse] - total: int - page: int - limit: int - - -class ServerCreateRequest(BaseModel): - """Запрос на создание сервера.""" - - squad_uuid: str = Field(alias="squadUuid") - display_name: str = Field(alias="displayName") - original_name: Optional[str] = Field(default=None, alias="originalName") - country_code: Optional[str] = Field(default=None, alias="countryCode") - price_kopeks: int = Field(default=0, alias="priceKopeks") - description: Optional[str] = None - max_users: Optional[int] = Field(default=None, alias="maxUsers") - is_available: bool = Field(default=True, alias="isAvailable") - is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") - sort_order: int = Field(default=0, alias="sortOrder") - promo_group_ids: Optional[List[int]] = Field( - default=None, - alias="promoGroupIds", - description="Список идентификаторов промогрупп, доступных на сервере.", - ) - - -class ServerUpdateRequest(BaseModel): - """Запрос на обновление свойств сервера.""" - - display_name: Optional[str] = Field(default=None, alias="displayName") - original_name: Optional[str] = Field(default=None, alias="originalName") - country_code: Optional[str] = Field(default=None, alias="countryCode") - price_kopeks: Optional[int] = Field(default=None, alias="priceKopeks") - description: Optional[str] = None - max_users: Optional[int] = Field(default=None, alias="maxUsers") - is_available: Optional[bool] = Field(default=None, alias="isAvailable") - is_trial_eligible: Optional[bool] = Field( - default=None, alias="isTrialEligible" - ) - sort_order: Optional[int] = Field(default=None, alias="sortOrder") - promo_group_ids: Optional[List[int]] = Field( - default=None, - alias="promoGroupIds", - description="Если передан список, он заменит текущие промогруппы сервера.", - ) - - -class ServerSyncResponse(BaseModel): - """Результат синхронизации серверов с RemnaWave.""" - - model_config = ConfigDict(populate_by_name=True) - - created: int - updated: int - removed: int - total: int - - -class ServerStatisticsResponse(BaseModel): - """Агрегированная статистика по серверам.""" - - model_config = ConfigDict(populate_by_name=True) - - total_servers: int = Field(alias="totalServers") - available_servers: int = Field(alias="availableServers") - unavailable_servers: int = Field(alias="unavailableServers") - servers_with_connections: int = Field(alias="serversWithConnections") - total_revenue_kopeks: int = Field(alias="totalRevenueKopeks") - total_revenue_rubles: float = Field(alias="totalRevenueRubles") - - -class ServerCountsSyncResponse(BaseModel): - """Результат обновления счетчиков пользователей серверов.""" - - model_config = ConfigDict(populate_by_name=True) - - updated: int - - -class ServerConnectedUser(BaseModel): - """Краткая информация о пользователе, подключенном к серверу.""" - - model_config = ConfigDict(populate_by_name=True) - - id: int - telegram_id: int = Field(alias="telegramId") - username: Optional[str] = None - first_name: Optional[str] = Field(default=None, alias="firstName") - last_name: Optional[str] = Field(default=None, alias="lastName") - status: str - balance_kopeks: int = Field(alias="balanceKopeks") - balance_rubles: float = Field(alias="balanceRubles") - subscription_id: Optional[int] = Field(default=None, alias="subscriptionId") - subscription_status: Optional[str] = Field( - default=None, alias="subscriptionStatus" - ) - subscription_end_date: Optional[datetime] = Field( - default=None, alias="subscriptionEndDate" - ) - - -class ServerConnectedUsersResponse(BaseModel): - """Список пользователей, подключенных к серверу.""" - - model_config = ConfigDict(populate_by_name=True) - - items: List[ServerConnectedUser] - total: int - limit: int - offset: int - - -class ServerDeleteResponse(BaseModel): - """Ответ при удалении сервера.""" - - model_config = ConfigDict(populate_by_name=True) - - success: bool - message: str - From 08e0b3a6571fa61a0fe7efe371ae20d9c68d8e2e Mon Sep 17 00:00:00 2001 From: Egor Date: Mon, 3 Nov 2025 07:19:03 +0300 Subject: [PATCH 05/12] Validate promo groups before updating server --- app/database/crud/server_squad.py | 13 +- app/services/remnawave_service.py | 18 +- app/webapi/app.py | 9 + app/webapi/routes/__init__.py | 2 + app/webapi/routes/servers.py | 418 ++++++++++++++++++++++++++++++ app/webapi/schemas/servers.py | 160 ++++++++++++ 6 files changed, 611 insertions(+), 9 deletions(-) create mode 100644 app/webapi/routes/servers.py create mode 100644 app/webapi/schemas/servers.py diff --git a/app/database/crud/server_squad.py b/app/database/crud/server_squad.py index a2021bd5..29fba112 100644 --- a/app/database/crud/server_squad.py +++ b/app/database/crud/server_squad.py @@ -47,6 +47,7 @@ async def create_server_squad( max_users: int = None, is_available: bool = True, is_trial_eligible: bool = False, + sort_order: int = 0, promo_group_ids: Optional[Iterable[int]] = None, ) -> ServerSquad: @@ -80,6 +81,7 @@ async def create_server_squad( max_users=max_users, is_available=is_available, is_trial_eligible=is_trial_eligible, + sort_order=sort_order, allowed_promo_groups=promo_groups, ) @@ -260,8 +262,15 @@ async def update_server_squad( ) -> Optional[ServerSquad]: valid_fields = { - 'display_name', 'country_code', 'price_kopeks', 'description', - 'max_users', 'is_available', 'sort_order', 'is_trial_eligible' + "display_name", + "original_name", + "country_code", + "price_kopeks", + "description", + "max_users", + "is_available", + "sort_order", + "is_trial_eligible", } filtered_updates = {k: v for k, v in updates.items() if k in valid_fields} diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 0a165bcd..2db162d6 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -18,7 +18,6 @@ from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession from app.database.crud.user import ( create_user, - create_user_no_commit, get_users_list, get_user_by_telegram_id, update_user, @@ -303,27 +302,32 @@ class RemnaWaveService: first_name_from_desc, last_name_from_desc, username_from_desc = self._extract_user_data_from_description(description) # Используем извлеченное имя или дефолтное значение + fallback_first_name = f"Panel User {telegram_id}" + full_first_name = fallback_first_name + full_last_name = None + if first_name_from_desc and last_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc elif first_name_from_desc: full_first_name = first_name_from_desc full_last_name = last_name_from_desc - else: - full_first_name = f"User {telegram_id}" - full_last_name = None - + username = username_from_desc or panel_user.get("username") try: - db_user = await create_user_no_commit( + create_kwargs = dict( db=db, telegram_id=telegram_id, username=username, first_name=full_first_name, - last_name=full_last_name, language="ru", ) + + if full_last_name: + create_kwargs["last_name"] = full_last_name + + db_user = await create_user(**create_kwargs) return db_user, True except IntegrityError as create_error: logger.info( diff --git a/app/webapi/app.py b/app/webapi/app.py index d5848663..07b86b86 100644 --- a/app/webapi/app.py +++ b/app/webapi/app.py @@ -20,6 +20,7 @@ from .routes import ( promo_offers, pages, remnawave, + servers, stats, subscriptions, tickets, @@ -67,6 +68,13 @@ OPENAPI_TAGS = [ "name": "promo-groups", "description": "Создание и управление промо-группами и их участниками.", }, + { + "name": "servers", + "description": ( + "Управление серверами RemnaWave, их доступностью, промогруппами и " + "ручная синхронизация данных.", + ), + }, { "name": "promo-offers", "description": "Управление промо-предложениями, шаблонами и журналом событий.", @@ -137,6 +145,7 @@ def create_web_api_app() -> FastAPI: app.include_router(transactions.router, prefix="/transactions", tags=["transactions"]) app.include_router(promo_groups.router, prefix="/promo-groups", tags=["promo-groups"]) app.include_router(promo_offers.router, prefix="/promo-offers", tags=["promo-offers"]) + app.include_router(servers.router, prefix="/servers", tags=["servers"]) app.include_router( main_menu_buttons.router, prefix="/main-menu/buttons", diff --git a/app/webapi/routes/__init__.py b/app/webapi/routes/__init__.py index 85684d75..e4320cf9 100644 --- a/app/webapi/routes/__init__.py +++ b/app/webapi/routes/__init__.py @@ -7,6 +7,7 @@ from . import ( promo_offers, pages, promo_groups, + servers, remnawave, stats, subscriptions, @@ -26,6 +27,7 @@ __all__ = [ "promo_offers", "pages", "promo_groups", + "servers", "remnawave", "stats", "subscriptions", diff --git a/app/webapi/routes/servers.py b/app/webapi/routes/servers.py new file mode 100644 index 00000000..ad4129ee --- /dev/null +++ b/app/webapi/routes/servers.py @@ -0,0 +1,418 @@ +"""Маршруты управления серверами в административном API.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Iterable, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Security, status +from sqlalchemy import func, or_, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from app.database.crud.server_squad import ( + create_server_squad, + delete_server_squad, + get_server_connected_users, + get_server_squad_by_id, + get_server_squad_by_uuid, + get_server_statistics, + sync_server_user_counts, + sync_with_remnawave, + update_server_squad, + update_server_squad_promo_groups, +) +from app.database.models import PromoGroup, ServerSquad, User +from app.utils.cache import cache + +from ..dependencies import get_db_session, require_api_token +from ..schemas.servers import ( + ServerConnectedUser, + ServerConnectedUsersResponse, + ServerCountsSyncResponse, + ServerCreateRequest, + ServerDeleteResponse, + ServerListResponse, + ServerResponse, + ServerStatisticsResponse, + ServerSyncResponse, + ServerUpdateRequest, +) +from ..schemas.users import PromoGroupSummary + +try: # pragma: no cover - импорт может провалиться без optional-зависимостей + from app.services.remnawave_service import RemnaWaveService # type: ignore +except Exception: # pragma: no cover - скрываем функционал, если сервис недоступен + RemnaWaveService = None # type: ignore[assignment] + + +if TYPE_CHECKING: # pragma: no cover - только для подсказок типов в IDE + from app.services.remnawave_service import ( # type: ignore + RemnaWaveService as RemnaWaveServiceType, + ) +else: + RemnaWaveServiceType = Any + + +router = APIRouter() + + +def _serialize_promo_group(group: PromoGroup) -> PromoGroupSummary: + return PromoGroupSummary( + id=group.id, + name=group.name, + server_discount_percent=group.server_discount_percent, + traffic_discount_percent=group.traffic_discount_percent, + device_discount_percent=group.device_discount_percent, + apply_discounts_to_addons=getattr(group, "apply_discounts_to_addons", True), + ) + + +def _serialize_server(server: ServerSquad) -> ServerResponse: + promo_groups = [ + _serialize_promo_group(group) + for group in sorted( + getattr(server, "allowed_promo_groups", []) or [], + key=lambda pg: pg.name.lower() if getattr(pg, "name", None) else "", + ) + ] + + return ServerResponse( + id=server.id, + squad_uuid=server.squad_uuid, + display_name=server.display_name, + original_name=server.original_name, + country_code=server.country_code, + is_available=bool(server.is_available), + is_trial_eligible=bool(server.is_trial_eligible), + price_kopeks=int(server.price_kopeks or 0), + price_rubles=round((server.price_kopeks or 0) / 100, 2), + description=server.description, + sort_order=int(server.sort_order or 0), + max_users=server.max_users, + current_users=int(server.current_users or 0), + created_at=getattr(server, "created_at", None), + updated_at=getattr(server, "updated_at", None), + promo_groups=promo_groups, + ) + + +def _serialize_connected_user(user: User) -> ServerConnectedUser: + subscription = getattr(user, "subscription", None) + subscription_status = getattr(subscription, "status", None) + if hasattr(subscription_status, "value"): + subscription_status = subscription_status.value + + return ServerConnectedUser( + id=user.id, + telegram_id=user.telegram_id, + username=user.username, + first_name=user.first_name, + last_name=user.last_name, + status=getattr(getattr(user, "status", None), "value", user.status), + balance_kopeks=int(user.balance_kopeks or 0), + balance_rubles=round((user.balance_kopeks or 0) / 100, 2), + subscription_id=getattr(subscription, "id", None), + subscription_status=subscription_status, + subscription_end_date=getattr(subscription, "end_date", None), + ) + + +def _apply_filters( + filters: Iterable[Any], + query, +): + for condition in filters: + query = query.where(condition) + return query + + +def _get_remnawave_service() -> "RemnaWaveServiceType": + if RemnaWaveService is None: # pragma: no cover - зависимость не доступна + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RemnaWave сервис недоступен", + ) + + return RemnaWaveService() + + +def _ensure_service_configured(service: "RemnaWaveServiceType") -> None: + if RemnaWaveService is None: # pragma: no cover - зависимость не доступна + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RemnaWave сервис недоступен", + ) + + if not service.is_configured: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=service.configuration_error or "RemnaWave API не настроен", + ) + + +async def _validate_promo_group_ids( + db: AsyncSession, promo_group_ids: Iterable[int] +) -> List[int]: + unique_ids = [int(pg_id) for pg_id in set(promo_group_ids)] + + if not unique_ids: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Нужно выбрать хотя бы одну промогруппу", + ) + + result = await db.execute( + select(PromoGroup.id).where(PromoGroup.id.in_(unique_ids)) + ) + found_ids = result.scalars().all() + + if not found_ids: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Не найдены промогруппы для обновления сервера", + ) + + return unique_ids + + +@router.get("", response_model=ServerListResponse) +async def list_servers( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), + page: int = Query(1, ge=1), + limit: int = Query(50, ge=1, le=200), + available_only: bool = Query(False, alias="available"), + search: Optional[str] = Query(default=None), +) -> ServerListResponse: + filters = [] + + if available_only: + filters.append(ServerSquad.is_available.is_(True)) + + if search: + pattern = f"%{search.lower()}%" + filters.append( + or_( + func.lower(ServerSquad.display_name).like(pattern), + func.lower(ServerSquad.original_name).like(pattern), + func.lower(ServerSquad.squad_uuid).like(pattern), + func.lower(ServerSquad.country_code).like(pattern), + ) + ) + + base_query = ( + select(ServerSquad) + .options(selectinload(ServerSquad.allowed_promo_groups)) + .order_by(ServerSquad.sort_order, ServerSquad.display_name) + ) + + count_query = select(func.count(ServerSquad.id)) + + if filters: + base_query = _apply_filters(filters, base_query) + count_query = _apply_filters(filters, count_query) + + total = await db.scalar(count_query) or 0 + + result = await db.execute( + base_query.offset((page - 1) * limit).limit(limit) + ) + servers = result.scalars().unique().all() + + return ServerListResponse( + items=[_serialize_server(server) for server in servers], + total=int(total), + page=page, + limit=limit, + ) + + +@router.get("/stats", response_model=ServerStatisticsResponse) +async def get_servers_statistics( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerStatisticsResponse: + stats = await get_server_statistics(db) + + return ServerStatisticsResponse( + total_servers=int(stats.get("total_servers", 0) or 0), + available_servers=int(stats.get("available_servers", 0) or 0), + unavailable_servers=int(stats.get("unavailable_servers", 0) or 0), + servers_with_connections=int(stats.get("servers_with_connections", 0) or 0), + total_revenue_kopeks=int(stats.get("total_revenue_kopeks", 0) or 0), + total_revenue_rubles=float(stats.get("total_revenue_rubles", 0) or 0), + ) + + +@router.post("", response_model=ServerResponse, status_code=status.HTTP_201_CREATED) +async def create_server_endpoint( + payload: ServerCreateRequest, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + existing = await get_server_squad_by_uuid(db, payload.squad_uuid) + if existing: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Server with this UUID already exists", + ) + + try: + server = await create_server_squad( + db, + squad_uuid=payload.squad_uuid, + display_name=payload.display_name, + original_name=payload.original_name, + country_code=payload.country_code, + price_kopeks=payload.price_kopeks, + description=payload.description, + max_users=payload.max_users, + is_available=payload.is_available, + is_trial_eligible=payload.is_trial_eligible, + sort_order=payload.sort_order, + promo_group_ids=payload.promo_group_ids, + ) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + + await cache.delete_pattern("available_countries*") + + server = await get_server_squad_by_id(db, server.id) + assert server is not None + return _serialize_server(server) + + +@router.get("/{server_id}", response_model=ServerResponse) +async def get_server_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + return _serialize_server(server) + + +@router.patch("/{server_id}", response_model=ServerResponse) +async def update_server_endpoint( + server_id: int, + payload: ServerUpdateRequest, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + updates = payload.model_dump(exclude_unset=True, by_alias=False) + promo_group_ids = updates.pop("promo_group_ids", None) + + validated_promo_group_ids: Optional[List[int]] = None + if promo_group_ids is not None: + validated_promo_group_ids = await _validate_promo_group_ids( + db, promo_group_ids + ) + + if updates: + server = await update_server_squad(db, server_id, **updates) or server + + if promo_group_ids is not None: + try: + assert validated_promo_group_ids is not None + server = await update_server_squad_promo_groups( + db, server_id, validated_promo_group_ids + ) or server + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + + await cache.delete_pattern("available_countries*") + + server = await get_server_squad_by_id(db, server_id) + assert server is not None + return _serialize_server(server) + + +@router.delete("/{server_id}", response_model=ServerDeleteResponse) +async def delete_server_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerDeleteResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + deleted = await delete_server_squad(db, server_id) + if not deleted: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Server cannot be deleted because it has active connections", + ) + + await cache.delete_pattern("available_countries*") + + return ServerDeleteResponse(success=True, message="Server deleted") + + +@router.get( + "/{server_id}/users", + response_model=ServerConnectedUsersResponse, +) +async def get_server_connected_users_endpoint( + server_id: int, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), + limit: int = Query(100, ge=1, le=500), + offset: int = Query(0, ge=0), +) -> ServerConnectedUsersResponse: + server = await get_server_squad_by_id(db, server_id) + if not server: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Server not found") + + users = await get_server_connected_users(db, server_id) + total = len(users) + sliced = users[offset : offset + limit] + + return ServerConnectedUsersResponse( + items=[_serialize_connected_user(user) for user in sliced], + total=total, + limit=limit, + offset=offset, + ) + + +@router.post("/sync", response_model=ServerSyncResponse) +async def sync_servers_with_remnawave( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerSyncResponse: + service = _get_remnawave_service() + _ensure_service_configured(service) + + squads = await service.get_all_squads() + total = len(squads) + + created = updated = removed = 0 + if squads: + created, updated, removed = await sync_with_remnawave(db, squads) + + await cache.delete_pattern("available_countries*") + + return ServerSyncResponse( + created=created, + updated=updated, + removed=removed, + total=total, + ) + + +@router.post("/sync-counts", response_model=ServerCountsSyncResponse) +async def sync_server_counts( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> ServerCountsSyncResponse: + updated = await sync_server_user_counts(db) + return ServerCountsSyncResponse(updated=updated) + diff --git a/app/webapi/schemas/servers.py b/app/webapi/schemas/servers.py new file mode 100644 index 00000000..478ffb9b --- /dev/null +++ b/app/webapi/schemas/servers.py @@ -0,0 +1,160 @@ +"""Pydantic-схемы для управления серверами через Web API.""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, ConfigDict, Field + +from .users import PromoGroupSummary + + +class ServerResponse(BaseModel): + """Полная информация о сервере.""" + + model_config = ConfigDict(from_attributes=True, populate_by_name=True) + + id: int + squad_uuid: str = Field(alias="squadUuid") + display_name: str = Field(alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + is_available: bool = Field(alias="isAvailable") + is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") + price_kopeks: int = Field(alias="priceKopeks") + price_rubles: float = Field(alias="priceRubles") + description: Optional[str] = None + sort_order: int = Field(default=0, alias="sortOrder") + max_users: Optional[int] = Field(default=None, alias="maxUsers") + current_users: int = Field(default=0, alias="currentUsers") + created_at: Optional[datetime] = Field(default=None, alias="createdAt") + updated_at: Optional[datetime] = Field(default=None, alias="updatedAt") + promo_groups: List[PromoGroupSummary] = Field( + default_factory=list, alias="promoGroups" + ) + + +class ServerListResponse(BaseModel): + """Список серверов с пагинацией.""" + + items: List[ServerResponse] + total: int + page: int + limit: int + + +class ServerCreateRequest(BaseModel): + """Запрос на создание сервера.""" + + squad_uuid: str = Field(alias="squadUuid") + display_name: str = Field(alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + price_kopeks: int = Field(default=0, alias="priceKopeks") + description: Optional[str] = None + max_users: Optional[int] = Field(default=None, alias="maxUsers") + is_available: bool = Field(default=True, alias="isAvailable") + is_trial_eligible: bool = Field(default=False, alias="isTrialEligible") + sort_order: int = Field(default=0, alias="sortOrder") + promo_group_ids: Optional[List[int]] = Field( + default=None, + alias="promoGroupIds", + description="Список идентификаторов промогрупп, доступных на сервере.", + ) + + +class ServerUpdateRequest(BaseModel): + """Запрос на обновление свойств сервера.""" + + display_name: Optional[str] = Field(default=None, alias="displayName") + original_name: Optional[str] = Field(default=None, alias="originalName") + country_code: Optional[str] = Field(default=None, alias="countryCode") + price_kopeks: Optional[int] = Field(default=None, alias="priceKopeks") + description: Optional[str] = None + max_users: Optional[int] = Field(default=None, alias="maxUsers") + is_available: Optional[bool] = Field(default=None, alias="isAvailable") + is_trial_eligible: Optional[bool] = Field( + default=None, alias="isTrialEligible" + ) + sort_order: Optional[int] = Field(default=None, alias="sortOrder") + promo_group_ids: Optional[List[int]] = Field( + default=None, + alias="promoGroupIds", + description="Если передан список, он заменит текущие промогруппы сервера.", + ) + + +class ServerSyncResponse(BaseModel): + """Результат синхронизации серверов с RemnaWave.""" + + model_config = ConfigDict(populate_by_name=True) + + created: int + updated: int + removed: int + total: int + + +class ServerStatisticsResponse(BaseModel): + """Агрегированная статистика по серверам.""" + + model_config = ConfigDict(populate_by_name=True) + + total_servers: int = Field(alias="totalServers") + available_servers: int = Field(alias="availableServers") + unavailable_servers: int = Field(alias="unavailableServers") + servers_with_connections: int = Field(alias="serversWithConnections") + total_revenue_kopeks: int = Field(alias="totalRevenueKopeks") + total_revenue_rubles: float = Field(alias="totalRevenueRubles") + + +class ServerCountsSyncResponse(BaseModel): + """Результат обновления счетчиков пользователей серверов.""" + + model_config = ConfigDict(populate_by_name=True) + + updated: int + + +class ServerConnectedUser(BaseModel): + """Краткая информация о пользователе, подключенном к серверу.""" + + model_config = ConfigDict(populate_by_name=True) + + id: int + telegram_id: int = Field(alias="telegramId") + username: Optional[str] = None + first_name: Optional[str] = Field(default=None, alias="firstName") + last_name: Optional[str] = Field(default=None, alias="lastName") + status: str + balance_kopeks: int = Field(alias="balanceKopeks") + balance_rubles: float = Field(alias="balanceRubles") + subscription_id: Optional[int] = Field(default=None, alias="subscriptionId") + subscription_status: Optional[str] = Field( + default=None, alias="subscriptionStatus" + ) + subscription_end_date: Optional[datetime] = Field( + default=None, alias="subscriptionEndDate" + ) + + +class ServerConnectedUsersResponse(BaseModel): + """Список пользователей, подключенных к серверу.""" + + model_config = ConfigDict(populate_by_name=True) + + items: List[ServerConnectedUser] + total: int + limit: int + offset: int + + +class ServerDeleteResponse(BaseModel): + """Ответ при удалении сервера.""" + + model_config = ConfigDict(populate_by_name=True) + + success: bool + message: str + From 22dcff47d150b17407fa868658d44d4bae6a1a61 Mon Sep 17 00:00:00 2001 From: "Ivan.Nginx" Date: Mon, 3 Nov 2025 23:42:21 +0300 Subject: [PATCH 06/12] fix: update Telegram Stars rate & disable by default in `.env` --- .env.example | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index b8a0403a..3ebadd30 100644 --- a/.env.example +++ b/.env.example @@ -179,8 +179,8 @@ MIN_BALANCE_FOR_AUTOPAY_KOPEKS=10000 # ===== ПЛАТЕЖНЫЕ СИСТЕМЫ ===== # Telegram Stars (работает автоматически) -TELEGRAM_STARS_ENABLED=true -TELEGRAM_STARS_RATE_RUB=1.3 +TELEGRAM_STARS_ENABLED=false +TELEGRAM_STARS_RATE_RUB=1.79 # Tribute (https://tribute.app) TRIBUTE_ENABLED=false From 06323f7168ea3e5b6fdc6f50725248a37bb709ca Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 07:39:12 +0300 Subject: [PATCH 07/12] Enhance search functionality for telegram_id Added error handling for converting search input to int for telegram_id. --- app/database/crud/user.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/app/database/crud/user.py b/app/database/crud/user.py index 8ce68101..e0f724a6 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -583,7 +583,14 @@ async def get_users_list( ] if search.isdigit(): - conditions.append(User.telegram_id == int(search)) + try: + search_int = int(search) + # Добавляем условие поиска по telegram_id, который является BigInteger + # и может содержать большие значения, в отличие от User.id (INTEGER) + conditions.append(User.telegram_id == search_int) + except ValueError: + # Если не удалось преобразовать в int, просто ищем по текстовым полям + pass query = query.where(or_(*conditions)) @@ -680,7 +687,14 @@ async def get_users_count( ] if search.isdigit(): - conditions.append(User.telegram_id == int(search)) + try: + search_int = int(search) + # Добавляем условие поиска по telegram_id, который является BigInteger + # и может содержать большие значения, в отличие от User.id (INTEGER) + conditions.append(User.telegram_id == search_int) + except ValueError: + # Если не удалось преобразовать в int, просто ищем по текстовым полям + pass query = query.where(or_(*conditions)) From 477b0b26c4ea105273cb84f3277a513b94acb20e Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 07:53:53 +0300 Subject: [PATCH 08/12] Update docker-compose.yml --- docker-compose.yml | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 20bb8f14..5a4444c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: volumes: - postgres_data:/var/lib/postgresql/data networks: - - remnawave-network + - bot_network healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-remnawave_user} -d ${POSTGRES_DB:-remnawave_bot}"] interval: 30s @@ -27,7 +27,7 @@ services: volumes: - redis_data:/data networks: - - remnawave-network + - bot_network healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 30s @@ -80,7 +80,7 @@ services: - "${WATA_WEBHOOK_PORT:-8085}:8085" - "${HELEKET_WEBHOOK_PORT:-8086}:8086" networks: - - remnawave-network + - bot_network healthcheck: test: ["CMD-SHELL", "python -c 'import requests; requests.get(\"http://localhost:8081/health\", timeout=5)' || exit 1"] interval: 60s @@ -95,7 +95,9 @@ volumes: driver: local networks: - remnawave-network: - name: remnawave-network - driver: bridge - external: true \ No newline at end of file + bot_network: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 + gateway: 172.20.0.1 From e250812463680963bd99676cf4aac5e10ae06082 Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 08:02:03 +0300 Subject: [PATCH 09/12] Update simple_subscription.py --- app/handlers/simple_subscription.py | 419 +++++++++++++++++++++++++--- 1 file changed, 385 insertions(+), 34 deletions(-) diff --git a/app/handlers/simple_subscription.py b/app/handlers/simple_subscription.py index a672c85b..d4508dc9 100644 --- a/app/handlers/simple_subscription.py +++ b/app/handlers/simple_subscription.py @@ -39,7 +39,7 @@ async def start_simple_subscription_purchase( await callback.answer("❌ Простая покупка подписки временно недоступна", show_alert=True) return - # Проверяем, есть ли у пользователя подписка (информируем, но не блокируем покупку) + # Проверяем, есть ли у пользователя подписка from app.database.crud.subscription import get_subscription_by_user_id current_subscription = await get_subscription_by_user_id(db, db_user.id) @@ -99,17 +99,24 @@ async def start_simple_subscription_purchase( can_pay_from_balance, ) + # Проверяем, является ли у пользователя текущая подписка активной платной подпиской + has_active_paid_subscription = False trial_notice = "" - if current_subscription and getattr(current_subscription, "is_trial", False): - try: - days_left = max(0, (current_subscription.end_date - datetime.utcnow()).days) - except Exception: - days_left = 0 - key = "SIMPLE_SUBSCRIPTION_TRIAL_NOTICE_ACTIVE" if current_subscription.is_active else "SIMPLE_SUBSCRIPTION_TRIAL_NOTICE_TRIAL" - trial_notice = texts.t( - key, - "ℹ️ У вас уже есть триальная подписка. Она истекает через {days} дн.", - ).format(days=days_left) + if current_subscription: + if not getattr(current_subscription, "is_trial", False) and current_subscription.is_active: + # Это платная активная подписка - требуем подтверждение + has_active_paid_subscription = True + elif getattr(current_subscription, "is_trial", False): + # Это тестовая подписка + try: + days_left = max(0, (current_subscription.end_date - datetime.utcnow()).days) + except Exception: + days_left = 0 + key = "SIMPLE_SUBSCRIPTION_TRIAL_NOTICE_ACTIVE" if current_subscription.is_active else "SIMPLE_SUBSCRIPTION_TRIAL_NOTICE_TRIAL" + trial_notice = texts.t( + key, + "ℹ️ У вас уже есть триальная подписка. Она истекает через {days} дн.", + ).format(days=days_left) server_label = _get_simple_subscription_server_label( texts, @@ -134,40 +141,73 @@ async def start_simple_subscription_purchase( f"💰 Стоимость: {settings.format_price(price_kopeks)}", f"💳 Ваш баланс: {settings.format_price(user_balance_kopeks)}", "", - ( - "Вы можете оплатить подписку с баланса или выбрать другой способ оплаты." - if can_pay_from_balance - else "Баланс пока недостаточный для мгновенной оплаты. Выберите подходящий способ оплаты:" - ), ]) - message_text = "\n".join(message_lines) + # Если у пользователя уже есть активная платная подписка, требуем подтверждение + if has_active_paid_subscription: + # У пользователя уже есть активная платная подписка + message_lines.append( + "⚠️ У вас уже есть активная платная подписка. " + "Покупка простой подписки изменит параметры вашей текущей подписки. " + "Требуется подтверждение." + ) + message_text = "\n".join(message_lines) - if trial_notice: - message_text = f"{trial_notice}\n\n{message_text}" - - methods_keyboard = _get_simple_subscription_payment_keyboard(db_user.language) - keyboard_rows = [] - - if can_pay_from_balance: - keyboard_rows.append([ - types.InlineKeyboardButton( - text="✅ Оплатить с баланса", - callback_data="simple_subscription_pay_with_balance", + # Клавиатура с подтверждением + keyboard_rows = [ + [types.InlineKeyboardButton( + text="✅ Подтвердить покупку", + callback_data="simple_subscription_confirm_purchase" + )], + [types.InlineKeyboardButton( + text=texts.BACK, + callback_data="subscription_purchase" + )] + ] + keyboard = types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows) + else: + # У пользователя нет активной платной подписки (или есть только пробная) + # Показываем стандартный выбор метода оплаты + if can_pay_from_balance: + message_lines.append( + "Вы можете оплатить подписку с баланса или выбрать другой способ оплаты." ) - ]) + else: + message_lines.append( + "Баланс пока недостаточный для мгновенной оплаты. Выберите подходящий способ оплаты:" + ) + + message_text = "\n".join(message_lines) + + if trial_notice: + message_text = f"{trial_notice}\n\n{message_text}" - keyboard_rows.extend(methods_keyboard.inline_keyboard) + methods_keyboard = _get_simple_subscription_payment_keyboard(db_user.language) + keyboard_rows = [] + + if can_pay_from_balance: + keyboard_rows.append([ + types.InlineKeyboardButton( + text="✅ Оплатить с баланса", + callback_data="simple_subscription_pay_with_balance", + ) + ]) + + keyboard_rows.extend(methods_keyboard.inline_keyboard) + + keyboard = types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows) - keyboard = types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows) - await callback.message.edit_text( message_text, reply_markup=keyboard, parse_mode="HTML" ) - await state.set_state(SubscriptionStates.waiting_for_simple_subscription_payment_method) + # Устанавливаем соответствующее состояние + if has_active_paid_subscription: + await state.set_state(SubscriptionStates.waiting_for_simple_subscription_confirmation) + else: + await state.set_state(SubscriptionStates.waiting_for_simple_subscription_payment_method) await callback.answer() @@ -335,6 +375,15 @@ async def handle_simple_subscription_pay_with_balance( await callback.answer("❌ Данные подписки устарели. Пожалуйста, начните сначала.", show_alert=True) return + # Проверяем, имеет ли пользователь активную платную подписку + from app.database.crud.subscription import get_subscription_by_user_id + current_subscription = await get_subscription_by_user_id(db, db_user.id) + + if current_subscription and not getattr(current_subscription, "is_trial", False) and current_subscription.is_active: + # У пользователя есть активная платная подписка - требуем подтверждение + await callback.answer("⚠️ У вас уже есть активная платная подписка. Пожалуйста, подтвердите покупку.", show_alert=True) + return + resolved_squad_uuid = await _ensure_simple_subscription_squad_uuid( db, state, @@ -392,7 +441,10 @@ async def handle_simple_subscription_pay_with_balance( existing_subscription = await get_subscription_by_user_id(db, db_user.id) if existing_subscription: - # Если подписка уже существует, продлеваем её + # Если подписка уже существует (платная или тестовая), продлеваем её + # Сохраняем информацию о текущей подписке, особенно является ли она пробной + was_trial = getattr(existing_subscription, "is_trial", False) + subscription = await extend_subscription( db=db, subscription=existing_subscription, @@ -401,6 +453,16 @@ async def handle_simple_subscription_pay_with_balance( # Обновляем параметры подписки subscription.traffic_limit_gb = subscription_params["traffic_limit_gb"] subscription.device_limit = subscription_params["device_limit"] + + # Если текущая подписка была пробной, и мы обновляем её + # нужно изменить статус подписки + if was_trial: + from app.database.models import SubscriptionStatus + # Переводим подписку из пробной в активную платную + subscription.status = SubscriptionStatus.ACTIVE.value + subscription.is_trial = False + + # Устанавливаем новый выбранный сквад if resolved_squad_uuid: subscription.connected_squads = [resolved_squad_uuid] @@ -714,6 +776,15 @@ async def handle_simple_subscription_payment_method( await callback.answer("❌ Данные подписки устарели. Пожалуйста, начните сначала.", show_alert=True) return + # Проверяем, имеет ли пользователь активную платную подписку + from app.database.crud.subscription import get_subscription_by_user_id + current_subscription = await get_subscription_by_user_id(db, db_user.id) + + if current_subscription and not getattr(current_subscription, "is_trial", False) and current_subscription.is_active: + # У пользователя есть активная платная подписка - показываем сообщение + await callback.answer("⚠️ У вас уже есть активная платная подписка. Пожалуйста, подтвердите покупку через главное меню.", show_alert=True) + return + payment_method = callback.data.replace("simple_subscription_", "") try: @@ -1945,6 +2016,281 @@ async def check_simple_wata_payment_status( parse_mode="HTML", ) + +@error_handler +async def confirm_simple_subscription_purchase( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext, + db: AsyncSession, +): + """Обрабатывает подтверждение простой покупки подписки при наличии активной платной подписки.""" + texts = get_texts(db_user.language) + + data = await state.get_data() + subscription_params = data.get("subscription_params", {}) + + if not subscription_params: + await callback.answer("❌ Данные подписки устарели. Пожалуйста, начните сначала.", show_alert=True) + return + + resolved_squad_uuid = await _ensure_simple_subscription_squad_uuid( + db, + state, + subscription_params, + user_id=db_user.id, + state_data=data, + ) + + # Рассчитываем цену подписки + price_kopeks, price_breakdown = await _calculate_simple_subscription_price( + db, + subscription_params, + user=db_user, + resolved_squad_uuid=resolved_squad_uuid, + ) + total_required = price_kopeks + logger.warning( + "SIMPLE_SUBSCRIPTION_DEBUG_CONFIRM | user=%s | period=%s | base=%s | traffic=%s | devices=%s | servers=%s | discount=%s | total_required=%s | balance=%s", + db_user.id, + subscription_params["period_days"], + price_breakdown.get("base_price", 0), + price_breakdown.get("traffic_price", 0), + price_breakdown.get("devices_price", 0), + price_breakdown.get("servers_price", 0), + price_breakdown.get("total_discount", 0), + total_required, + getattr(db_user, "balance_kopeks", 0), + ) + + # Проверяем баланс пользователя + user_balance_kopeks = getattr(db_user, "balance_kopeks", 0) + + if user_balance_kopeks < total_required: + await callback.answer("❌ Недостаточно средств на балансе для оплаты подписки", show_alert=True) + return + + try: + # Списываем средства с баланса пользователя + from app.database.crud.user import subtract_user_balance + success = await subtract_user_balance( + db, + db_user, + price_kopeks, + f"Оплата подписки на {subscription_params['period_days']} дней", + consume_promo_offer=False, + ) + + if not success: + await callback.answer("❌ Ошибка списания средств с баланса", show_alert=True) + return + + # Проверяем, есть ли у пользователя уже подписка + from app.database.crud.subscription import get_subscription_by_user_id, extend_subscription + + existing_subscription = await get_subscription_by_user_id(db, db_user.id) + + if existing_subscription: + # Если подписка уже существует, продлеваем её + # Сохраняем информацию о текущей подписке, особенно является ли она пробной + was_trial = getattr(existing_subscription, "is_trial", False) + + subscription = await extend_subscription( + db=db, + subscription=existing_subscription, + days=subscription_params["period_days"] + ) + # Обновляем параметры подписки + subscription.traffic_limit_gb = subscription_params["traffic_limit_gb"] + subscription.device_limit = subscription_params["device_limit"] + + # Если текущая подписка была пробной, и мы обновляем её + # нужно изменить статус подписки + if was_trial: + from app.database.models import SubscriptionStatus + # Переводим подписку из пробной в активную платную + subscription.status = SubscriptionStatus.ACTIVE.value + subscription.is_trial = False + + # Устанавливаем новый выбранный сквад + if resolved_squad_uuid: + subscription.connected_squads = [resolved_squad_uuid] + + await db.commit() + await db.refresh(subscription) + else: + # Если подписки нет, создаём новую + from app.database.crud.subscription import create_paid_subscription + subscription = await create_paid_subscription( + db=db, + user_id=db_user.id, + duration_days=subscription_params["period_days"], + traffic_limit_gb=subscription_params["traffic_limit_gb"], + device_limit=subscription_params["device_limit"], + connected_squads=[resolved_squad_uuid] if resolved_squad_uuid else [], + update_server_counters=True, + ) + + if not subscription: + # Возвращаем средства на баланс в случае ошибки + from app.services.payment_service import add_user_balance + await add_user_balance( + db, + db_user.id, + price_kopeks, + f"Возврат средств за неудавшуюся подписку на {subscription_params['period_days']} дней", + ) + await callback.answer("❌ Ошибка создания подписки. Средства возвращены на баланс.", show_alert=True) + return + + # Обновляем баланс пользователя + await db.refresh(db_user) + + # Обновляем или создаём ссылку подписки в RemnaWave + try: + from app.services.subscription_service import SubscriptionService + subscription_service = SubscriptionService() + remnawave_user = await subscription_service.create_remnawave_user(db, subscription) + if remnawave_user: + await db.refresh(subscription) + except Exception as sync_error: + logger.error(f"Ошибка синхронизации подписки с RemnaWave для пользователя {db_user.id}: {sync_error}", exc_info=True) + + # Отправляем уведомление об успешной покупке + server_label = _get_simple_subscription_server_label( + texts, + subscription_params, + resolved_squad_uuid, + ) + show_devices = settings.is_devices_selection_enabled() + + success_lines = [ + "✅ Подписка успешно активирована!", + "", + f"📅 Период: {subscription_params['period_days']} дней", + ] + + if show_devices: + success_lines.append(f"📱 Устройства: {subscription_params['device_limit']}") + + success_lines.extend([ + f"📊 Трафик: {'Безлимит' if subscription_params['traffic_limit_gb'] == 0 else f'{subscription_params['traffic_limit_gb']} ГБ'}", + f"🌍 Сервер: {server_label}", + "", + f"💰 Списано с баланса: {settings.format_price(price_kopeks)}", + f"💳 Ваш баланс: {settings.format_price(db_user.balance_kopeks)}", + "", + "🔗 Для подключения перейдите в раздел 'Подключиться'", + ]) + + success_message = "\n".join(success_lines) + + connect_mode = settings.CONNECT_BUTTON_MODE + subscription_link = get_display_subscription_link(subscription) + connect_button_text = texts.t("CONNECT_BUTTON", "🔗 Подключиться") + + def _fallback_connect_button() -> types.InlineKeyboardButton: + return types.InlineKeyboardButton( + text=connect_button_text, + callback_data="subscription_connect", + ) + + if connect_mode == "miniapp_subscription": + if subscription_link: + connect_row = [ + types.InlineKeyboardButton( + text=connect_button_text, + web_app=types.WebAppInfo(url=subscription_link), + ) + ] + else: + connect_row = [_fallback_connect_button()] + elif connect_mode == "miniapp_custom": + custom_url = settings.MINIAPP_CUSTOM_URL + if custom_url: + connect_row = [ + types.InlineKeyboardButton( + text=connect_button_text, + web_app=types.WebAppInfo(url=custom_url), + ) + ] + else: + connect_row = [_fallback_connect_button()] + elif connect_mode == "link": + if subscription_link: + connect_row = [ + types.InlineKeyboardButton( + text=connect_button_text, + url=subscription_link, + ) + ] + else: + connect_row = [_fallback_connect_button()] + elif connect_mode == "happ_cryptolink": + if subscription_link: + connect_row = [ + types.InlineKeyboardButton( + text=connect_button_text, + callback_data="open_subscription_link", + ) + ] + else: + connect_row = [_fallback_connect_button()] + else: + connect_row = [_fallback_connect_button()] + + keyboard_rows = [connect_row] + + happ_row = get_happ_download_button_row(texts) + if happ_row: + keyboard_rows.append(happ_row) + + keyboard_rows.append( + [types.InlineKeyboardButton(text="🏠 Главное меню", callback_data="back_to_menu")] + ) + + keyboard = types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows) + + await callback.message.edit_text( + success_message, + reply_markup=keyboard, + parse_mode="HTML" + ) + + # Отправляем уведомление админам + try: + from app.services.admin_notification_service import AdminNotificationService + notification_service = AdminNotificationService(callback.bot) + await notification_service.send_subscription_purchase_notification( + db, + db_user, + subscription, + None, # transaction + subscription_params["period_days"], + False, # was_trial_conversion + amount_kopeks=price_kopeks, + ) + except Exception as e: + logger.error(f"Ошибка отправки уведомления админам о покупке: {e}") + + await state.clear() + await callback.answer() + + logger.info(f"Пользователь {db_user.telegram_id} успешно купил подписку с баланса на {price_kopeks/100}₽") + + except Exception as error: + logger.error( + "Ошибка подтверждения простой подписки с баланса для пользователя %s: %s", + db_user.id, + error, + exc_info=True, + ) + await callback.answer( + "❌ Ошибка оплаты подписки. Попробуйте позже или обратитесь в поддержку.", + show_alert=True, + ) + await state.clear() + def register_simple_subscription_handlers(dp): """Регистрирует обработчики простой покупки подписки.""" @@ -1953,6 +2299,11 @@ def register_simple_subscription_handlers(dp): F.data == "simple_subscription_purchase" ) + dp.callback_query.register( + confirm_simple_subscription_purchase, + F.data == "simple_subscription_confirm_purchase" + ) + dp.callback_query.register( handle_simple_subscription_pay_with_balance, F.data == "simple_subscription_pay_with_balance" From 71c219e6ea1e5f5b25d46c75893b14448a7aaa19 Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 09:29:46 +0300 Subject: [PATCH 10/12] Update user.py --- app/database/crud/user.py | 145 +++++++++++++++++++++++++++++++++++--- 1 file changed, 136 insertions(+), 9 deletions(-) diff --git a/app/database/crud/user.py b/app/database/crud/user.py index e0f724a6..ac24f461 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -5,7 +5,7 @@ from datetime import datetime, timedelta from typing import Optional, List, Dict from sqlalchemy import select, and_, or_, func, case, nullslast, text from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import selectinload, joinedload from sqlalchemy.exc import IntegrityError from app.database.models import ( @@ -46,8 +46,18 @@ async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[User]: user = result.scalar_one_or_none() if user and user.subscription: + # Загружаем дополнительные зависимости для subscription _ = user.subscription.is_active + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + return user @@ -64,7 +74,17 @@ async def get_user_by_telegram_id(db: AsyncSession, telegram_id: int) -> Optiona user = result.scalar_one_or_none() if user and user.subscription: + # Загружаем дополнительные зависимости для subscription _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default return user @@ -88,7 +108,17 @@ async def get_user_by_username(db: AsyncSession, username: str) -> Optional[User user = result.scalar_one_or_none() if user and user.subscription: + # Загружаем дополнительные зависимости для subscription _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default return user @@ -96,10 +126,29 @@ async def get_user_by_username(db: AsyncSession, username: str) -> Optional[User async def get_user_by_referral_code(db: AsyncSession, referral_code: str) -> Optional[User]: result = await db.execute( select(User) - .options(selectinload(User.promo_group)) + .options( + selectinload(User.subscription), + selectinload(User.promo_group), + selectinload(User.referrer), + ) .where(User.referral_code == referral_code) ) - return result.scalar_one_or_none() + user = result.scalar_one_or_none() + + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + + return user async def create_unique_referral_code(db: AsyncSession) -> str: @@ -569,7 +618,11 @@ async def get_users_list( order_by_purchase_count: bool = False ) -> List[User]: - query = select(User).options(selectinload(User.subscription)) + query = select(User).options( + selectinload(User.subscription), + selectinload(User.promo_group), + selectinload(User.referrer), + ) if status: query = query.where(User.status == status.value) @@ -664,7 +717,24 @@ async def get_users_list( query = query.offset(offset).limit(limit) result = await db.execute(query) - return result.scalars().all() + users = result.scalars().all() + + # Загружаем дополнительные зависимости для всех пользователей + for user in users: + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + + return users async def get_users_count( @@ -764,11 +834,29 @@ async def get_referrals(db: AsyncSession, user_id: int) -> List[User]: .options( selectinload(User.subscription), selectinload(User.promo_group), + selectinload(User.referrer), ) .where(User.referred_by_id == user_id) .order_by(User.created_at.desc()) ) - return result.scalars().all() + users = result.scalars().all() + + # Загружаем дополнительные зависимости для всех пользователей + for user in users: + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + + return users async def get_users_for_promo_segment(db: AsyncSession, segment: str) -> List[User]: @@ -776,7 +864,11 @@ async def get_users_for_promo_segment(db: AsyncSession, segment: str) -> List[Us base_query = ( select(User) - .options(selectinload(User.subscription)) + .options( + selectinload(User.subscription), + selectinload(User.promo_group), + selectinload(User.referrer), + ) .where(User.status == UserStatus.ACTIVE.value) ) @@ -821,7 +913,24 @@ async def get_users_for_promo_segment(db: AsyncSession, segment: str) -> List[Us return [] result = await db.execute(query.order_by(User.id)) - return result.scalars().unique().all() + users = result.scalars().unique().all() + + # Загружаем дополнительные зависимости для всех пользователей + for user in users: + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + + return users async def get_inactive_users(db: AsyncSession, months: int = 3) -> List[User]: @@ -832,6 +941,7 @@ async def get_inactive_users(db: AsyncSession, months: int = 3) -> List[User]: .options( selectinload(User.subscription), selectinload(User.promo_group), + selectinload(User.referrer), ) .where( and_( @@ -840,7 +950,24 @@ async def get_inactive_users(db: AsyncSession, months: int = 3) -> List[User]: ) ) ) - return result.scalars().all() + users = result.scalars().all() + + # Загружаем дополнительные зависимости для всех пользователей + for user in users: + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + if user and user.promo_group: + # Убедимся, что все атрибуты promo_group доступны + _ = user.promo_group.name + _ = user.promo_group.server_discount_percent + _ = user.promo_group.traffic_discount_percent + _ = user.promo_group.device_discount_percent + _ = user.promo_group.period_discounts + _ = user.promo_group.is_default + + return users async def delete_user(db: AsyncSession, user: User) -> bool: From dc196fb478798cc94924664c075e1305886f0418 Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 09:30:21 +0300 Subject: [PATCH 11/12] Optimize user promo_group and subscription access Refactor user attribute access to avoid lazy loading. --- app/services/payment/mulenpay.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/services/payment/mulenpay.py b/app/services/payment/mulenpay.py index d7514a8b..fe27ab25 100644 --- a/app/services/payment/mulenpay.py +++ b/app/services/payment/mulenpay.py @@ -272,8 +272,9 @@ class MulenPayPaymentMixin: await db.refresh(user) - promo_group = getattr(user, "promo_group", None) - subscription = getattr(user, "subscription", None) + # Используем предзагруженные значения для избежания lazy-загрузки + promo_group = user.promo_group if hasattr(user, 'promo_group') and user.promo_group else None + subscription = user.subscription if hasattr(user, 'subscription') and user.subscription else None referrer_info = format_referrer_info(user) topup_status = ( "🆕 Первое пополнение" if was_first_topup else "🔄 Пополнение" From c8aba42d3db7e9fd3ec751b7bd90504d574e0245 Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 4 Nov 2025 16:33:34 +0300 Subject: [PATCH 12/12] Reload user after MulenPay callback commit --- app/services/payment/mulenpay.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/app/services/payment/mulenpay.py b/app/services/payment/mulenpay.py index fe27ab25..4ef4f1bf 100644 --- a/app/services/payment/mulenpay.py +++ b/app/services/payment/mulenpay.py @@ -270,11 +270,24 @@ class MulenPayPaymentMixin: user.has_made_first_topup = True await db.commit() - await db.refresh(user) + # После коммита отношения пользователя могли быть сброшены, поэтому + # повторно загружаем пользователя с предзагрузкой зависимостей + user = await payment_module.get_user_by_id(db, user.id) + if not user: + logger.error( + "Пользователь %s не найден при повторной загрузке после %s", + payment.user_id, + display_name, + ) + return False # Используем предзагруженные значения для избежания lazy-загрузки - promo_group = user.promo_group if hasattr(user, 'promo_group') and user.promo_group else None - subscription = user.subscription if hasattr(user, 'subscription') and user.subscription else None + promo_group = ( + user.promo_group if hasattr(user, "promo_group") and user.promo_group else None + ) + subscription = ( + user.subscription if hasattr(user, "subscription") and user.subscription else None + ) referrer_info = format_referrer_info(user) topup_status = ( "🆕 Первое пополнение" if was_first_topup else "🔄 Пополнение"