diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 84e90c6f..cf73665e 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -132,6 +132,67 @@ class RemnaWaveService: except Exception as e: logger.warning(f"⚠️ Не удалось распарсить дату '{date_str}': {e}. Используем дефолтную дату.") return self._now_in_panel_timezone() + timedelta(days=30) + + def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: + """Парсит дату окончания подписки пользователя панели для сравнения.""" + + expire_at_value = panel_user.get('expireAt') + + if expire_at_value is None: + return datetime.min.replace(tzinfo=None) + + expire_at_str = str(expire_at_value).strip() + if not expire_at_str: + return datetime.min.replace(tzinfo=None) + + return self._parse_remnawave_date(expire_at_str) + + def _is_preferred_panel_user( + self, + *, + candidate: Dict[str, Any], + current: Dict[str, Any], + ) -> bool: + """Определяет, является ли новая запись предпочтительной для Telegram ID.""" + + candidate_expire = self._safe_panel_expire_date(candidate) + current_expire = self._safe_panel_expire_date(current) + + if candidate_expire > current_expire: + return True + if candidate_expire < current_expire: + return False + + candidate_status = (candidate.get('status') or '').upper() + current_status = (current.get('status') or '').upper() + + active_statuses = {'ACTIVE', 'TRIAL'} + if candidate_status in active_statuses and current_status not in active_statuses: + return True + + return False + + def _deduplicate_panel_users_by_telegram_id( + self, + panel_users: List[Dict[str, Any]], + ) -> Dict[Any, Dict[str, Any]]: + """Возвращает уникальных пользователей панели по Telegram ID.""" + + unique_users: Dict[Any, Dict[str, Any]] = {} + + for panel_user in panel_users: + telegram_id = panel_user.get('telegramId') + if telegram_id is None: + continue + + existing_user = unique_users.get(telegram_id) + if existing_user is None or self._is_preferred_panel_user( + candidate=panel_user, + current=existing_user, + ): + unique_users[telegram_id] = panel_user + + return unique_users async def get_system_statistics(self) -> Dict[str, Any]: try: @@ -773,24 +834,32 @@ class RemnaWaveService: logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ - user for user in panel_users + user for user in panel_users if user.get('telegramId') is not None ] - + logger.info(f"📊 Пользователей в панели с Telegram ID: {len(panel_users_with_tg)}") - - panel_telegram_ids = set() - - for i, panel_user in enumerate(panel_users_with_tg): + + unique_panel_users_map = self._deduplicate_panel_users_by_telegram_id(panel_users_with_tg) + unique_panel_users = list(unique_panel_users_map.values()) + duplicates_count = len(panel_users_with_tg) - len(unique_panel_users) + + if duplicates_count: + logger.info( + "♻️ Обнаружено %s дубликатов пользователей по Telegram ID. Используем самые свежие записи.", + duplicates_count, + ) + + panel_telegram_ids = set(unique_panel_users_map.keys()) + + for i, panel_user in enumerate(unique_panel_users): try: telegram_id = panel_user.get('telegramId') if not telegram_id: continue - - panel_telegram_ids.add(telegram_id) - - if (i + 1) % 10 == 0: - logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(panel_users_with_tg)}: {telegram_id}") + + if (i + 1) % 10 == 0: + logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(unique_panel_users)}: {telegram_id}") db_user = bot_users_by_telegram_id.get(telegram_id) diff --git a/tests/services/test_remnawave_service_sync.py b/tests/services/test_remnawave_service_sync.py new file mode 100644 index 00000000..1024005e --- /dev/null +++ b/tests/services/test_remnawave_service_sync.py @@ -0,0 +1,61 @@ +from datetime import datetime +from pathlib import Path +import sys +from zoneinfo import ZoneInfo + +ROOT_DIR = Path(__file__).resolve().parents[2] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +from app.services.remnawave_service import RemnaWaveService + + +def _create_service() -> RemnaWaveService: + service = RemnaWaveService.__new__(RemnaWaveService) + service._panel_timezone = ZoneInfo("UTC") + return service + + +def _make_panel_user(telegram_id: int, expire_at: str, status: str = "ACTIVE") -> dict: + return { + "telegramId": telegram_id, + "expireAt": expire_at, + "status": status, + } + + +def test_deduplicate_prefers_latest_expire_date(): + service = _create_service() + + telegram_id = 100 + older = _make_panel_user(telegram_id, datetime(2025, 1, 1, 0, 0, 0).isoformat()) + newer = _make_panel_user(telegram_id, datetime(2025, 2, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([older, newer]) + + assert deduplicated[telegram_id] is newer + + +def test_deduplicate_prefers_active_status_on_same_expire(): + service = _create_service() + + telegram_id = 200 + expire = datetime(2025, 1, 1, 0, 0, 0).isoformat() + disabled = _make_panel_user(telegram_id, expire, status="DISABLED") + active = _make_panel_user(telegram_id, expire, status="ACTIVE") + + deduplicated = service._deduplicate_panel_users_by_telegram_id([disabled, active]) + + assert deduplicated[telegram_id] is active + + +def test_deduplicate_ignores_records_without_expire_date(): + service = _create_service() + + telegram_id = 300 + missing_expire = _make_panel_user(telegram_id, "") + valid = _make_panel_user(telegram_id, datetime(2025, 3, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([missing_expire, valid]) + + assert deduplicated[telegram_id] is valid