From 9db806d3871a26b98d786c8447c55eae702d8e59 Mon Sep 17 00:00:00 2001 From: Egor Date: Thu, 30 Oct 2025 01:15:19 +0300 Subject: [PATCH] Handle Telegram ID uniqueness during RemnaWave sync --- app/services/remnawave_service.py | 196 +++++++++++++++--- tests/services/test_remnawave_service_sync.py | 121 +++++++++++ 2 files changed, 287 insertions(+), 30 deletions(-) create mode 100644 tests/services/test_remnawave_service_sync.py diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 84e90c6f..c7d6a279 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -3,7 +3,7 @@ import os import re from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from zoneinfo import ZoneInfo @@ -13,9 +13,15 @@ from app.external.remnawave_api import ( RemnaWaveNode, UserStatus, TrafficLimitStrategy, RemnaWaveAPIError ) from sqlalchemy import and_, cast, delete, func, select, update, String +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import selectinload from sqlalchemy.ext.asyncio import AsyncSession -from app.database.crud.user import get_users_list, get_user_by_telegram_id, update_user +from app.database.crud.user import ( + create_user, + get_users_list, + get_user_by_telegram_id, + update_user, +) from app.database.crud.subscription import ( get_subscription_by_user_id, update_subscription_usage, @@ -132,6 +138,116 @@ class RemnaWaveService: except Exception as e: logger.warning(f"⚠️ Не удалось распарсить дату '{date_str}': {e}. Используем дефолтную дату.") return self._now_in_panel_timezone() + timedelta(days=30) + + def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: + """Парсит дату окончания подписки пользователя панели для сравнения.""" + + expire_at_value = panel_user.get('expireAt') + + if expire_at_value is None: + return datetime.min.replace(tzinfo=None) + + expire_at_str = str(expire_at_value).strip() + if not expire_at_str: + return datetime.min.replace(tzinfo=None) + + return self._parse_remnawave_date(expire_at_str) + + def _is_preferred_panel_user( + self, + *, + candidate: Dict[str, Any], + current: Dict[str, Any], + ) -> bool: + """Определяет, является ли новая запись предпочтительной для Telegram ID.""" + + candidate_expire = self._safe_panel_expire_date(candidate) + current_expire = self._safe_panel_expire_date(current) + + if candidate_expire > current_expire: + return True + if candidate_expire < current_expire: + return False + + candidate_status = (candidate.get('status') or '').upper() + current_status = (current.get('status') or '').upper() + + active_statuses = {'ACTIVE', 'TRIAL'} + if candidate_status in active_statuses and current_status not in active_statuses: + return True + + return False + + def _deduplicate_panel_users_by_telegram_id( + self, + panel_users: List[Dict[str, Any]], + ) -> Dict[Any, Dict[str, Any]]: + """Возвращает уникальных пользователей панели по Telegram ID.""" + + unique_users: Dict[Any, Dict[str, Any]] = {} + + for panel_user in panel_users: + telegram_id = panel_user.get('telegramId') + if telegram_id is None: + continue + + existing_user = unique_users.get(telegram_id) + if existing_user is None or self._is_preferred_panel_user( + candidate=panel_user, + current=existing_user, + ): + unique_users[telegram_id] = panel_user + + return unique_users + + async def _get_or_create_bot_user_from_panel( + self, + db: AsyncSession, + panel_user: Dict[str, Any], + ) -> Tuple[Optional[User], bool]: + """Возвращает пользователя бота, создавая его при необходимости. + + При конфликте уникальности telegram_id повторно загружает пользователя + из базы данных и сообщает, что запись не была создана заново. + """ + + telegram_id = panel_user.get("telegramId") + if telegram_id is None: + return None, False + + username = panel_user.get("username") or f"user_{telegram_id}" + + try: + db_user = await create_user( + db=db, + telegram_id=telegram_id, + username=username, + first_name=f"Panel User {telegram_id}", + language="ru", + ) + return db_user, True + except IntegrityError as create_error: + logger.info( + "♻️ Пользователь с telegram_id %s уже существует. Используем существующую запись.", + telegram_id, + ) + + try: + await db.rollback() + except Exception: + # create_user уже выполняет rollback при необходимости + pass + + existing_user = await get_user_by_telegram_id(db, telegram_id) + if existing_user is None: + raise create_error + + logger.debug( + "Используется существующий пользователь %s после конфликта уникальности: %s", + telegram_id, + create_error, + ) + return existing_user, False async def get_system_statistics(self) -> Dict[str, Any]: try: @@ -773,47 +889,67 @@ class RemnaWaveService: logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ - user for user in panel_users + user for user in panel_users if user.get('telegramId') is not None ] - + logger.info(f"📊 Пользователей в панели с Telegram ID: {len(panel_users_with_tg)}") - - panel_telegram_ids = set() - - for i, panel_user in enumerate(panel_users_with_tg): + + unique_panel_users_map = self._deduplicate_panel_users_by_telegram_id(panel_users_with_tg) + unique_panel_users = list(unique_panel_users_map.values()) + duplicates_count = len(panel_users_with_tg) - len(unique_panel_users) + + if duplicates_count: + logger.info( + "♻️ Обнаружено %s дубликатов пользователей по Telegram ID. Используем самые свежие записи.", + duplicates_count, + ) + + panel_telegram_ids = set(unique_panel_users_map.keys()) + + for i, panel_user in enumerate(unique_panel_users): try: telegram_id = panel_user.get('telegramId') if not telegram_id: continue - - panel_telegram_ids.add(telegram_id) - - if (i + 1) % 10 == 0: - logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(panel_users_with_tg)}: {telegram_id}") + + if (i + 1) % 10 == 0: + logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(unique_panel_users)}: {telegram_id}") db_user = bot_users_by_telegram_id.get(telegram_id) if not db_user: if sync_type in ["new_only", "all"]: logger.info(f"🆕 Создание пользователя для telegram_id {telegram_id}") - - from app.database.crud.user import create_user - - db_user = await create_user( - db=db, - telegram_id=telegram_id, - username=panel_user.get('username') or f"user_{telegram_id}", - first_name=f"Panel User {telegram_id}", - language="ru" - ) - - await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid')) - - await self._create_subscription_from_panel_data(db, db_user, panel_user) - - stats["created"] += 1 - logger.info(f"✅ Создан пользователь {telegram_id} с подпиской") + + db_user, is_created = await self._get_or_create_bot_user_from_panel(db, panel_user) + + if not db_user: + logger.error( + "❌ Не удалось создать или получить пользователя для telegram_id %s", + telegram_id, + ) + stats["errors"] += 1 + continue + + bot_users_by_telegram_id[telegram_id] = db_user + + if not db_user.remnawave_uuid: + await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid')) + + if is_created or not getattr(db_user, "subscription", None): + await self._create_subscription_from_panel_data(db, db_user, panel_user) + else: + await self._update_subscription_from_panel_data(db, db_user, panel_user) + + if is_created: + stats["created"] += 1 + logger.info(f"✅ Создан пользователь {telegram_id} с подпиской") + else: + stats["updated"] += 1 + logger.info( + f"♻️ Обновлена подписка существующего пользователя {telegram_id}" + ) else: if sync_type in ["update_only", "all"]: diff --git a/tests/services/test_remnawave_service_sync.py b/tests/services/test_remnawave_service_sync.py new file mode 100644 index 00000000..4b0007b5 --- /dev/null +++ b/tests/services/test_remnawave_service_sync.py @@ -0,0 +1,121 @@ +from datetime import datetime +from pathlib import Path +import sys +from unittest.mock import AsyncMock + +import pytest +from sqlalchemy.exc import IntegrityError +from zoneinfo import ZoneInfo + +ROOT_DIR = Path(__file__).resolve().parents[2] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +from app.services.remnawave_service import RemnaWaveService + + +def _create_service() -> RemnaWaveService: + service = RemnaWaveService.__new__(RemnaWaveService) + service._panel_timezone = ZoneInfo("UTC") + return service + + +def _make_panel_user(telegram_id: int, expire_at: str, status: str = "ACTIVE") -> dict: + return { + "telegramId": telegram_id, + "expireAt": expire_at, + "status": status, + } + + +def test_deduplicate_prefers_latest_expire_date(): + service = _create_service() + + telegram_id = 100 + older = _make_panel_user(telegram_id, datetime(2025, 1, 1, 0, 0, 0).isoformat()) + newer = _make_panel_user(telegram_id, datetime(2025, 2, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([older, newer]) + + assert deduplicated[telegram_id] is newer + + +def test_deduplicate_prefers_active_status_on_same_expire(): + service = _create_service() + + telegram_id = 200 + expire = datetime(2025, 1, 1, 0, 0, 0).isoformat() + disabled = _make_panel_user(telegram_id, expire, status="DISABLED") + active = _make_panel_user(telegram_id, expire, status="ACTIVE") + + deduplicated = service._deduplicate_panel_users_by_telegram_id([disabled, active]) + + assert deduplicated[telegram_id] is active + + +def test_deduplicate_ignores_records_without_expire_date(): + service = _create_service() + + telegram_id = 300 + missing_expire = _make_panel_user(telegram_id, "") + valid = _make_panel_user(telegram_id, datetime(2025, 3, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([missing_expire, valid]) + + assert deduplicated[telegram_id] is valid + + +@pytest.mark.asyncio +async def test_get_or_create_user_handles_unique_violation(monkeypatch): + service = _create_service() + db = AsyncMock() + + panel_user = {"telegramId": 555, "username": "existing"} + existing_user = object() + + create_user_mock = AsyncMock( + side_effect=IntegrityError("stmt", "params", Exception("unique")) + ) + get_user_mock = AsyncMock(return_value=existing_user) + rollback_mock = AsyncMock() + + db.rollback = rollback_mock + + monkeypatch.setattr("app.services.remnawave_service.create_user", create_user_mock) + monkeypatch.setattr( + "app.services.remnawave_service.get_user_by_telegram_id", + get_user_mock, + ) + + user, created = await service._get_or_create_bot_user_from_panel(db, panel_user) + + assert user is existing_user + assert created is False + create_user_mock.assert_awaited_once() + get_user_mock.assert_awaited_once_with(db, 555) + rollback_mock.assert_awaited() + + +@pytest.mark.asyncio +async def test_get_or_create_user_creates_new(monkeypatch): + service = _create_service() + db = AsyncMock() + + panel_user = {"telegramId": 777, "username": "new_user"} + new_user = object() + + create_user_mock = AsyncMock(return_value=new_user) + + monkeypatch.setattr("app.services.remnawave_service.create_user", create_user_mock) + + user, created = await service._get_or_create_bot_user_from_panel(db, panel_user) + + assert user is new_user + assert created is True + create_user_mock.assert_awaited_once_with( + db=db, + telegram_id=777, + username="new_user", + first_name="Panel User 777", + language="ru", + )