From f65aa6a82df2edd200ef08290eecb3e5d765e74b Mon Sep 17 00:00:00 2001 From: Egor Date: Tue, 23 Dec 2025 11:53:11 +0300 Subject: [PATCH] Update remnawave_service.py --- app/services/remnawave_service.py | 63 ++++++++++++++++--------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 24c3831f..4231dd45 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -1706,46 +1706,49 @@ class RemnaWaveService: raise async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: + from app.database.crud.subscription import get_subscriptions_batch + try: stats = {"created": 0, "updated": 0, "errors": 0} - batch_size = 500 # Увеличен для ускорения + batch_size = 500 offset = 0 - concurrent_limit = 5 # Параллельные запросы к API (уменьшено для стабильности) + concurrent_limit = 5 async with self.get_api_client() as api: semaphore = asyncio.Semaphore(concurrent_limit) while True: - users = await get_users_list(db, offset=offset, limit=batch_size) + # Получаем подписки напрямую (не через users) + subscriptions = await get_subscriptions_batch(db, offset=offset, limit=batch_size) - if not users: + if not subscriptions: break - # Фильтруем пользователей с подписками и готовим данные - users_with_subscriptions = [u for u in users if u.subscription] + # Фильтруем подписки у которых есть пользователь + valid_subscriptions = [s for s in subscriptions if s.user] - if not users_with_subscriptions: - if len(users) < batch_size: + if not valid_subscriptions: + if len(subscriptions) < batch_size: break offset += batch_size continue # Подготавливаем задачи для параллельного выполнения - async def process_user(user): + async def process_subscription(sub): async with semaphore: try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - expire_at = self._safe_expire_at_for_panel(subscription.end_date) + user = sub.user + hwid_limit = resolve_hwid_device_limit_for_payload(sub) + expire_at = self._safe_expire_at_for_panel(sub.end_date) # Определяем статус для панели is_subscription_active = ( - subscription.status in ( + sub.status in ( SubscriptionStatus.ACTIVE.value, SubscriptionStatus.TRIAL.value, ) - and subscription.end_date > datetime.utcnow() + and sub.end_date > datetime.utcnow() ) status = UserStatus.ACTIVE if is_subscription_active else UserStatus.DISABLED @@ -1759,7 +1762,7 @@ class RemnaWaveService: username=username, expire_at=expire_at, status=status, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_bytes=sub.traffic_limit_gb * (1024**3) if sub.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, telegram_id=user.telegram_id, description=settings.format_remnawave_user_description( @@ -1767,7 +1770,7 @@ class RemnaWaveService: username=user.username, telegram_id=user.telegram_id ), - active_internal_squads=subscription.connected_squads, + active_internal_squads=sub.connected_squads, ) if hwid_limit is not None: @@ -1791,7 +1794,7 @@ class RemnaWaveService: traffic_limit_bytes=create_kwargs['traffic_limit_bytes'], traffic_limit_strategy=TrafficLimitStrategy.MONTH, description=create_kwargs['description'], - active_internal_squads=subscription.connected_squads, + active_internal_squads=sub.connected_squads, ) if hwid_limit is not None: @@ -1802,23 +1805,23 @@ class RemnaWaveService: # Сохраняем UUID если его не было if not user.remnawave_uuid: user.remnawave_uuid = panel_uuid - return ("updated", user, None) + return ("updated", sub, None) except RemnaWaveAPIError as api_error: if api_error.status_code == 404: new_user = await api.create_user(**create_kwargs) - return ("created", user, new_user) + return ("created", sub, new_user) else: raise else: new_user = await api.create_user(**create_kwargs) - return ("created", user, new_user) + return ("created", sub, new_user) except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - return ("error", user, None) + logger.error(f"Ошибка синхронизации пользователя {sub.user.telegram_id if sub.user else 'N/A'} в панель: {e}") + return ("error", sub, None) # Выполняем параллельно - tasks = [process_user(user) for user in users_with_subscriptions] + tasks = [process_subscription(s) for s in valid_subscriptions] results = await asyncio.gather(*tasks, return_exceptions=True) # Обрабатываем результаты @@ -1827,11 +1830,11 @@ class RemnaWaveService: stats["errors"] += 1 continue - action, user, new_user = result + action, sub, new_user = result if action == "created": - if new_user: - user.remnawave_uuid = new_user.uuid - user.subscription.remnawave_short_uuid = new_user.short_uuid + if new_user and sub.user: + sub.user.remnawave_uuid = new_user.uuid + sub.remnawave_short_uuid = new_user.short_uuid stats["created"] += 1 elif action == "updated": stats["updated"] += 1 @@ -1846,14 +1849,14 @@ class RemnaWaveService: commit_error, ) await db.rollback() - stats["errors"] += len(users_with_subscriptions) + stats["errors"] += len(valid_subscriptions) logger.info( - f"📦 Обработано {offset + len(users)} пользователей: " + f"📦 Обработано {offset + len(subscriptions)} подписок: " f"создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" ) - if len(users) < batch_size: + if len(subscriptions) < batch_size: break offset += batch_size