Merge pull request #1728 from Fr1ngg/bedolaga/fix-user-visibility-issue-in-remnawave-bot-4srv7j

Restore RemnaWave UUID map after sync rollbacks
This commit is contained in:
Egor
2025-11-06 07:31:11 +03:00
committed by GitHub

View File

@@ -46,6 +46,74 @@ from app.utils.timezone import get_local_timezone
logger = logging.getLogger(__name__)
_UUID_MAP_MISSING = object()
class _UUIDMapMutation:
"""Tracks in-memory UUID map/user changes so they can be rolled back."""
__slots__ = ("uuid_map", "_map_original", "_user_original")
def __init__(self, uuid_map: Dict[str, "User"]):
self.uuid_map = uuid_map
self._map_original: Dict[str, Any] = {}
self._user_original: Dict["User", Tuple[Optional[str], Optional[datetime]]] = {}
def _capture_user_state(self, user: Optional["User"]) -> None:
if not user or user in self._user_original:
return
self._user_original[user] = (
getattr(user, "remnawave_uuid", None),
getattr(user, "updated_at", None),
)
def _capture_map_entry(self, key: Optional[str]) -> None:
if key is None or key in self._map_original:
return
self._map_original[key] = self.uuid_map.get(key, _UUID_MAP_MISSING)
def set_user_uuid(self, user: Optional["User"], value: Optional[str]) -> None:
if not user:
return
self._capture_user_state(user)
user.remnawave_uuid = value
def set_user_updated_at(self, user: Optional["User"], value: datetime) -> None:
if not user:
return
self._capture_user_state(user)
user.updated_at = value
def remove_map_entry(self, key: Optional[str]) -> None:
if key is None:
return
self._capture_map_entry(key)
self.uuid_map.pop(key, None)
def set_map_entry(self, key: Optional[str], value: Optional["User"]) -> None:
if key is None:
return
self._capture_map_entry(key)
if value is None:
self.uuid_map.pop(key, None)
else:
self.uuid_map[key] = value
def has_changes(self) -> bool:
return bool(self._map_original or self._user_original)
def rollback(self) -> None:
for user, (uuid_value, updated_at) in self._user_original.items():
user.remnawave_uuid = uuid_value
user.updated_at = updated_at
for key, original in self._map_original.items():
if original is _UUID_MAP_MISSING:
self.uuid_map.pop(key, None)
else:
self.uuid_map[key] = original
class RemnaWaveConfigurationError(Exception):
"""Raised when RemnaWave API configuration is missing."""
@@ -93,6 +161,54 @@ class RemnaWaveService:
self._config_error or "RemnaWave API не настроен"
)
def _ensure_user_remnawave_uuid(
self,
user: "User",
panel_uuid: Optional[str],
uuid_map: Dict[str, "User"],
) -> Tuple[bool, Optional[_UUIDMapMutation]]:
"""Обновляет UUID пользователя, если он изменился в панели."""
if not panel_uuid:
return False, None
current_uuid = getattr(user, "remnawave_uuid", None)
if current_uuid == panel_uuid:
return False, None
mutation = _UUIDMapMutation(uuid_map)
conflicting_user = uuid_map.get(panel_uuid)
if conflicting_user and conflicting_user is not user:
logger.warning(
"♻️ Обнаружен конфликт UUID %s между пользователями %s и %s. Сбрасываем у старой записи.",
panel_uuid,
getattr(conflicting_user, "telegram_id", "?"),
getattr(user, "telegram_id", "?"),
)
mutation.set_user_uuid(conflicting_user, None)
mutation.set_user_updated_at(conflicting_user, datetime.utcnow())
mutation.remove_map_entry(panel_uuid)
if current_uuid:
mutation.remove_map_entry(current_uuid)
mutation.set_user_uuid(user, panel_uuid)
mutation.set_user_updated_at(user, datetime.utcnow())
mutation.set_map_entry(panel_uuid, user)
logger.info(
"🔁 Обновлен RemnaWave UUID пользователя %s: %s%s",
getattr(user, "telegram_id", "?"),
current_uuid,
panel_uuid,
)
if mutation.has_changes():
return True, mutation
return True, None
@asynccontextmanager
async def get_api_client(self):
self._ensure_configured()
@@ -1008,7 +1124,12 @@ class RemnaWaveService:
)
bot_users = bot_users_result.scalars().all()
bot_users_by_telegram_id = {user.telegram_id: user for user in bot_users}
bot_users_by_uuid = {
user.remnawave_uuid: user
for user in bot_users
if getattr(user, "remnawave_uuid", None)
}
logger.info(f"📊 Пользователей в боте: {len(bot_users)}")
panel_users_with_tg = [
@@ -1044,8 +1165,10 @@ class RemnaWaveService:
# Для оптимизации коммитим изменения каждые N пользователей
batch_size = 50
pending_uuid_mutations: List[_UUIDMapMutation] = []
for i, panel_user in enumerate(unique_panel_users):
uuid_mutation: Optional[_UUIDMapMutation] = None
try:
telegram_id = panel_user.get('telegramId')
if not telegram_id:
@@ -1080,8 +1203,11 @@ class RemnaWaveService:
logger.info(f"🔄 Обновлены поля {updated_fields} для пользователя {telegram_id}")
await db.flush() # Сохраняем изменения без коммита
if not db_user.remnawave_uuid:
await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid'))
_, uuid_mutation = self._ensure_user_remnawave_uuid(
db_user,
panel_user.get('uuid'),
bot_users_by_uuid,
)
if is_created:
await self._create_subscription_from_panel_data(db, db_user, panel_user)
@@ -1115,50 +1241,69 @@ class RemnaWaveService:
else:
# Если подписки нет, создаем новую
await self._create_subscription_from_panel_data(db, db_user, panel_user)
if not db_user.remnawave_uuid:
await update_user(db, db_user, remnawave_uuid=panel_user.get('uuid'))
_, uuid_mutation = self._ensure_user_remnawave_uuid(
db_user,
panel_user.get('uuid'),
bot_users_by_uuid,
)
stats["updated"] += 1
logger.debug(f"✅ Обновлён пользователь {telegram_id}")
except Exception as user_error:
logger.error(f"❌ Ошибка обработки пользователя {telegram_id}: {user_error}")
stats["errors"] += 1
if uuid_mutation:
uuid_mutation.rollback()
try:
await db.rollback() # Выполняем rollback при ошибке
except:
pass
continue
else:
if uuid_mutation and uuid_mutation.has_changes():
pending_uuid_mutations.append(uuid_mutation)
# Коммитим изменения каждые N пользователей для ускорения
if (i + 1) % batch_size == 0:
try:
await db.commit()
logger.debug(f"📦 Коммит изменений после обработки {i+1} пользователей")
pending_uuid_mutations.clear()
except Exception as commit_error:
logger.error(f"❌ Ошибка коммита после обработки {i+1} пользователей: {commit_error}")
await db.rollback()
for mutation in reversed(pending_uuid_mutations):
mutation.rollback()
pending_uuid_mutations.clear()
stats["errors"] += batch_size # Учитываем ошибки за всю группу
# Коммитим оставшиеся изменения
try:
await db.commit()
pending_uuid_mutations.clear()
except Exception as final_commit_error:
logger.error(f"❌ Ошибка финального коммита: {final_commit_error}")
await db.rollback()
for mutation in reversed(pending_uuid_mutations):
mutation.rollback()
pending_uuid_mutations.clear()
if sync_type == "all":
logger.info("🗑️ Деактивация подписок пользователей, отсутствующих в панели...")
batch_size = 50
processed_count = 0
cleanup_uuid_mutations: List[_UUIDMapMutation] = []
for telegram_id, db_user in bot_users_by_telegram_id.items():
if telegram_id not in panel_telegram_ids and hasattr(db_user, 'subscription') and db_user.subscription:
cleanup_mutation: Optional[_UUIDMapMutation] = None
try:
logger.info(f"🗑️ Деактивация подписки пользователя {telegram_id} (нет в панели)")
subscription = db_user.subscription
if db_user.remnawave_uuid:
@@ -1198,42 +1343,60 @@ class RemnaWaveService:
subscription.remnawave_short_uuid = None
subscription.subscription_url = ""
subscription.subscription_crypto_link = ""
db_user.remnawave_uuid = None
old_uuid = getattr(db_user, "remnawave_uuid", None)
cleanup_mutation = _UUIDMapMutation(bot_users_by_uuid)
if old_uuid:
cleanup_mutation.remove_map_entry(old_uuid)
cleanup_mutation.set_user_uuid(db_user, None)
cleanup_mutation.set_user_updated_at(db_user, datetime.utcnow())
stats["deleted"] += 1
logger.info(f"✅ Деактивирована подписка пользователя {telegram_id} (сохранен баланс)")
processed_count += 1
# Коммитим изменения каждые N пользователей
if processed_count % batch_size == 0:
try:
await db.commit()
logger.debug(f"📦 Коммит изменений после деактивации {processed_count} подписок")
cleanup_uuid_mutations.clear()
except Exception as commit_error:
logger.error(f"❌ Ошибка коммита после деактивации {processed_count} подписок: {commit_error}")
await db.rollback()
for mutation in reversed(cleanup_uuid_mutations):
mutation.rollback()
cleanup_uuid_mutations.clear()
stats["errors"] += batch_size
break # Прерываем цикл при ошибке коммита
except Exception as delete_error:
logger.error(f"❌ Ошибка деактивации подписки {telegram_id}: {delete_error}")
stats["errors"] += 1
if cleanup_mutation:
cleanup_mutation.rollback()
try:
await db.rollback()
except:
pass
else:
if cleanup_mutation and cleanup_mutation.has_changes():
cleanup_uuid_mutations.append(cleanup_mutation)
else:
# Увеличиваем счетчик для отслеживания прогресса
processed_count += 1
# Коммитим оставшиеся изменения
try:
await db.commit()
cleanup_uuid_mutations.clear()
except Exception as final_commit_error:
logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}")
await db.rollback()
for mutation in reversed(cleanup_uuid_mutations):
mutation.rollback()
cleanup_uuid_mutations.clear()
logger.info(f"🎯 Синхронизация завершена: создано {stats['created']}, обновлено {stats['updated']}, деактивировано {stats['deleted']}, ошибок {stats['errors']}")
return stats
@@ -1387,8 +1550,16 @@ class RemnaWaveService:
subscription.device_limit = device_limit
logger.debug(f"Обновлен лимит устройств: {device_limit}")
if not subscription.remnawave_short_uuid:
subscription.remnawave_short_uuid = panel_user.get('shortUuid')
new_short_uuid = panel_user.get('shortUuid')
if new_short_uuid and subscription.remnawave_short_uuid != new_short_uuid:
old_short_uuid = subscription.remnawave_short_uuid
subscription.remnawave_short_uuid = new_short_uuid
logger.debug(
"Обновлен short UUID подписки пользователя %s: %s%s",
getattr(user, "telegram_id", "?"),
old_short_uuid,
new_short_uuid,
)
panel_url = panel_user.get('subscriptionUrl', '')
if not subscription.subscription_url or subscription.subscription_url != panel_url: