feat: add traffic usage enrichment endpoint with devices, spending, dates, last node

Add GET /admin/traffic/enrichment that returns per-user enrichment data
(connected devices, total spending, subscription dates, last connected node)
via bulk panel API calls with 5-min server-side cache.
This commit is contained in:
Fringg
2026-02-08 21:49:42 +03:00
parent 2f90f9134d
commit 5cf3f2f76e
3 changed files with 165 additions and 2 deletions

View File

@@ -12,20 +12,22 @@ from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.types import BufferedInputFile
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.config import settings
from app.database.models import Subscription, User
from app.database.models import Subscription, Transaction, TransactionType, User
from app.services.remnawave_service import RemnaWaveService
from ..dependencies import get_cabinet_db, get_current_admin_user
from ..schemas.traffic import (
ExportCsvRequest,
ExportCsvResponse,
TrafficEnrichmentResponse,
TrafficNodeInfo,
TrafficUsageResponse,
UserTrafficEnrichment,
UserTrafficItem,
)
@@ -346,6 +348,145 @@ async def get_traffic_usage(
)
# ============== Enrichment endpoint ==============
_enrichment_cache: dict[str, tuple[float, dict[int, UserTrafficEnrichment]]] = {}
_ENRICHMENT_CACHE_TTL = 300 # 5 minutes
_enrichment_lock = asyncio.Lock()
async def _get_bulk_spending(db: AsyncSession, user_ids: list[int]) -> dict[int, int]:
"""Get total spent kopeks for multiple users in a single query."""
if not user_ids:
return {}
result = await db.execute(
select(Transaction.user_id, func.coalesce(func.sum(Transaction.amount_kopeks), 0))
.where(
and_(
Transaction.user_id.in_(user_ids),
Transaction.is_completed.is_(True),
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
)
)
.group_by(Transaction.user_id)
)
return {row[0]: int(row[1]) for row in result.all()}
@router.get('/enrichment', response_model=TrafficEnrichmentResponse)
async def get_traffic_enrichment(
admin: User = Depends(get_current_admin_user),
db: AsyncSession = Depends(get_cabinet_db),
):
"""Return enrichment data: device counts, spending, dates, last node."""
cache_key = 'enrichment'
now = time.time()
cached = _enrichment_cache.get(cache_key)
if cached and (now - cached[0]) < _ENRICHMENT_CACHE_TTL:
return TrafficEnrichmentResponse(data=cached[1])
async with _enrichment_lock:
now = time.time()
cached = _enrichment_cache.get(cache_key)
if cached and (now - cached[0]) < _ENRICHMENT_CACHE_TTL:
return TrafficEnrichmentResponse(data=cached[1])
user_map = await _load_user_map(db)
# Build uuid -> user_id and short_uuid -> user_id maps
uuid_to_user_id: dict[str, int] = {}
short_uuid_to_user_id: dict[str, int] = {}
for uuid, user in user_map.items():
uuid_to_user_id[uuid] = user.id
if user.subscription and user.subscription.remnawave_short_uuid:
short_uuid_to_user_id[user.subscription.remnawave_short_uuid] = user.id
service = RemnaWaveService()
devices_by_user: dict[int, int] = {}
last_node_uuid_by_user: dict[int, str] = {}
node_uuid_to_name: dict[str, str] = {}
if service.is_configured:
try:
async with service.get_api_client() as api:
devices_task = api.get_all_hwid_devices()
subs_task = api.get_all_panel_subscriptions()
nodes_task = api.get_all_nodes()
devices_data, subs_data, nodes_list = await asyncio.gather(devices_task, subs_task, nodes_task)
except Exception:
logger.warning('Failed to fetch enrichment data from panel', exc_info=True)
devices_data = {'devices': []}
subs_data = []
nodes_list = []
# Build node name map
for node in nodes_list:
node_uuid_to_name[node.uuid] = node.name
# Count devices per user
for device in devices_data.get('devices', []):
user_uuid = device.get('userUuid', '')
uid = uuid_to_user_id.get(user_uuid)
if uid is not None:
devices_by_user[uid] = devices_by_user.get(uid, 0) + 1
# Extract last connected node from panel subscriptions
if isinstance(subs_data, list):
for sub in subs_data:
sub_user = sub.get('user') or sub
short_uuid = sub_user.get('shortUuid', '')
uid = short_uuid_to_user_id.get(short_uuid)
if uid is None:
user_uuid = sub_user.get('uuid', '')
uid = uuid_to_user_id.get(user_uuid)
if uid is not None:
user_traffic = sub_user.get('userTraffic') or {}
last_node = user_traffic.get('lastConnectedNodeUuid')
if last_node:
last_node_uuid_by_user[uid] = last_node
# Bulk spending stats
all_user_ids = [u.id for u in user_map.values()]
spending_map = await _get_bulk_spending(db, all_user_ids)
# Build enrichment data
enrichment: dict[int, UserTrafficEnrichment] = {}
for uuid, user in user_map.items():
uid = user.id
sub = user.subscription
start_date = None
end_date = None
if sub:
if sub.start_date:
start_date = sub.start_date.isoformat()
if sub.end_date:
end_date = sub.end_date.isoformat()
last_node_name = None
last_uuid = last_node_uuid_by_user.get(uid)
if last_uuid:
last_node_name = node_uuid_to_name.get(last_uuid)
enrichment[uid] = UserTrafficEnrichment(
devices_connected=devices_by_user.get(uid, 0),
total_spent_kopeks=spending_map.get(uid, 0),
subscription_start_date=start_date,
subscription_end_date=end_date,
last_node_name=last_node_name,
)
_enrichment_cache[cache_key] = (now, enrichment)
# Evict expired
expired = [k for k, (ts, _) in _enrichment_cache.items() if (now - ts) >= _ENRICHMENT_CACHE_TTL]
for k in expired:
del _enrichment_cache[k]
return TrafficEnrichmentResponse(data=enrichment)
@router.post('/export-csv', response_model=ExportCsvResponse)
async def export_traffic_csv(
request: ExportCsvRequest,

View File

@@ -33,6 +33,18 @@ class TrafficUsageResponse(BaseModel):
available_statuses: list[str]
class UserTrafficEnrichment(BaseModel):
devices_connected: int = 0
total_spent_kopeks: int = 0
subscription_start_date: str | None = None
subscription_end_date: str | None = None
last_node_name: str | None = None
class TrafficEnrichmentResponse(BaseModel):
data: dict[int, UserTrafficEnrichment]
class ExportCsvRequest(BaseModel):
period: int = Field(30, ge=1, le=30)
start_date: str | None = None

View File

@@ -999,6 +999,16 @@ class RemnaWaveAPI:
uuid=data['uuid'], name=data['name'], view_position=data['viewPosition'], config=data.get('config')
)
async def get_all_hwid_devices(self) -> dict[str, Any]:
"""GET /api/hwid/devices — all devices for all users."""
response = await self._make_request('GET', '/api/hwid/devices')
return response.get('response', {'devices': [], 'total': 0})
async def get_all_panel_subscriptions(self) -> list[dict[str, Any]]:
"""GET /api/subscriptions — all panel subscriptions."""
response = await self._make_request('GET', '/api/subscriptions')
return response.get('response') or []
async def get_user_devices(self, user_uuid: str) -> dict[str, Any]:
try:
response = await self._make_request('GET', f'/api/hwid/devices/{user_uuid}')