From b7f84698958a82fbd2160950a85b8655c880962b Mon Sep 17 00:00:00 2001 From: Egor Date: Sat, 17 Jan 2026 10:31:21 +0300 Subject: [PATCH 1/2] Add files via upload --- app/cabinet/routes/__init__.py | 2 + app/cabinet/routes/admin_remnawave.py | 978 ++++++++++++++++++++++++++ 2 files changed, 980 insertions(+) create mode 100644 app/cabinet/routes/admin_remnawave.py diff --git a/app/cabinet/routes/__init__.py b/app/cabinet/routes/__init__.py index 7fa3a475..035ca3d2 100644 --- a/app/cabinet/routes/__init__.py +++ b/app/cabinet/routes/__init__.py @@ -30,6 +30,7 @@ from .admin_campaigns import router as admin_campaigns_router from .admin_users import router as admin_users_router from .admin_payments import router as admin_payments_router from .admin_promo_offers import router as admin_promo_offers_router +from .admin_remnawave import router as admin_remnawave_router from .media import router as media_router # Main cabinet router @@ -69,5 +70,6 @@ router.include_router(admin_campaigns_router) router.include_router(admin_users_router) router.include_router(admin_payments_router) router.include_router(admin_promo_offers_router) +router.include_router(admin_remnawave_router) __all__ = ["router"] diff --git a/app/cabinet/routes/admin_remnawave.py b/app/cabinet/routes/admin_remnawave.py new file mode 100644 index 00000000..812753ce --- /dev/null +++ b/app/cabinet/routes/admin_remnawave.py @@ -0,0 +1,978 @@ +"""Admin routes for RemnaWave management in cabinet.""" + +import logging +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.models import User +from app.database.crud.server_squad import ( + count_active_users_for_squad, + get_all_server_squads, + get_server_squad_by_uuid, + sync_with_remnawave, +) +from app.config import settings +from app.utils.cache import cache + +from ..dependencies import get_cabinet_db, get_current_admin_user +from ..schemas.remnawave import ( + # Status & Connection + RemnaWaveStatusResponse, + ConnectionStatus, + # System Statistics + SystemStatsResponse, + SystemSummary, + ServerInfo, + Bandwidth, + TrafficPeriods, + TrafficPeriod, + # Nodes + NodeInfo, + NodesListResponse, + NodesOverview, + NodeStatisticsResponse, + NodeUsageResponse, + NodeActionRequest, + NodeActionResponse, + # Squads + SquadWithLocalInfo, + SquadsListResponse, + SquadDetailResponse, + SquadCreateRequest, + SquadUpdateRequest, + SquadActionRequest, + SquadOperationResponse, + # Migration + MigrationPreviewResponse, + MigrationRequest, + MigrationStats, + MigrationResponse, + # Inbounds + InboundsListResponse, + # Auto Sync + AutoSyncStatus, + AutoSyncToggleRequest, + AutoSyncRunResponse, + # Manual Sync + SyncMode, + SyncResponse, +) + +try: + from app.services.remnawave_service import ( + RemnaWaveConfigurationError, + RemnaWaveService, + ) +except Exception: + RemnaWaveConfigurationError = None + RemnaWaveService = None + +try: + from app.services.remnawave_sync_service import remnawave_sync_service +except Exception: + remnawave_sync_service = None + + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/admin/remnawave", tags=["Cabinet Admin RemnaWave"]) + + +# ============ Helpers ============ + +def _get_service() -> RemnaWaveService: + """Get RemnaWave service instance.""" + if RemnaWaveService is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="RemnaWave service is not available", + ) + return RemnaWaveService() + + +def _ensure_configured(service: RemnaWaveService) -> None: + """Ensure RemnaWave is configured.""" + if not service.is_configured: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=service.configuration_error or "RemnaWave API is not configured", + ) + + +def _parse_datetime(value: Any) -> Optional[datetime]: + """Parse datetime from various formats.""" + if isinstance(value, datetime): + return value + if isinstance(value, str): + try: + return datetime.fromisoformat(value) + except ValueError: + return None + return None + + +def _serialize_node(node_data: Dict[str, Any]) -> NodeInfo: + """Serialize node data to NodeInfo model.""" + return NodeInfo( + uuid=node_data.get("uuid", ""), + name=node_data.get("name", ""), + address=node_data.get("address", ""), + country_code=node_data.get("country_code"), + is_connected=bool(node_data.get("is_connected")), + is_disabled=bool(node_data.get("is_disabled")), + is_node_online=bool(node_data.get("is_node_online")), + is_xray_running=bool(node_data.get("is_xray_running")), + users_online=node_data.get("users_online"), + traffic_used_bytes=node_data.get("traffic_used_bytes"), + traffic_limit_bytes=node_data.get("traffic_limit_bytes"), + last_status_change=_parse_datetime(node_data.get("last_status_change")), + last_status_message=node_data.get("last_status_message"), + xray_uptime=node_data.get("xray_uptime"), + is_traffic_tracking_active=bool(node_data.get("is_traffic_tracking_active", False)), + traffic_reset_day=node_data.get("traffic_reset_day"), + notify_percent=node_data.get("notify_percent"), + consumption_multiplier=float(node_data.get("consumption_multiplier", 1.0)), + cpu_count=node_data.get("cpu_count"), + cpu_model=node_data.get("cpu_model"), + total_ram=node_data.get("total_ram"), + created_at=_parse_datetime(node_data.get("created_at")), + updated_at=_parse_datetime(node_data.get("updated_at")), + provider_uuid=node_data.get("provider_uuid"), + ) + + +# ============ Status & Connection ============ + +@router.get("/status", response_model=RemnaWaveStatusResponse) +async def get_remnawave_status( + admin: User = Depends(get_current_admin_user), +) -> RemnaWaveStatusResponse: + """Get RemnaWave configuration and connection status.""" + service = _get_service() + + connection_info: Optional[ConnectionStatus] = None + connection_result = await service.test_api_connection() + + if connection_result: + connection_info = ConnectionStatus(**connection_result) + + return RemnaWaveStatusResponse( + is_configured=service.is_configured, + configuration_error=service.configuration_error, + connection=connection_info, + ) + + +# ============ System Statistics ============ + +@router.get("/system", response_model=SystemStatsResponse) +async def get_system_statistics( + admin: User = Depends(get_current_admin_user), +) -> SystemStatsResponse: + """Get full system statistics from RemnaWave.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.get_system_statistics() + if not stats or "system" not in stats: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail="Failed to get RemnaWave statistics", + ) + + system_data = stats.get("system", {}) + server_data = stats.get("server_info", {}) + bandwidth_data = stats.get("bandwidth", {}) + traffic_data = stats.get("traffic_periods", {}) + + return SystemStatsResponse( + system=SystemSummary( + users_online=system_data.get("users_online", 0), + total_users=system_data.get("total_users", 0), + active_connections=system_data.get("active_connections", 0), + nodes_online=system_data.get("nodes_online", 0), + users_last_day=system_data.get("users_last_day", 0), + users_last_week=system_data.get("users_last_week", 0), + users_never_online=system_data.get("users_never_online", 0), + total_user_traffic=system_data.get("total_user_traffic", 0), + ), + users_by_status=stats.get("users_by_status", {}), + server_info=ServerInfo( + cpu_cores=server_data.get("cpu_cores", 0), + cpu_physical_cores=server_data.get("cpu_physical_cores", 0), + memory_total=server_data.get("memory_total", 0), + memory_used=server_data.get("memory_used", 0), + memory_free=server_data.get("memory_free", 0), + memory_available=server_data.get("memory_available", 0), + uptime_seconds=server_data.get("uptime_seconds", 0), + ), + bandwidth=Bandwidth( + realtime_download=bandwidth_data.get("realtime_download", 0), + realtime_upload=bandwidth_data.get("realtime_upload", 0), + realtime_total=bandwidth_data.get("realtime_total", 0), + ), + traffic_periods=TrafficPeriods( + last_2_days=TrafficPeriod(**traffic_data.get("last_2_days", {"current": 0, "previous": 0})), + last_7_days=TrafficPeriod(**traffic_data.get("last_7_days", {"current": 0, "previous": 0})), + last_30_days=TrafficPeriod(**traffic_data.get("last_30_days", {"current": 0, "previous": 0})), + current_month=TrafficPeriod(**traffic_data.get("current_month", {"current": 0, "previous": 0})), + current_year=TrafficPeriod(**traffic_data.get("current_year", {"current": 0, "previous": 0})), + ), + nodes_realtime=stats.get("nodes_realtime", []), + nodes_weekly=stats.get("nodes_weekly", []), + last_updated=_parse_datetime(stats.get("last_updated")), + ) + + +# ============ Nodes ============ + +@router.get("/nodes", response_model=NodesListResponse) +async def list_nodes( + admin: User = Depends(get_current_admin_user), +) -> NodesListResponse: + """Get list of all nodes.""" + service = _get_service() + _ensure_configured(service) + + nodes = await service.get_all_nodes() + serialized = [_serialize_node(node) for node in nodes] + + return NodesListResponse(items=serialized, total=len(serialized)) + + +@router.get("/nodes/overview", response_model=NodesOverview) +async def get_nodes_overview( + admin: User = Depends(get_current_admin_user), +) -> NodesOverview: + """Get nodes overview with statistics.""" + service = _get_service() + _ensure_configured(service) + + nodes = await service.get_all_nodes() + + total = len(nodes) + online = sum(1 for n in nodes if n.get("is_connected") and not n.get("is_disabled")) + disabled = sum(1 for n in nodes if n.get("is_disabled")) + offline = total - online - disabled + total_users_online = sum(n.get("users_online", 0) or 0 for n in nodes) + + return NodesOverview( + total=total, + online=online, + offline=offline, + disabled=disabled, + total_users_online=total_users_online, + nodes=[_serialize_node(n) for n in nodes], + ) + + +@router.get("/nodes/realtime") +async def get_nodes_realtime( + admin: User = Depends(get_current_admin_user), +) -> List[Dict[str, Any]]: + """Get realtime node usage data.""" + service = _get_service() + _ensure_configured(service) + + return await service.get_nodes_realtime_usage() + + +@router.get("/nodes/{node_uuid}", response_model=NodeInfo) +async def get_node_details( + node_uuid: str, + admin: User = Depends(get_current_admin_user), +) -> NodeInfo: + """Get detailed information about a specific node.""" + service = _get_service() + _ensure_configured(service) + + node = await service.get_node_details(node_uuid) + if not node: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Node not found", + ) + + return _serialize_node(node) + + +@router.get("/nodes/{node_uuid}/statistics", response_model=NodeStatisticsResponse) +async def get_node_statistics( + node_uuid: str, + admin: User = Depends(get_current_admin_user), +) -> NodeStatisticsResponse: + """Get node statistics with usage history.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.get_node_statistics(node_uuid) + if not stats or not stats.get("node"): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Node not found or no statistics available", + ) + + return NodeStatisticsResponse( + node=_serialize_node(stats["node"]), + realtime=stats.get("realtime"), + usage_history=stats.get("usage_history") or [], + last_updated=_parse_datetime(stats.get("last_updated")), + ) + + +@router.get("/nodes/{node_uuid}/usage", response_model=NodeUsageResponse) +async def get_node_usage( + node_uuid: str, + start: Optional[datetime] = Query(default=None), + end: Optional[datetime] = Query(default=None), + admin: User = Depends(get_current_admin_user), +) -> NodeUsageResponse: + """Get node usage history for a date range.""" + service = _get_service() + _ensure_configured(service) + + end_dt = end or datetime.utcnow() + start_dt = start or (end_dt - timedelta(days=7)) + + if start_dt >= end_dt: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid date range", + ) + + usage = await service.get_node_user_usage_by_range(node_uuid, start_dt, end_dt) + return NodeUsageResponse(items=usage or []) + + +@router.post("/nodes/{node_uuid}/action", response_model=NodeActionResponse) +async def perform_node_action( + node_uuid: str, + payload: NodeActionRequest, + admin: User = Depends(get_current_admin_user), +) -> NodeActionResponse: + """Perform an action on a node (enable/disable/restart).""" + service = _get_service() + _ensure_configured(service) + + # Get current node state for toggle operations + if payload.action in ("enable", "disable"): + nodes = await service.get_all_nodes() + node = next((n for n in nodes if n.get("uuid") == node_uuid), None) + if not node: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Node not found", + ) + + success = await service.manage_node(node_uuid, payload.action) + + messages = { + "enable": "Node enabled", + "disable": "Node disabled", + "restart": "Node restart initiated", + } + + if success: + logger.info(f"Admin {admin.telegram_id} performed {payload.action} on node {node_uuid}") + return NodeActionResponse( + success=True, + message=messages.get(payload.action, "Action completed"), + is_disabled=payload.action == "disable" if payload.action in ("enable", "disable") else None, + ) + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Failed to {payload.action} node", + ) + + +@router.post("/nodes/restart-all", response_model=NodeActionResponse) +async def restart_all_nodes( + admin: User = Depends(get_current_admin_user), +) -> NodeActionResponse: + """Restart all nodes.""" + service = _get_service() + _ensure_configured(service) + + success = await service.restart_all_nodes() + + if success: + logger.info(f"Admin {admin.telegram_id} restarted all nodes") + return NodeActionResponse(success=True, message="All nodes restart initiated") + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to restart all nodes", + ) + + +# ============ Squads (Internal Squads) ============ + +@router.get("/squads", response_model=SquadsListResponse) +async def list_squads( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SquadsListResponse: + """Get list of all squads with local database info.""" + service = _get_service() + _ensure_configured(service) + + # Get squads from RemnaWave + rw_squads = await service.get_all_squads() + + # Get local squads from DB + local_squads, _ = await get_all_server_squads(db, page=1, limit=1000) + local_by_uuid = {s.squad_uuid: s for s in local_squads} + + items = [] + for squad in rw_squads: + local = local_by_uuid.get(squad.get("uuid")) + items.append(SquadWithLocalInfo( + uuid=squad.get("uuid", ""), + name=squad.get("name", ""), + members_count=squad.get("members_count", 0), + inbounds_count=squad.get("inbounds_count", 0), + inbounds=squad.get("inbounds", []), + local_id=local.id if local else None, + display_name=local.display_name if local else None, + country_code=local.country_code if local else None, + is_available=local.is_available if local else None, + is_trial_eligible=local.is_trial_eligible if local else None, + price_kopeks=local.price_kopeks if local else None, + max_users=local.max_users if local else None, + current_users=local.current_users if local else None, + is_synced=local is not None, + )) + + return SquadsListResponse(items=items, total=len(items)) + + +@router.get("/squads/{squad_uuid}", response_model=SquadDetailResponse) +async def get_squad_details( + squad_uuid: str, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SquadDetailResponse: + """Get detailed information about a squad.""" + service = _get_service() + _ensure_configured(service) + + # Get squad from RemnaWave + squad = await service.get_squad_details(squad_uuid) + if not squad: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Squad not found", + ) + + # Get local info from DB + local = await get_server_squad_by_uuid(db, squad_uuid) + active_subs = await count_active_users_for_squad(db, squad_uuid) if local else 0 + + return SquadDetailResponse( + uuid=squad.get("uuid", ""), + name=squad.get("name", ""), + members_count=squad.get("members_count", 0), + inbounds_count=squad.get("inbounds_count", 0), + inbounds=squad.get("inbounds", []), + local_id=local.id if local else None, + display_name=local.display_name if local else None, + country_code=local.country_code if local else None, + description=local.description if local else None, + is_available=local.is_available if local else None, + is_trial_eligible=local.is_trial_eligible if local else None, + price_kopeks=local.price_kopeks if local else None, + max_users=local.max_users if local else None, + current_users=local.current_users if local else None, + sort_order=local.sort_order if local else None, + is_synced=local is not None, + active_subscriptions=active_subs, + ) + + +@router.post("/squads", response_model=SquadOperationResponse, status_code=status.HTTP_201_CREATED) +async def create_squad( + payload: SquadCreateRequest, + admin: User = Depends(get_current_admin_user), +) -> SquadOperationResponse: + """Create a new squad in RemnaWave.""" + service = _get_service() + _ensure_configured(service) + + squad_uuid = await service.create_squad(payload.name, payload.inbound_uuids) + + if squad_uuid: + logger.info(f"Admin {admin.telegram_id} created squad {payload.name} ({squad_uuid})") + return SquadOperationResponse( + success=True, + message="Squad created successfully", + data={"uuid": squad_uuid}, + ) + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to create squad", + ) + + +@router.patch("/squads/{squad_uuid}", response_model=SquadOperationResponse) +async def update_squad( + squad_uuid: str, + payload: SquadUpdateRequest, + admin: User = Depends(get_current_admin_user), +) -> SquadOperationResponse: + """Update a squad in RemnaWave.""" + service = _get_service() + _ensure_configured(service) + + if payload.name is None and payload.inbound_uuids is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No update data provided", + ) + + success = await service.update_squad( + squad_uuid, + name=payload.name, + inbounds=payload.inbound_uuids, + ) + + if success: + logger.info(f"Admin {admin.telegram_id} updated squad {squad_uuid}") + return SquadOperationResponse(success=True, message="Squad updated") + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to update squad", + ) + + +@router.post("/squads/{squad_uuid}/action", response_model=SquadOperationResponse) +async def perform_squad_action( + squad_uuid: str, + payload: SquadActionRequest, + admin: User = Depends(get_current_admin_user), +) -> SquadOperationResponse: + """Perform an action on a squad.""" + service = _get_service() + _ensure_configured(service) + + action = payload.action + success = False + message = "Unknown action" + + if action == "add_all_users": + success = await service.add_all_users_to_squad(squad_uuid) + message = "Users added" if success else "Failed to add users" + elif action == "remove_all_users": + success = await service.remove_all_users_from_squad(squad_uuid) + message = "Users removed" if success else "Failed to remove users" + elif action == "delete": + success = await service.delete_squad(squad_uuid) + message = "Squad deleted" if success else "Failed to delete squad" + elif action == "rename": + if not payload.name: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Name is required for rename action", + ) + success = await service.rename_squad(squad_uuid, payload.name) + message = "Squad renamed" if success else "Failed to rename squad" + elif action == "update_inbounds": + if not payload.inbound_uuids: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Inbound UUIDs are required", + ) + success = await service.update_squad_inbounds(squad_uuid, payload.inbound_uuids) + message = "Inbounds updated" if success else "Failed to update inbounds" + + if success: + logger.info(f"Admin {admin.telegram_id} performed {action} on squad {squad_uuid}") + + return SquadOperationResponse(success=success, message=message) + + +@router.delete("/squads/{squad_uuid}", response_model=SquadOperationResponse) +async def delete_squad( + squad_uuid: str, + admin: User = Depends(get_current_admin_user), +) -> SquadOperationResponse: + """Delete a squad.""" + service = _get_service() + _ensure_configured(service) + + success = await service.delete_squad(squad_uuid) + + if success: + logger.info(f"Admin {admin.telegram_id} deleted squad {squad_uuid}") + return SquadOperationResponse(success=True, message="Squad deleted") + else: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to delete squad", + ) + + +# ============ Migration ============ + +@router.get("/squads/{squad_uuid}/migration-preview", response_model=MigrationPreviewResponse) +async def preview_migration( + squad_uuid: str, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> MigrationPreviewResponse: + """Get migration preview for a squad.""" + squad = await get_server_squad_by_uuid(db, squad_uuid) + if not squad: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Squad not found in local database", + ) + + users_to_migrate = await count_active_users_for_squad(db, squad_uuid) + + return MigrationPreviewResponse( + squad_uuid=squad.squad_uuid, + squad_name=squad.display_name, + current_users=squad.current_users or 0, + max_users=squad.max_users, + users_to_migrate=users_to_migrate, + ) + + +@router.post("/squads/migrate", response_model=MigrationResponse) +async def migrate_squad_users( + payload: MigrationRequest, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> MigrationResponse: + """Migrate users from one squad to another.""" + service = _get_service() + _ensure_configured(service) + + source_uuid = payload.source_uuid.strip() + target_uuid = payload.target_uuid.strip() + + if source_uuid == target_uuid: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Source and target squads must be different", + ) + + source = await get_server_squad_by_uuid(db, source_uuid) + if not source: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Source squad not found", + ) + + target = await get_server_squad_by_uuid(db, target_uuid) + if not target: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Target squad not found", + ) + + try: + result = await service.migrate_squad_users( + db, + source_uuid=source.squad_uuid, + target_uuid=target.squad_uuid, + ) + except RemnaWaveConfigurationError as exc: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=str(exc), + ) + + if not result.get("success"): + return MigrationResponse( + success=False, + message=result.get("message") or "Migration failed", + error=result.get("error"), + ) + + logger.info(f"Admin {admin.telegram_id} migrated users from {source_uuid} to {target_uuid}") + + return MigrationResponse( + success=True, + message=result.get("message") or "Migration completed", + data=MigrationStats( + source_uuid=source.squad_uuid, + target_uuid=target.squad_uuid, + total=result.get("total", 0), + updated=result.get("updated", 0), + panel_updated=result.get("panel_updated", 0), + panel_failed=result.get("panel_failed", 0), + source_removed=result.get("source_removed", 0), + target_added=result.get("target_added", 0), + ), + ) + + +# ============ Inbounds ============ + +@router.get("/inbounds", response_model=InboundsListResponse) +async def list_inbounds( + admin: User = Depends(get_current_admin_user), +) -> InboundsListResponse: + """Get list of all available inbounds.""" + service = _get_service() + _ensure_configured(service) + + inbounds = await service.get_all_inbounds() + return InboundsListResponse(items=inbounds or [], total=len(inbounds or [])) + + +# ============ Auto Sync ============ + +@router.get("/sync/auto/status", response_model=AutoSyncStatus) +async def get_auto_sync_status( + admin: User = Depends(get_current_admin_user), +) -> AutoSyncStatus: + """Get auto sync status.""" + if remnawave_sync_service is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Auto sync service is not available", + ) + + status_obj = remnawave_sync_service.get_status() + + return AutoSyncStatus( + enabled=status_obj.enabled, + times=[t.strftime("%H:%M") for t in status_obj.times] if status_obj.times else [], + next_run=status_obj.next_run, + is_running=status_obj.is_running, + last_run_started_at=status_obj.last_run_started_at, + last_run_finished_at=status_obj.last_run_finished_at, + last_run_success=status_obj.last_run_success, + last_run_reason=status_obj.last_run_reason, + last_run_error=status_obj.last_run_error, + last_user_stats=status_obj.last_user_stats, + last_server_stats=status_obj.last_server_stats, + ) + + +@router.post("/sync/auto/toggle", response_model=SyncResponse) +async def toggle_auto_sync( + payload: AutoSyncToggleRequest, + admin: User = Depends(get_current_admin_user), +) -> SyncResponse: + """Toggle auto sync on/off.""" + if remnawave_sync_service is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Auto sync service is not available", + ) + + # This would need to update settings - for now just return info + # In production, this should update REMNAWAVE_AUTO_SYNC_ENABLED setting + current_status = remnawave_sync_service.get_status() + + if payload.enabled and not current_status.enabled: + # Enable - would need to update settings and refresh schedule + remnawave_sync_service.schedule_refresh(run_immediately=True) + logger.info(f"Admin {admin.telegram_id} enabled auto sync") + return SyncResponse( + success=True, + message="Auto sync enabled and scheduled", + ) + elif not payload.enabled and current_status.enabled: + # Disable - would need to update settings and stop scheduler + logger.info(f"Admin {admin.telegram_id} disabled auto sync") + return SyncResponse( + success=True, + message="Auto sync setting change requested. Restart may be required.", + ) + else: + return SyncResponse( + success=True, + message="No change needed", + ) + + +@router.post("/sync/auto/run", response_model=AutoSyncRunResponse) +async def run_auto_sync_now( + admin: User = Depends(get_current_admin_user), +) -> AutoSyncRunResponse: + """Run auto sync immediately.""" + if remnawave_sync_service is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Auto sync service is not available", + ) + + logger.info(f"Admin {admin.telegram_id} triggered manual sync") + result = await remnawave_sync_service.run_sync_now(reason="manual") + + return AutoSyncRunResponse( + started=result.get("started", False), + success=result.get("success"), + error=result.get("error"), + user_stats=result.get("user_stats"), + server_stats=result.get("server_stats"), + reason="manual", + ) + + +# ============ Manual Sync ============ + +@router.post("/sync/from-panel", response_model=SyncResponse) +async def sync_from_panel( + payload: SyncMode, + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Sync users from RemnaWave panel to bot.""" + service = _get_service() + _ensure_configured(service) + + try: + stats = await service.sync_users_from_panel(db, payload.mode) + logger.info(f"Admin {admin.telegram_id} synced from panel (mode: {payload.mode})") + return SyncResponse( + success=True, + message="Sync from panel completed", + data=stats, + ) + except RemnaWaveConfigurationError as exc: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=str(exc), + ) + + +@router.post("/sync/to-panel", response_model=SyncResponse) +async def sync_to_panel( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Sync users from bot to RemnaWave panel.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.sync_users_to_panel(db) + logger.info(f"Admin {admin.telegram_id} synced to panel") + + return SyncResponse( + success=True, + message="Sync to panel completed", + data=stats, + ) + + +@router.post("/sync/servers", response_model=SyncResponse) +async def sync_servers( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Sync servers/squads from RemnaWave.""" + service = _get_service() + _ensure_configured(service) + + squads = await service.get_all_squads() + if not squads: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail="Failed to get squads from RemnaWave", + ) + + created, updated, removed = await sync_with_remnawave(db, squads) + + try: + await cache.delete_pattern("available_countries*") + except Exception as e: + logger.warning(f"Failed to clear countries cache: {e}") + + logger.info(f"Admin {admin.telegram_id} synced servers: created={created}, updated={updated}, removed={removed}") + + return SyncResponse( + success=True, + message="Servers synced successfully", + data={ + "created": created, + "updated": updated, + "removed": removed, + "total": len(squads), + }, + ) + + +@router.post("/sync/subscriptions/validate", response_model=SyncResponse) +async def validate_subscriptions( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Validate and fix subscriptions.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.validate_and_fix_subscriptions(db) + logger.info(f"Admin {admin.telegram_id} validated subscriptions") + + return SyncResponse( + success=True, + message="Subscriptions validated", + data=stats, + ) + + +@router.post("/sync/subscriptions/cleanup", response_model=SyncResponse) +async def cleanup_subscriptions( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Cleanup orphaned subscriptions.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.cleanup_orphaned_subscriptions(db) + logger.info(f"Admin {admin.telegram_id} cleaned up subscriptions") + + return SyncResponse( + success=True, + message="Cleanup completed", + data=stats, + ) + + +@router.post("/sync/subscriptions/statuses", response_model=SyncResponse) +async def sync_subscription_statuses( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Sync subscription statuses.""" + service = _get_service() + _ensure_configured(service) + + stats = await service.sync_subscription_statuses(db) + logger.info(f"Admin {admin.telegram_id} synced subscription statuses") + + return SyncResponse( + success=True, + message="Subscription statuses synced", + data=stats, + ) + + +@router.get("/sync/recommendations", response_model=SyncResponse) +async def get_sync_recommendations( + admin: User = Depends(get_current_admin_user), + db: AsyncSession = Depends(get_cabinet_db), +) -> SyncResponse: + """Get sync recommendations.""" + service = _get_service() + _ensure_configured(service) + + data = await service.get_sync_recommendations(db) + + return SyncResponse( + success=True, + message="Recommendations retrieved", + data=data, + ) From cd0ce908a197737a1972803abb70ae10d504e544 Mon Sep 17 00:00:00 2001 From: Egor Date: Sat, 17 Jan 2026 10:31:40 +0300 Subject: [PATCH 2/2] Add files via upload --- app/cabinet/schemas/remnawave.py | 352 +++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 app/cabinet/schemas/remnawave.py diff --git a/app/cabinet/schemas/remnawave.py b/app/cabinet/schemas/remnawave.py new file mode 100644 index 00000000..9bd06a6a --- /dev/null +++ b/app/cabinet/schemas/remnawave.py @@ -0,0 +1,352 @@ +"""Schemas for RemnaWave management in cabinet admin panel.""" + +from datetime import datetime, time +from typing import Any, Dict, List, Literal, Optional + +from pydantic import BaseModel, Field + + +# ============ Status & Connection ============ + +class ConnectionStatus(BaseModel): + """RemnaWave API connection status.""" + status: str + message: str + api_url: Optional[str] = None + status_code: Optional[int] = None + system_info: Optional[Dict[str, Any]] = None + + +class RemnaWaveStatusResponse(BaseModel): + """RemnaWave configuration and connection status.""" + is_configured: bool + configuration_error: Optional[str] = None + connection: Optional[ConnectionStatus] = None + + +# ============ System Statistics ============ + +class SystemSummary(BaseModel): + """System summary statistics.""" + users_online: int + total_users: int + active_connections: int + nodes_online: int + users_last_day: int + users_last_week: int + users_never_online: int + total_user_traffic: int + + +class ServerInfo(BaseModel): + """Server hardware info.""" + cpu_cores: int + cpu_physical_cores: int + memory_total: int + memory_used: int + memory_free: int + memory_available: int + uptime_seconds: int + + +class Bandwidth(BaseModel): + """Realtime bandwidth statistics.""" + realtime_download: int + realtime_upload: int + realtime_total: int + + +class TrafficPeriod(BaseModel): + """Traffic statistics for a period.""" + current: int + previous: int + difference: Optional[str] = None + + +class TrafficPeriods(BaseModel): + """Traffic statistics for multiple periods.""" + last_2_days: TrafficPeriod + last_7_days: TrafficPeriod + last_30_days: TrafficPeriod + current_month: TrafficPeriod + current_year: TrafficPeriod + + +class SystemStatsResponse(BaseModel): + """Full system statistics response.""" + system: SystemSummary + users_by_status: Dict[str, int] + server_info: ServerInfo + bandwidth: Bandwidth + traffic_periods: TrafficPeriods + nodes_realtime: List[Dict[str, Any]] = Field(default_factory=list) + nodes_weekly: List[Dict[str, Any]] = Field(default_factory=list) + last_updated: Optional[datetime] = None + + +# ============ Nodes ============ + +class NodeInfo(BaseModel): + """Node information.""" + uuid: str + name: str + address: str + country_code: Optional[str] = None + is_connected: bool + is_disabled: bool + is_node_online: bool + is_xray_running: bool + users_online: Optional[int] = None + traffic_used_bytes: Optional[int] = None + traffic_limit_bytes: Optional[int] = None + last_status_change: Optional[datetime] = None + last_status_message: Optional[str] = None + xray_uptime: Optional[str] = None + is_traffic_tracking_active: bool = False + traffic_reset_day: Optional[int] = None + notify_percent: Optional[int] = None + consumption_multiplier: float = 1.0 + cpu_count: Optional[int] = None + cpu_model: Optional[str] = None + total_ram: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + provider_uuid: Optional[str] = None + + +class NodesListResponse(BaseModel): + """List of nodes response.""" + items: List[NodeInfo] + total: int + + +class NodesOverview(BaseModel): + """Nodes overview statistics.""" + total: int + online: int + offline: int + disabled: int + total_users_online: int + nodes: List[NodeInfo] + + +class NodeStatisticsResponse(BaseModel): + """Node statistics with usage history.""" + node: NodeInfo + realtime: Optional[Dict[str, Any]] = None + usage_history: List[Dict[str, Any]] = Field(default_factory=list) + last_updated: Optional[datetime] = None + + +class NodeUsageResponse(BaseModel): + """Node usage history response.""" + items: List[Dict[str, Any]] = Field(default_factory=list) + + +class NodeActionRequest(BaseModel): + """Request to perform node action.""" + action: Literal["enable", "disable", "restart"] + + +class NodeActionResponse(BaseModel): + """Response after node action.""" + success: bool + message: Optional[str] = None + is_disabled: Optional[bool] = None + + +# ============ Squads (Internal Squads) ============ + +class SquadInfo(BaseModel): + """Internal Squad information from RemnaWave.""" + uuid: str + name: str + members_count: int + inbounds_count: int + inbounds: List[Dict[str, Any]] = Field(default_factory=list) + + +class SquadWithLocalInfo(BaseModel): + """Squad with local database info.""" + uuid: str + name: str + members_count: int + inbounds_count: int + inbounds: List[Dict[str, Any]] = Field(default_factory=list) + # Local DB info + local_id: Optional[int] = None + display_name: Optional[str] = None + country_code: Optional[str] = None + is_available: Optional[bool] = None + is_trial_eligible: Optional[bool] = None + price_kopeks: Optional[int] = None + max_users: Optional[int] = None + current_users: Optional[int] = None + is_synced: bool = False + + +class SquadsListResponse(BaseModel): + """List of squads response.""" + items: List[SquadWithLocalInfo] + total: int + + +class SquadDetailResponse(BaseModel): + """Detailed squad response.""" + uuid: str + name: str + members_count: int + inbounds_count: int + inbounds: List[Dict[str, Any]] = Field(default_factory=list) + # Local DB info if synced + local_id: Optional[int] = None + display_name: Optional[str] = None + country_code: Optional[str] = None + description: Optional[str] = None + is_available: Optional[bool] = None + is_trial_eligible: Optional[bool] = None + price_kopeks: Optional[int] = None + max_users: Optional[int] = None + current_users: Optional[int] = None + sort_order: Optional[int] = None + is_synced: bool = False + active_subscriptions: int = 0 + + +class SquadCreateRequest(BaseModel): + """Request to create a new squad.""" + name: str = Field(..., min_length=1, max_length=255) + inbound_uuids: List[str] = Field(default_factory=list) + + +class SquadUpdateRequest(BaseModel): + """Request to update a squad.""" + name: Optional[str] = Field(None, min_length=1, max_length=255) + inbound_uuids: Optional[List[str]] = None + + +class SquadActionRequest(BaseModel): + """Request to perform squad action.""" + action: Literal["add_all_users", "remove_all_users", "delete", "rename", "update_inbounds"] + name: Optional[str] = None + inbound_uuids: Optional[List[str]] = None + + +class SquadOperationResponse(BaseModel): + """Response after squad operation.""" + success: bool + message: Optional[str] = None + data: Optional[Dict[str, Any]] = None + + +# ============ Migration ============ + +class MigrationPreviewResponse(BaseModel): + """Preview of squad migration.""" + squad_uuid: str + squad_name: str + current_users: int + max_users: Optional[int] = None + users_to_migrate: int + + +class MigrationRequest(BaseModel): + """Request to migrate users between squads.""" + source_uuid: str + target_uuid: str + + +class MigrationStats(BaseModel): + """Migration statistics.""" + source_uuid: str + target_uuid: str + total: int = 0 + updated: int = 0 + panel_updated: int = 0 + panel_failed: int = 0 + source_removed: int = 0 + target_added: int = 0 + + +class MigrationResponse(BaseModel): + """Response after migration.""" + success: bool + message: Optional[str] = None + error: Optional[str] = None + data: Optional[MigrationStats] = None + + +# ============ Inbounds ============ + +class InboundInfo(BaseModel): + """Inbound information.""" + uuid: str + tag: str + type: Optional[str] = None + network: Optional[str] = None + security: Optional[str] = None + + +class InboundsListResponse(BaseModel): + """List of inbounds response.""" + items: List[Dict[str, Any]] = Field(default_factory=list) + total: int = 0 + + +# ============ Auto Sync ============ + +class AutoSyncTime(BaseModel): + """Scheduled sync time.""" + hour: int + minute: int + + +class AutoSyncStatus(BaseModel): + """Auto sync status.""" + enabled: bool + times: List[str] = Field(default_factory=list) # HH:MM format + next_run: Optional[datetime] = None + is_running: bool = False + last_run_started_at: Optional[datetime] = None + last_run_finished_at: Optional[datetime] = None + last_run_success: Optional[bool] = None + last_run_reason: Optional[str] = None + last_run_error: Optional[str] = None + last_user_stats: Optional[Dict[str, Any]] = None + last_server_stats: Optional[Dict[str, Any]] = None + + +class AutoSyncToggleRequest(BaseModel): + """Request to toggle auto sync.""" + enabled: bool + + +class AutoSyncRunResponse(BaseModel): + """Response after running sync.""" + started: bool + success: Optional[bool] = None + error: Optional[str] = None + user_stats: Optional[Dict[str, Any]] = None + server_stats: Optional[Dict[str, Any]] = None + reason: Optional[str] = None + + +# ============ Manual Sync ============ + +class SyncMode(BaseModel): + """Sync mode options.""" + mode: Literal["all", "new_only", "update_only"] = "all" + + +class SyncResponse(BaseModel): + """Response after sync operation.""" + success: bool + message: Optional[str] = None + data: Optional[Dict[str, Any]] = None + + +class SyncRecommendations(BaseModel): + """Sync recommendations.""" + success: bool + message: Optional[str] = None + data: Optional[Dict[str, Any]] = None