Update remnawave_service.py

This commit is contained in:
Egor
2025-12-23 11:53:11 +03:00
committed by GitHub
parent 94a7a5fce8
commit f65aa6a82d

View File

@@ -1706,46 +1706,49 @@ class RemnaWaveService:
raise
async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]:
from app.database.crud.subscription import get_subscriptions_batch
try:
stats = {"created": 0, "updated": 0, "errors": 0}
batch_size = 500 # Увеличен для ускорения
batch_size = 500
offset = 0
concurrent_limit = 5 # Параллельные запросы к API (уменьшено для стабильности)
concurrent_limit = 5
async with self.get_api_client() as api:
semaphore = asyncio.Semaphore(concurrent_limit)
while True:
users = await get_users_list(db, offset=offset, limit=batch_size)
# Получаем подписки напрямую (не через users)
subscriptions = await get_subscriptions_batch(db, offset=offset, limit=batch_size)
if not users:
if not subscriptions:
break
# Фильтруем пользователей с подписками и готовим данные
users_with_subscriptions = [u for u in users if u.subscription]
# Фильтруем подписки у которых есть пользователь
valid_subscriptions = [s for s in subscriptions if s.user]
if not users_with_subscriptions:
if len(users) < batch_size:
if not valid_subscriptions:
if len(subscriptions) < batch_size:
break
offset += batch_size
continue
# Подготавливаем задачи для параллельного выполнения
async def process_user(user):
async def process_subscription(sub):
async with semaphore:
try:
subscription = user.subscription
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
expire_at = self._safe_expire_at_for_panel(subscription.end_date)
user = sub.user
hwid_limit = resolve_hwid_device_limit_for_payload(sub)
expire_at = self._safe_expire_at_for_panel(sub.end_date)
# Определяем статус для панели
is_subscription_active = (
subscription.status in (
sub.status in (
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.TRIAL.value,
)
and subscription.end_date > datetime.utcnow()
and sub.end_date > datetime.utcnow()
)
status = UserStatus.ACTIVE if is_subscription_active else UserStatus.DISABLED
@@ -1759,7 +1762,7 @@ class RemnaWaveService:
username=username,
expire_at=expire_at,
status=status,
traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0,
traffic_limit_bytes=sub.traffic_limit_gb * (1024**3) if sub.traffic_limit_gb > 0 else 0,
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
telegram_id=user.telegram_id,
description=settings.format_remnawave_user_description(
@@ -1767,7 +1770,7 @@ class RemnaWaveService:
username=user.username,
telegram_id=user.telegram_id
),
active_internal_squads=subscription.connected_squads,
active_internal_squads=sub.connected_squads,
)
if hwid_limit is not None:
@@ -1791,7 +1794,7 @@ class RemnaWaveService:
traffic_limit_bytes=create_kwargs['traffic_limit_bytes'],
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=create_kwargs['description'],
active_internal_squads=subscription.connected_squads,
active_internal_squads=sub.connected_squads,
)
if hwid_limit is not None:
@@ -1802,23 +1805,23 @@ class RemnaWaveService:
# Сохраняем UUID если его не было
if not user.remnawave_uuid:
user.remnawave_uuid = panel_uuid
return ("updated", user, None)
return ("updated", sub, None)
except RemnaWaveAPIError as api_error:
if api_error.status_code == 404:
new_user = await api.create_user(**create_kwargs)
return ("created", user, new_user)
return ("created", sub, new_user)
else:
raise
else:
new_user = await api.create_user(**create_kwargs)
return ("created", user, new_user)
return ("created", sub, new_user)
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}")
return ("error", user, None)
logger.error(f"Ошибка синхронизации пользователя {sub.user.telegram_id if sub.user else 'N/A'} в панель: {e}")
return ("error", sub, None)
# Выполняем параллельно
tasks = [process_user(user) for user in users_with_subscriptions]
tasks = [process_subscription(s) for s in valid_subscriptions]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Обрабатываем результаты
@@ -1827,11 +1830,11 @@ class RemnaWaveService:
stats["errors"] += 1
continue
action, user, new_user = result
action, sub, new_user = result
if action == "created":
if new_user:
user.remnawave_uuid = new_user.uuid
user.subscription.remnawave_short_uuid = new_user.short_uuid
if new_user and sub.user:
sub.user.remnawave_uuid = new_user.uuid
sub.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
elif action == "updated":
stats["updated"] += 1
@@ -1846,14 +1849,14 @@ class RemnaWaveService:
commit_error,
)
await db.rollback()
stats["errors"] += len(users_with_subscriptions)
stats["errors"] += len(valid_subscriptions)
logger.info(
f"📦 Обработано {offset + len(users)} пользователей: "
f"📦 Обработано {offset + len(subscriptions)} подписок: "
f"создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}"
)
if len(users) < batch_size:
if len(subscriptions) < batch_size:
break
offset += batch_size