From 7bb21c9f771856055161bff8e4ef5a9c9c8930e6 Mon Sep 17 00:00:00 2001 From: Egor Date: Thu, 6 Nov 2025 07:20:34 +0300 Subject: [PATCH] Protect RemnaWave UUID map updates during rollbacks --- app/services/remnawave_service.py | 217 ++++++++++++++++++++++++++---- 1 file changed, 194 insertions(+), 23 deletions(-) diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index de7f2475..f451e51a 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -46,6 +46,74 @@ from app.utils.timezone import get_local_timezone logger = logging.getLogger(__name__) +_UUID_MAP_MISSING = object() + + +class _UUIDMapMutation: + """Tracks in-memory UUID map/user changes so they can be rolled back.""" + + __slots__ = ("uuid_map", "_map_original", "_user_original") + + def __init__(self, uuid_map: Dict[str, "User"]): + self.uuid_map = uuid_map + self._map_original: Dict[str, Any] = {} + self._user_original: Dict["User", Tuple[Optional[str], Optional[datetime]]] = {} + + def _capture_user_state(self, user: Optional["User"]) -> None: + if not user or user in self._user_original: + return + self._user_original[user] = ( + getattr(user, "remnawave_uuid", None), + getattr(user, "updated_at", None), + ) + + def _capture_map_entry(self, key: Optional[str]) -> None: + if key is None or key in self._map_original: + return + self._map_original[key] = self.uuid_map.get(key, _UUID_MAP_MISSING) + + def set_user_uuid(self, user: Optional["User"], value: Optional[str]) -> None: + if not user: + return + self._capture_user_state(user) + user.remnawave_uuid = value + + def set_user_updated_at(self, user: Optional["User"], value: datetime) -> None: + if not user: + return + self._capture_user_state(user) + user.updated_at = value + + def remove_map_entry(self, key: Optional[str]) -> None: + if key is None: + return + self._capture_map_entry(key) + self.uuid_map.pop(key, None) + + def set_map_entry(self, key: Optional[str], value: Optional["User"]) -> None: + if key is None: + return + self._capture_map_entry(key) + if value is None: + self.uuid_map.pop(key, None) + else: + self.uuid_map[key] = value + + def has_changes(self) -> bool: + return bool(self._map_original or self._user_original) + + def rollback(self) -> None: + for user, (uuid_value, updated_at) in self._user_original.items(): + user.remnawave_uuid = uuid_value + user.updated_at = updated_at + + for key, original in self._map_original.items(): + if original is _UUID_MAP_MISSING: + self.uuid_map.pop(key, None) + else: + self.uuid_map[key] = original + + class RemnaWaveConfigurationError(Exception): """Raised when RemnaWave API configuration is missing.""" @@ -93,6 +161,54 @@ class RemnaWaveService: self._config_error or "RemnaWave API не настроен" ) + def _ensure_user_remnawave_uuid( + self, + user: "User", + panel_uuid: Optional[str], + uuid_map: Dict[str, "User"], + ) -> Tuple[bool, Optional[_UUIDMapMutation]]: + """Обновляет UUID пользователя, если он изменился в панели.""" + + if not panel_uuid: + return False, None + + current_uuid = getattr(user, "remnawave_uuid", None) + if current_uuid == panel_uuid: + return False, None + + mutation = _UUIDMapMutation(uuid_map) + + conflicting_user = uuid_map.get(panel_uuid) + if conflicting_user and conflicting_user is not user: + logger.warning( + "♻️ Обнаружен конфликт UUID %s между пользователями %s и %s. Сбрасываем у старой записи.", + panel_uuid, + getattr(conflicting_user, "telegram_id", "?"), + getattr(user, "telegram_id", "?"), + ) + mutation.set_user_uuid(conflicting_user, None) + mutation.set_user_updated_at(conflicting_user, datetime.utcnow()) + mutation.remove_map_entry(panel_uuid) + + if current_uuid: + mutation.remove_map_entry(current_uuid) + + mutation.set_user_uuid(user, panel_uuid) + mutation.set_user_updated_at(user, datetime.utcnow()) + mutation.set_map_entry(panel_uuid, user) + + logger.info( + "🔁 Обновлен RemnaWave UUID пользователя %s: %s → %s", + getattr(user, "telegram_id", "?"), + current_uuid, + panel_uuid, + ) + + if mutation.has_changes(): + return True, mutation + + return True, None + @asynccontextmanager async def get_api_client(self): self._ensure_configured() @@ -1008,7 +1124,12 @@ class RemnaWaveService: ) bot_users = bot_users_result.scalars().all() bot_users_by_telegram_id = {user.telegram_id: user for user in bot_users} - + bot_users_by_uuid = { + user.remnawave_uuid: user + for user in bot_users + if getattr(user, "remnawave_uuid", None) + } + logger.info(f"📊 Пользователей в боте: {len(bot_users)}") panel_users_with_tg = [ @@ -1044,8 +1165,10 @@ class RemnaWaveService: # Для оптимизации коммитим изменения каждые N пользователей batch_size = 50 - + pending_uuid_mutations: List[_UUIDMapMutation] = [] + for i, panel_user in enumerate(unique_panel_users): + uuid_mutation: Optional[_UUIDMapMutation] = None try: telegram_id = panel_user.get('telegramId') if not telegram_id: @@ -1080,8 +1203,11 @@ class RemnaWaveService: logger.info(f"🔄 Обновлены поля {updated_fields} для пользователя {telegram_id}") await db.flush() # Сохраняем изменения без коммита - if not db_user.remnawave_uuid: - await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid')) + _, uuid_mutation = self._ensure_user_remnawave_uuid( + db_user, + panel_user.get('uuid'), + bot_users_by_uuid, + ) if is_created: await self._create_subscription_from_panel_data(db, db_user, panel_user) @@ -1115,50 +1241,69 @@ class RemnaWaveService: else: # Если подписки нет, создаем новую await self._create_subscription_from_panel_data(db, db_user, panel_user) - - if not db_user.remnawave_uuid: - await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid')) - + + _, uuid_mutation = self._ensure_user_remnawave_uuid( + db_user, + panel_user.get('uuid'), + bot_users_by_uuid, + ) + stats["updated"] += 1 logger.debug(f"✅ Обновлён пользователь {telegram_id}") - + except Exception as user_error: logger.error(f"❌ Ошибка обработки пользователя {telegram_id}: {user_error}") stats["errors"] += 1 + if uuid_mutation: + uuid_mutation.rollback() try: await db.rollback() # Выполняем rollback при ошибке except: pass continue + else: + if uuid_mutation and uuid_mutation.has_changes(): + pending_uuid_mutations.append(uuid_mutation) + # Коммитим изменения каждые N пользователей для ускорения if (i + 1) % batch_size == 0: try: await db.commit() logger.debug(f"📦 Коммит изменений после обработки {i+1} пользователей") + pending_uuid_mutations.clear() except Exception as commit_error: logger.error(f"❌ Ошибка коммита после обработки {i+1} пользователей: {commit_error}") await db.rollback() + for mutation in reversed(pending_uuid_mutations): + mutation.rollback() + pending_uuid_mutations.clear() stats["errors"] += batch_size # Учитываем ошибки за всю группу - + # Коммитим оставшиеся изменения try: await db.commit() + pending_uuid_mutations.clear() except Exception as final_commit_error: logger.error(f"❌ Ошибка финального коммита: {final_commit_error}") await db.rollback() - + for mutation in reversed(pending_uuid_mutations): + mutation.rollback() + pending_uuid_mutations.clear() + if sync_type == "all": logger.info("🗑️ Деактивация подписок пользователей, отсутствующих в панели...") - + batch_size = 50 processed_count = 0 - + cleanup_uuid_mutations: List[_UUIDMapMutation] = [] + for telegram_id, db_user in bot_users_by_telegram_id.items(): if telegram_id not in panel_telegram_ids and hasattr(db_user, 'subscription') and db_user.subscription: + cleanup_mutation: Optional[_UUIDMapMutation] = None try: logger.info(f"🗑️ Деактивация подписки пользователя {telegram_id} (нет в панели)") - + subscription = db_user.subscription if db_user.remnawave_uuid: @@ -1198,42 +1343,60 @@ class RemnaWaveService: subscription.remnawave_short_uuid = None subscription.subscription_url = "" subscription.subscription_crypto_link = "" - - db_user.remnawave_uuid = None - + + old_uuid = getattr(db_user, "remnawave_uuid", None) + cleanup_mutation = _UUIDMapMutation(bot_users_by_uuid) + if old_uuid: + cleanup_mutation.remove_map_entry(old_uuid) + cleanup_mutation.set_user_uuid(db_user, None) + cleanup_mutation.set_user_updated_at(db_user, datetime.utcnow()) + stats["deleted"] += 1 logger.info(f"✅ Деактивирована подписка пользователя {telegram_id} (сохранен баланс)") - + processed_count += 1 - + # Коммитим изменения каждые N пользователей if processed_count % batch_size == 0: try: await db.commit() logger.debug(f"📦 Коммит изменений после деактивации {processed_count} подписок") + cleanup_uuid_mutations.clear() except Exception as commit_error: logger.error(f"❌ Ошибка коммита после деактивации {processed_count} подписок: {commit_error}") await db.rollback() + for mutation in reversed(cleanup_uuid_mutations): + mutation.rollback() + cleanup_uuid_mutations.clear() stats["errors"] += batch_size break # Прерываем цикл при ошибке коммита - + except Exception as delete_error: logger.error(f"❌ Ошибка деактивации подписки {telegram_id}: {delete_error}") stats["errors"] += 1 + if cleanup_mutation: + cleanup_mutation.rollback() try: await db.rollback() except: pass + else: + if cleanup_mutation and cleanup_mutation.has_changes(): + cleanup_uuid_mutations.append(cleanup_mutation) else: # Увеличиваем счетчик для отслеживания прогресса processed_count += 1 - + # Коммитим оставшиеся изменения try: await db.commit() + cleanup_uuid_mutations.clear() except Exception as final_commit_error: logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}") await db.rollback() + for mutation in reversed(cleanup_uuid_mutations): + mutation.rollback() + cleanup_uuid_mutations.clear() logger.info(f"🎯 Синхронизация завершена: создано {stats['created']}, обновлено {stats['updated']}, деактивировано {stats['deleted']}, ошибок {stats['errors']}") return stats @@ -1387,8 +1550,16 @@ class RemnaWaveService: subscription.device_limit = device_limit logger.debug(f"Обновлен лимит устройств: {device_limit}") - if not subscription.remnawave_short_uuid: - subscription.remnawave_short_uuid = panel_user.get('shortUuid') + new_short_uuid = panel_user.get('shortUuid') + if new_short_uuid and subscription.remnawave_short_uuid != new_short_uuid: + old_short_uuid = subscription.remnawave_short_uuid + subscription.remnawave_short_uuid = new_short_uuid + logger.debug( + "Обновлен short UUID подписки пользователя %s: %s → %s", + getattr(user, "telegram_id", "?"), + old_short_uuid, + new_short_uuid, + ) panel_url = panel_user.get('subscriptionUrl', '') if not subscription.subscription_url or subscription.subscription_url != panel_url: