From b4a69e3c85450bdadacbacd41fbcb55bd01224e5 Mon Sep 17 00:00:00 2001 From: Egor Date: Thu, 30 Oct 2025 01:02:53 +0300 Subject: [PATCH 1/3] Filter RemnaWave sync duplicates by latest subscription --- app/services/remnawave_service.py | 76 ++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 7 deletions(-) diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 84e90c6f..f1066990 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -773,15 +773,75 @@ class RemnaWaveService: logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ - user for user in panel_users + user for user in panel_users if user.get('telegramId') is not None ] - + logger.info(f"📊 Пользователей в панели с Telegram ID: {len(panel_users_with_tg)}") - + + def _parse_expire_at(user_data: Dict[str, Any]) -> Optional[datetime]: + expire_at_str = user_data.get('expireAt') + if not expire_at_str: + return None + + try: + return self._parse_remnawave_date(expire_at_str) + except Exception: + logger.debug( + "⚠️ Не удалось разобрать дату expireAt для пользователя %s: %s", + user_data.get('telegramId'), + expire_at_str, + ) + return None + + latest_panel_users_by_telegram_id: Dict[int, Dict[str, Any]] = {} + duplicates_skipped = 0 + + for panel_user in panel_users_with_tg: + telegram_id = panel_user.get('telegramId') + if not telegram_id: + continue + + existing = latest_panel_users_by_telegram_id.get(telegram_id) + if not existing: + latest_panel_users_by_telegram_id[telegram_id] = panel_user + continue + + existing_expire = _parse_expire_at(existing) + candidate_expire = _parse_expire_at(panel_user) + + replace_candidate = False + + if candidate_expire and existing_expire: + replace_candidate = candidate_expire > existing_expire + elif candidate_expire and not existing_expire: + replace_candidate = True + elif not candidate_expire and not existing_expire: + existing_status = (existing.get('status') or '').upper() + candidate_status = (panel_user.get('status') or '').upper() + replace_candidate = candidate_status == 'ACTIVE' and existing_status != 'ACTIVE' + + if replace_candidate: + latest_panel_users_by_telegram_id[telegram_id] = panel_user + else: + duplicates_skipped += 1 + + unique_panel_users = list(latest_panel_users_by_telegram_id.values()) + + if duplicates_skipped: + logger.info( + "📉 Обнаружено и пропущено дубликатов пользователей: %s", + duplicates_skipped, + ) + + logger.info( + "📊 Пользователей после фильтрации дубликатов: %s", + len(unique_panel_users), + ) + panel_telegram_ids = set() - - for i, panel_user in enumerate(panel_users_with_tg): + + for i, panel_user in enumerate(unique_panel_users): try: telegram_id = panel_user.get('telegramId') if not telegram_id: @@ -789,8 +849,10 @@ class RemnaWaveService: panel_telegram_ids.add(telegram_id) - if (i + 1) % 10 == 0: - logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(panel_users_with_tg)}: {telegram_id}") + if (i + 1) % 10 == 0: + logger.info( + f"🔄 Обрабатываем пользователя {i+1}/{len(unique_panel_users)}: {telegram_id}" + ) db_user = bot_users_by_telegram_id.get(telegram_id) From 1cc37a57c1ee873195cc99a32b7ed119b355a76d Mon Sep 17 00:00:00 2001 From: Egor Date: Thu, 30 Oct 2025 01:03:35 +0300 Subject: [PATCH 2/3] Revert "Handle duplicate RemnaWave panel users during sync" --- app/services/remnawave_service.py | 76 +++---------------------------- 1 file changed, 7 insertions(+), 69 deletions(-) diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index f1066990..84e90c6f 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -773,75 +773,15 @@ class RemnaWaveService: logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ - user for user in panel_users + user for user in panel_users if user.get('telegramId') is not None ] - + logger.info(f"📊 Пользователей в панели с Telegram ID: {len(panel_users_with_tg)}") - - def _parse_expire_at(user_data: Dict[str, Any]) -> Optional[datetime]: - expire_at_str = user_data.get('expireAt') - if not expire_at_str: - return None - - try: - return self._parse_remnawave_date(expire_at_str) - except Exception: - logger.debug( - "⚠️ Не удалось разобрать дату expireAt для пользователя %s: %s", - user_data.get('telegramId'), - expire_at_str, - ) - return None - - latest_panel_users_by_telegram_id: Dict[int, Dict[str, Any]] = {} - duplicates_skipped = 0 - - for panel_user in panel_users_with_tg: - telegram_id = panel_user.get('telegramId') - if not telegram_id: - continue - - existing = latest_panel_users_by_telegram_id.get(telegram_id) - if not existing: - latest_panel_users_by_telegram_id[telegram_id] = panel_user - continue - - existing_expire = _parse_expire_at(existing) - candidate_expire = _parse_expire_at(panel_user) - - replace_candidate = False - - if candidate_expire and existing_expire: - replace_candidate = candidate_expire > existing_expire - elif candidate_expire and not existing_expire: - replace_candidate = True - elif not candidate_expire and not existing_expire: - existing_status = (existing.get('status') or '').upper() - candidate_status = (panel_user.get('status') or '').upper() - replace_candidate = candidate_status == 'ACTIVE' and existing_status != 'ACTIVE' - - if replace_candidate: - latest_panel_users_by_telegram_id[telegram_id] = panel_user - else: - duplicates_skipped += 1 - - unique_panel_users = list(latest_panel_users_by_telegram_id.values()) - - if duplicates_skipped: - logger.info( - "📉 Обнаружено и пропущено дубликатов пользователей: %s", - duplicates_skipped, - ) - - logger.info( - "📊 Пользователей после фильтрации дубликатов: %s", - len(unique_panel_users), - ) - + panel_telegram_ids = set() - - for i, panel_user in enumerate(unique_panel_users): + + for i, panel_user in enumerate(panel_users_with_tg): try: telegram_id = panel_user.get('telegramId') if not telegram_id: @@ -849,10 +789,8 @@ class RemnaWaveService: panel_telegram_ids.add(telegram_id) - if (i + 1) % 10 == 0: - logger.info( - f"🔄 Обрабатываем пользователя {i+1}/{len(unique_panel_users)}: {telegram_id}" - ) + if (i + 1) % 10 == 0: + logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(panel_users_with_tg)}: {telegram_id}") db_user = bot_users_by_telegram_id.get(telegram_id) From 0ab89f495681198a2c676f00c62b30e43132fa5a Mon Sep 17 00:00:00 2001 From: Egor Date: Thu, 30 Oct 2025 01:03:58 +0300 Subject: [PATCH 3/3] Deduplicate RemnaWave sync users by telegram id --- app/services/remnawave_service.py | 91 ++++++++++++++++--- tests/services/test_remnawave_service_sync.py | 61 +++++++++++++ 2 files changed, 141 insertions(+), 11 deletions(-) create mode 100644 tests/services/test_remnawave_service_sync.py diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 84e90c6f..cf73665e 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -132,6 +132,67 @@ class RemnaWaveService: except Exception as e: logger.warning(f"⚠️ Не удалось распарсить дату '{date_str}': {e}. Используем дефолтную дату.") return self._now_in_panel_timezone() + timedelta(days=30) + + def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: + """Парсит дату окончания подписки пользователя панели для сравнения.""" + + expire_at_value = panel_user.get('expireAt') + + if expire_at_value is None: + return datetime.min.replace(tzinfo=None) + + expire_at_str = str(expire_at_value).strip() + if not expire_at_str: + return datetime.min.replace(tzinfo=None) + + return self._parse_remnawave_date(expire_at_str) + + def _is_preferred_panel_user( + self, + *, + candidate: Dict[str, Any], + current: Dict[str, Any], + ) -> bool: + """Определяет, является ли новая запись предпочтительной для Telegram ID.""" + + candidate_expire = self._safe_panel_expire_date(candidate) + current_expire = self._safe_panel_expire_date(current) + + if candidate_expire > current_expire: + return True + if candidate_expire < current_expire: + return False + + candidate_status = (candidate.get('status') or '').upper() + current_status = (current.get('status') or '').upper() + + active_statuses = {'ACTIVE', 'TRIAL'} + if candidate_status in active_statuses and current_status not in active_statuses: + return True + + return False + + def _deduplicate_panel_users_by_telegram_id( + self, + panel_users: List[Dict[str, Any]], + ) -> Dict[Any, Dict[str, Any]]: + """Возвращает уникальных пользователей панели по Telegram ID.""" + + unique_users: Dict[Any, Dict[str, Any]] = {} + + for panel_user in panel_users: + telegram_id = panel_user.get('telegramId') + if telegram_id is None: + continue + + existing_user = unique_users.get(telegram_id) + if existing_user is None or self._is_preferred_panel_user( + candidate=panel_user, + current=existing_user, + ): + unique_users[telegram_id] = panel_user + + return unique_users async def get_system_statistics(self) -> Dict[str, Any]: try: @@ -773,24 +834,32 @@ class RemnaWaveService: logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ - user for user in panel_users + user for user in panel_users if user.get('telegramId') is not None ] - + logger.info(f"📊 Пользователей в панели с Telegram ID: {len(panel_users_with_tg)}") - - panel_telegram_ids = set() - - for i, panel_user in enumerate(panel_users_with_tg): + + unique_panel_users_map = self._deduplicate_panel_users_by_telegram_id(panel_users_with_tg) + unique_panel_users = list(unique_panel_users_map.values()) + duplicates_count = len(panel_users_with_tg) - len(unique_panel_users) + + if duplicates_count: + logger.info( + "♻️ Обнаружено %s дубликатов пользователей по Telegram ID. Используем самые свежие записи.", + duplicates_count, + ) + + panel_telegram_ids = set(unique_panel_users_map.keys()) + + for i, panel_user in enumerate(unique_panel_users): try: telegram_id = panel_user.get('telegramId') if not telegram_id: continue - - panel_telegram_ids.add(telegram_id) - - if (i + 1) % 10 == 0: - logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(panel_users_with_tg)}: {telegram_id}") + + if (i + 1) % 10 == 0: + logger.info(f"🔄 Обрабатываем пользователя {i+1}/{len(unique_panel_users)}: {telegram_id}") db_user = bot_users_by_telegram_id.get(telegram_id) diff --git a/tests/services/test_remnawave_service_sync.py b/tests/services/test_remnawave_service_sync.py new file mode 100644 index 00000000..1024005e --- /dev/null +++ b/tests/services/test_remnawave_service_sync.py @@ -0,0 +1,61 @@ +from datetime import datetime +from pathlib import Path +import sys +from zoneinfo import ZoneInfo + +ROOT_DIR = Path(__file__).resolve().parents[2] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +from app.services.remnawave_service import RemnaWaveService + + +def _create_service() -> RemnaWaveService: + service = RemnaWaveService.__new__(RemnaWaveService) + service._panel_timezone = ZoneInfo("UTC") + return service + + +def _make_panel_user(telegram_id: int, expire_at: str, status: str = "ACTIVE") -> dict: + return { + "telegramId": telegram_id, + "expireAt": expire_at, + "status": status, + } + + +def test_deduplicate_prefers_latest_expire_date(): + service = _create_service() + + telegram_id = 100 + older = _make_panel_user(telegram_id, datetime(2025, 1, 1, 0, 0, 0).isoformat()) + newer = _make_panel_user(telegram_id, datetime(2025, 2, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([older, newer]) + + assert deduplicated[telegram_id] is newer + + +def test_deduplicate_prefers_active_status_on_same_expire(): + service = _create_service() + + telegram_id = 200 + expire = datetime(2025, 1, 1, 0, 0, 0).isoformat() + disabled = _make_panel_user(telegram_id, expire, status="DISABLED") + active = _make_panel_user(telegram_id, expire, status="ACTIVE") + + deduplicated = service._deduplicate_panel_users_by_telegram_id([disabled, active]) + + assert deduplicated[telegram_id] is active + + +def test_deduplicate_ignores_records_without_expire_date(): + service = _create_service() + + telegram_id = 300 + missing_expire = _make_panel_user(telegram_id, "") + valid = _make_panel_user(telegram_id, datetime(2025, 3, 1, 0, 0, 0).isoformat()) + + deduplicated = service._deduplicate_panel_users_by_telegram_id([missing_expire, valid]) + + assert deduplicated[telegram_id] is valid