Интеграция системы мониторинга банов

This commit is contained in:
PEDZEO
2026-01-16 15:17:02 +03:00
parent fed3fda83e
commit b392a99f56
6 changed files with 1138 additions and 0 deletions

View File

@@ -705,6 +705,17 @@ APP_CONFIG_PATH=app-config.json
ENABLE_DEEP_LINKS=true
APP_CONFIG_CACHE_TTL=3600
# ===== BAN SYSTEM INTEGRATION (BedolagaBan) =====
# Интеграция с системой мониторинга банов BedolagaBan
# Включить интеграцию с Ban системой
BAN_SYSTEM_ENABLED=false
# URL API сервера Ban системы (например: http://ban-server:8000)
BAN_SYSTEM_API_URL=
# API токен для авторизации в Ban системе
BAN_SYSTEM_API_TOKEN=
# Таймаут запросов к API (секунды)
BAN_SYSTEM_REQUEST_TIMEOUT=30
# ===== СИСТЕМА БЕКАПОВ =====
BACKUP_AUTO_ENABLED=true
BACKUP_INTERVAL_HOURS=24

View File

@@ -22,6 +22,7 @@ from .admin_wheel import router as admin_wheel_router
from .admin_tariffs import router as admin_tariffs_router
from .admin_servers import router as admin_servers_router
from .admin_stats import router as admin_stats_router
from .admin_ban_system import router as admin_ban_system_router
from .media import router as media_router
# Main cabinet router
@@ -53,5 +54,6 @@ router.include_router(admin_wheel_router)
router.include_router(admin_tariffs_router)
router.include_router(admin_servers_router)
router.include_router(admin_stats_router)
router.include_router(admin_ban_system_router)
__all__ = ["router"]

View File

@@ -0,0 +1,512 @@
"""Admin routes for Ban System monitoring in cabinet."""
import logging
from typing import Optional, List, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from app.config import settings
from app.database.models import User
from app.external.ban_system_api import BanSystemAPI, BanSystemAPIError
from ..dependencies import get_current_admin_user
from ..schemas.ban_system import (
BanSystemStatusResponse,
BanSystemStatsResponse,
BanUsersListResponse,
BanUserListItem,
BanUserDetailResponse,
BanUserIPInfo,
BanUserRequestLog,
BanPunishmentsListResponse,
BanPunishmentItem,
BanHistoryResponse,
BanUserRequest,
UnbanResponse,
BanNodesListResponse,
BanNodeItem,
BanAgentsListResponse,
BanAgentItem,
BanAgentsSummary,
BanTrafficViolationsResponse,
BanTrafficViolationItem,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin/ban-system", tags=["Cabinet Admin Ban System"])
def _get_ban_api() -> BanSystemAPI:
"""Get Ban System API instance."""
if not settings.is_ban_system_enabled():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Ban System integration is disabled",
)
if not settings.is_ban_system_configured():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Ban System is not configured",
)
return BanSystemAPI(
base_url=settings.get_ban_system_api_url(),
api_token=settings.get_ban_system_api_token(),
timeout=settings.get_ban_system_request_timeout(),
)
async def _api_request(api: BanSystemAPI, method: str, *args, **kwargs) -> Any:
"""Execute API request with error handling."""
try:
async with api:
func = getattr(api, method)
return await func(*args, **kwargs)
except BanSystemAPIError as e:
logger.error(f"Ban System API error: {e}")
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Ban System API error: {e.message}",
)
except Exception as e:
logger.error(f"Ban System unexpected error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal error: {str(e)}",
)
# === Status ===
@router.get("/status", response_model=BanSystemStatusResponse)
async def get_ban_system_status(
admin: User = Depends(get_current_admin_user),
) -> BanSystemStatusResponse:
"""Get Ban System integration status."""
return BanSystemStatusResponse(
enabled=settings.is_ban_system_enabled(),
configured=settings.is_ban_system_configured(),
)
# === Stats ===
@router.get("/stats", response_model=BanSystemStatsResponse)
async def get_stats(
admin: User = Depends(get_current_admin_user),
) -> BanSystemStatsResponse:
"""Get overall Ban System statistics."""
api = _get_ban_api()
data = await _api_request(api, "get_stats")
return BanSystemStatsResponse(
total_users=data.get("total_users", 0),
active_users=data.get("active_users", 0),
users_over_limit=data.get("users_over_limit", 0),
total_requests=data.get("total_requests", 0),
total_punishments=data.get("total_punishments", 0),
active_punishments=data.get("active_punishments", 0),
nodes_online=data.get("nodes_online", 0),
nodes_total=data.get("nodes_total", 0),
agents_online=data.get("agents_online", 0),
agents_total=data.get("agents_total", 0),
panel_connected=data.get("panel_connected", False),
uptime_seconds=data.get("uptime_seconds"),
)
# === Users ===
@router.get("/users", response_model=BanUsersListResponse)
async def get_users(
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
status: Optional[str] = Query(None, description="Filter: over_limit, with_limit, unlimited"),
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Get list of users from Ban System."""
api = _get_ban_api()
data = await _api_request(api, "get_users", offset=offset, limit=limit, status=status)
users = []
for user_data in data.get("users", []):
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=user_data.get("is_over_limit", False),
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=data.get("total", len(users)),
offset=offset,
limit=limit,
)
@router.get("/users/over-limit", response_model=BanUsersListResponse)
async def get_users_over_limit(
limit: int = Query(50, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Get users who exceeded their device limit."""
api = _get_ban_api()
data = await _api_request(api, "get_users_over_limit", limit=limit)
users = []
for user_data in data.get("users", []):
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=True,
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=len(users),
offset=0,
limit=limit,
)
@router.get("/users/search/{query}")
async def search_users(
query: str,
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Search for users."""
api = _get_ban_api()
data = await _api_request(api, "search_users", query=query)
users = []
users_data = data.get("users", []) if isinstance(data, dict) else data
for user_data in users_data:
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=user_data.get("is_over_limit", False),
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=len(users),
offset=0,
limit=100,
)
@router.get("/users/{email}", response_model=BanUserDetailResponse)
async def get_user_detail(
email: str,
admin: User = Depends(get_current_admin_user),
) -> BanUserDetailResponse:
"""Get detailed user information."""
api = _get_ban_api()
data = await _api_request(api, "get_user", email=email)
ips = []
for ip_data in data.get("ips", {}).values() if isinstance(data.get("ips"), dict) else data.get("ips", []):
ips.append(BanUserIPInfo(
ip=ip_data.get("ip", ""),
first_seen=ip_data.get("first_seen"),
last_seen=ip_data.get("last_seen"),
node=ip_data.get("node"),
request_count=ip_data.get("request_count", 0),
country_code=ip_data.get("country_code"),
country_name=ip_data.get("country_name"),
city=ip_data.get("city"),
))
recent_requests = []
for req_data in data.get("recent_requests", []):
recent_requests.append(BanUserRequestLog(
timestamp=req_data.get("timestamp"),
source_ip=req_data.get("source_ip", ""),
destination=req_data.get("destination"),
dest_port=req_data.get("dest_port"),
protocol=req_data.get("protocol"),
action=req_data.get("action"),
node=req_data.get("node"),
))
return BanUserDetailResponse(
email=data.get("email", email),
unique_ip_count=data.get("unique_ip_count", 0),
total_requests=data.get("total_requests", 0),
limit=data.get("limit"),
is_over_limit=data.get("is_over_limit", False),
blocked_count=data.get("blocked_count", 0),
ips=ips,
recent_requests=recent_requests,
network_type=data.get("network_type"),
)
# === Punishments ===
@router.get("/punishments", response_model=BanPunishmentsListResponse)
async def get_punishments(
admin: User = Depends(get_current_admin_user),
) -> BanPunishmentsListResponse:
"""Get list of active punishments (bans)."""
api = _get_ban_api()
data = await _api_request(api, "get_punishments")
punishments = []
punishments_data = data if isinstance(data, list) else data.get("punishments", [])
for p in punishments_data:
punishments.append(BanPunishmentItem(
id=p.get("id"),
user_id=p.get("user_id", ""),
uuid=p.get("uuid"),
username=p.get("username", ""),
reason=p.get("reason"),
punished_at=p.get("punished_at"),
enable_at=p.get("enable_at"),
ip_count=p.get("ip_count", 0),
limit=p.get("limit", 0),
enabled=p.get("enabled", False),
enabled_at=p.get("enabled_at"),
node_name=p.get("node_name"),
))
return BanPunishmentsListResponse(
punishments=punishments,
total=len(punishments),
)
@router.post("/punishments/{user_id}/unban", response_model=UnbanResponse)
async def unban_user(
user_id: str,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Unban (enable) a user."""
api = _get_ban_api()
try:
await _api_request(api, "enable_user", user_id=user_id)
logger.info(f"Admin {admin.id} unbanned user {user_id} in Ban System")
return UnbanResponse(success=True, message="User unbanned successfully")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
@router.post("/ban", response_model=UnbanResponse)
async def ban_user(
request: BanUserRequest,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Manually ban a user."""
api = _get_ban_api()
try:
await _api_request(
api,
"ban_user",
username=request.username,
minutes=request.minutes,
reason=request.reason,
)
logger.info(f"Admin {admin.id} banned user {request.username}: {request.reason}")
return UnbanResponse(success=True, message="User banned successfully")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
@router.get("/history/{query}", response_model=BanHistoryResponse)
async def get_punishment_history(
query: str,
limit: int = Query(20, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanHistoryResponse:
"""Get punishment history for a user."""
api = _get_ban_api()
data = await _api_request(api, "get_punishment_history", query=query, limit=limit)
items = []
history_data = data if isinstance(data, list) else data.get("items", [])
for p in history_data:
items.append(BanPunishmentItem(
id=p.get("id"),
user_id=p.get("user_id", ""),
uuid=p.get("uuid"),
username=p.get("username", ""),
reason=p.get("reason"),
punished_at=p.get("punished_at"),
enable_at=p.get("enable_at"),
ip_count=p.get("ip_count", 0),
limit=p.get("limit", 0),
enabled=p.get("enabled", False),
enabled_at=p.get("enabled_at"),
node_name=p.get("node_name"),
))
return BanHistoryResponse(
items=items,
total=len(items),
)
# === Nodes ===
@router.get("/nodes", response_model=BanNodesListResponse)
async def get_nodes(
admin: User = Depends(get_current_admin_user),
) -> BanNodesListResponse:
"""Get list of connected nodes."""
api = _get_ban_api()
data = await _api_request(api, "get_nodes")
nodes = []
nodes_data = data if isinstance(data, list) else data.get("nodes", [])
online_count = 0
for n in nodes_data:
is_connected = n.get("is_connected", False)
if is_connected:
online_count += 1
nodes.append(BanNodeItem(
name=n.get("name", ""),
address=n.get("address"),
is_connected=is_connected,
last_seen=n.get("last_seen"),
users_count=n.get("users_count", 0),
agent_stats=n.get("agent_stats"),
))
return BanNodesListResponse(
nodes=nodes,
total=len(nodes),
online=online_count,
)
# === Agents ===
@router.get("/agents", response_model=BanAgentsListResponse)
async def get_agents(
search: Optional[str] = Query(None),
health: Optional[str] = Query(None, description="healthy, warning, critical"),
agent_status: Optional[str] = Query(None, alias="status", description="online, offline"),
admin: User = Depends(get_current_admin_user),
) -> BanAgentsListResponse:
"""Get list of monitoring agents."""
api = _get_ban_api()
data = await _api_request(
api,
"get_agents",
search=search,
health=health,
status=agent_status,
)
agents = []
agents_data = data.get("agents", []) if isinstance(data, dict) else data
online_count = 0
for a in agents_data:
is_online = a.get("is_online", False)
if is_online:
online_count += 1
agents.append(BanAgentItem(
node_name=a.get("node_name", ""),
sent_total=a.get("sent_total", 0),
dropped_total=a.get("dropped_total", 0),
batches_total=a.get("batches_total", 0),
reconnects=a.get("reconnects", 0),
failures=a.get("failures", 0),
queue_size=a.get("queue_size", 0),
queue_max=a.get("queue_max", 0),
dedup_checked=a.get("dedup_checked", 0),
dedup_skipped=a.get("dedup_skipped", 0),
filter_checked=a.get("filter_checked", 0),
filter_filtered=a.get("filter_filtered", 0),
health=a.get("health", "unknown"),
is_online=is_online,
last_report=a.get("last_report"),
))
summary = None
if isinstance(data, dict) and "summary" in data:
s = data["summary"]
summary = BanAgentsSummary(
total_agents=s.get("total_agents", len(agents)),
online_agents=s.get("online_agents", online_count),
total_sent=s.get("total_sent", 0),
total_dropped=s.get("total_dropped", 0),
avg_queue_size=s.get("avg_queue_size", 0.0),
healthy_count=s.get("healthy_count", 0),
warning_count=s.get("warning_count", 0),
critical_count=s.get("critical_count", 0),
)
return BanAgentsListResponse(
agents=agents,
summary=summary,
total=len(agents),
online=online_count,
)
@router.get("/agents/summary", response_model=BanAgentsSummary)
async def get_agents_summary(
admin: User = Depends(get_current_admin_user),
) -> BanAgentsSummary:
"""Get agents summary statistics."""
api = _get_ban_api()
data = await _api_request(api, "get_agents_summary")
return BanAgentsSummary(
total_agents=data.get("total_agents", 0),
online_agents=data.get("online_agents", 0),
total_sent=data.get("total_sent", 0),
total_dropped=data.get("total_dropped", 0),
avg_queue_size=data.get("avg_queue_size", 0.0),
healthy_count=data.get("healthy_count", 0),
warning_count=data.get("warning_count", 0),
critical_count=data.get("critical_count", 0),
)
# === Traffic Violations ===
@router.get("/traffic/violations", response_model=BanTrafficViolationsResponse)
async def get_traffic_violations(
limit: int = Query(50, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanTrafficViolationsResponse:
"""Get list of traffic limit violations."""
api = _get_ban_api()
data = await _api_request(api, "get_traffic_violations", limit=limit)
violations = []
violations_data = data if isinstance(data, list) else data.get("violations", [])
for v in violations_data:
violations.append(BanTrafficViolationItem(
id=v.get("id"),
username=v.get("username", ""),
email=v.get("email"),
violation_type=v.get("violation_type", v.get("type", "")),
description=v.get("description"),
bytes_used=v.get("bytes_used", 0),
bytes_limit=v.get("bytes_limit", 0),
detected_at=v.get("detected_at"),
resolved=v.get("resolved", False),
))
return BanTrafficViolationsResponse(
violations=violations,
total=len(violations),
)

View File

@@ -0,0 +1,232 @@
"""Schemas for Ban System integration in cabinet."""
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
# === Status ===
class BanSystemStatusResponse(BaseModel):
"""Ban System integration status."""
enabled: bool
configured: bool
# === Stats ===
class BanSystemStatsResponse(BaseModel):
"""Overall Ban System statistics."""
total_users: int = 0
active_users: int = 0
users_over_limit: int = 0
total_requests: int = 0
total_punishments: int = 0
active_punishments: int = 0
nodes_online: int = 0
nodes_total: int = 0
agents_online: int = 0
agents_total: int = 0
panel_connected: bool = False
uptime_seconds: Optional[int] = None
# === Users ===
class BanUserIPInfo(BaseModel):
"""User IP address information."""
ip: str
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
node: Optional[str] = None
request_count: int = 0
country_code: Optional[str] = None
country_name: Optional[str] = None
city: Optional[str] = None
class BanUserRequestLog(BaseModel):
"""User request log entry."""
timestamp: datetime
source_ip: str
destination: Optional[str] = None
dest_port: Optional[int] = None
protocol: Optional[str] = None
action: Optional[str] = None
node: Optional[str] = None
class BanUserListItem(BaseModel):
"""User in the list."""
email: str
unique_ip_count: int = 0
total_requests: int = 0
limit: Optional[int] = None
is_over_limit: bool = False
blocked_count: int = 0
last_seen: Optional[datetime] = None
class BanUsersListResponse(BaseModel):
"""Paginated list of users."""
users: List[BanUserListItem] = []
total: int = 0
offset: int = 0
limit: int = 50
class BanUserDetailResponse(BaseModel):
"""Detailed user information."""
email: str
unique_ip_count: int = 0
total_requests: int = 0
limit: Optional[int] = None
is_over_limit: bool = False
blocked_count: int = 0
ips: List[BanUserIPInfo] = []
recent_requests: List[BanUserRequestLog] = []
network_type: Optional[str] = None # wifi, mobile, mixed
# === Punishments (Bans) ===
class BanPunishmentItem(BaseModel):
"""Punishment/ban entry."""
id: Optional[int] = None
user_id: str
uuid: Optional[str] = None
username: str
reason: Optional[str] = None
punished_at: datetime
enable_at: Optional[datetime] = None
ip_count: int = 0
limit: int = 0
enabled: bool = False
enabled_at: Optional[datetime] = None
node_name: Optional[str] = None
class BanPunishmentsListResponse(BaseModel):
"""List of active punishments."""
punishments: List[BanPunishmentItem] = []
total: int = 0
class BanHistoryResponse(BaseModel):
"""Punishment history."""
items: List[BanPunishmentItem] = []
total: int = 0
class BanUserRequest(BaseModel):
"""Request to ban a user."""
username: str = Field(..., min_length=1)
minutes: int = Field(default=30, ge=1)
reason: Optional[str] = Field(None, max_length=500)
class UnbanResponse(BaseModel):
"""Unban response."""
success: bool
message: str
# === Nodes ===
class BanNodeItem(BaseModel):
"""Node information."""
name: str
address: Optional[str] = None
is_connected: bool = False
last_seen: Optional[datetime] = None
users_count: int = 0
agent_stats: Optional[Dict[str, Any]] = None
class BanNodesListResponse(BaseModel):
"""List of nodes."""
nodes: List[BanNodeItem] = []
total: int = 0
online: int = 0
# === Agents ===
class BanAgentItem(BaseModel):
"""Monitoring agent information."""
node_name: str
sent_total: int = 0
dropped_total: int = 0
batches_total: int = 0
reconnects: int = 0
failures: int = 0
queue_size: int = 0
queue_max: int = 0
dedup_checked: int = 0
dedup_skipped: int = 0
filter_checked: int = 0
filter_filtered: int = 0
health: str = "unknown" # healthy, warning, critical
is_online: bool = False
last_report: Optional[datetime] = None
class BanAgentsSummary(BaseModel):
"""Agents summary statistics."""
total_agents: int = 0
online_agents: int = 0
total_sent: int = 0
total_dropped: int = 0
avg_queue_size: float = 0.0
healthy_count: int = 0
warning_count: int = 0
critical_count: int = 0
class BanAgentsListResponse(BaseModel):
"""List of agents."""
agents: List[BanAgentItem] = []
summary: Optional[BanAgentsSummary] = None
total: int = 0
online: int = 0
# === Traffic ===
class BanTrafficStats(BaseModel):
"""Traffic statistics."""
total_bytes: int = 0
upload_bytes: int = 0
download_bytes: int = 0
total_users: int = 0
violators_count: int = 0
class BanTrafficUserItem(BaseModel):
"""User traffic information."""
username: str
email: Optional[str] = None
total_bytes: int = 0
upload_bytes: int = 0
download_bytes: int = 0
limit_bytes: Optional[int] = None
is_over_limit: bool = False
class BanTrafficViolationItem(BaseModel):
"""Traffic limit violation entry."""
id: Optional[int] = None
username: str
email: Optional[str] = None
violation_type: str
description: Optional[str] = None
bytes_used: int = 0
bytes_limit: int = 0
detected_at: datetime
resolved: bool = False
class BanTrafficViolationsResponse(BaseModel):
"""List of traffic violations."""
violations: List[BanTrafficViolationItem] = []
total: int = 0

View File

@@ -656,6 +656,12 @@ class Settings(BaseSettings):
SMTP_FROM_NAME: str = "VPN Service"
SMTP_USE_TLS: bool = True
# Ban System Integration (BedolagaBan monitoring)
BAN_SYSTEM_ENABLED: bool = False
BAN_SYSTEM_API_URL: Optional[str] = None # e.g., http://ban-server:8000
BAN_SYSTEM_API_TOKEN: Optional[str] = None
BAN_SYSTEM_REQUEST_TIMEOUT: int = 30
@field_validator('MAIN_MENU_MODE', mode='before')
@classmethod
def normalize_main_menu_mode(cls, value: Optional[str]) -> str:
@@ -2334,6 +2340,24 @@ class Settings(BaseSettings):
return self.SMTP_FROM_EMAIL
return self.SMTP_USER
# Ban System helpers
def is_ban_system_enabled(self) -> bool:
return bool(self.BAN_SYSTEM_ENABLED)
def is_ban_system_configured(self) -> bool:
return bool(self.BAN_SYSTEM_API_URL and self.BAN_SYSTEM_API_TOKEN)
def get_ban_system_api_url(self) -> Optional[str]:
if self.BAN_SYSTEM_API_URL:
return self.BAN_SYSTEM_API_URL.rstrip('/')
return None
def get_ban_system_api_token(self) -> Optional[str]:
return self.BAN_SYSTEM_API_TOKEN
def get_ban_system_request_timeout(self) -> int:
return max(1, self.BAN_SYSTEM_REQUEST_TIMEOUT)
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",

357
app/external/ban_system_api.py vendored Normal file
View File

@@ -0,0 +1,357 @@
"""
Ban System API Client.
Client for interacting with the BedolagaBan monitoring system.
"""
import asyncio
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
import aiohttp
logger = logging.getLogger(__name__)
class BanSystemAPIError(Exception):
"""Ban System API error."""
def __init__(self, message: str, status_code: Optional[int] = None, response_data: Optional[dict] = None):
self.message = message
self.status_code = status_code
self.response_data = response_data
super().__init__(self.message)
class BanSystemAPI:
"""HTTP client for Ban System API."""
def __init__(self, base_url: str, api_token: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.api_token = api_token
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session: Optional[aiohttp.ClientSession] = None
def _get_headers(self) -> Dict[str, str]:
"""Get request headers with authorization."""
return {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
async def __aenter__(self):
"""Async context manager entry."""
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers=self._get_headers()
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
self.session = None
async def _ensure_session(self):
"""Ensure session is created."""
if self.session is None:
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers=self._get_headers()
)
async def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
json_data: Optional[Dict] = None,
) -> Any:
"""Execute HTTP request."""
await self._ensure_session()
url = f"{self.base_url}{endpoint}"
try:
async with self.session.request(
method=method,
url=url,
params=params,
json=json_data,
) as response:
response_text = await response.text()
if response.status >= 400:
logger.error(f"Ban System API error: {response.status} - {response_text}")
raise BanSystemAPIError(
message=f"API error {response.status}: {response_text}",
status_code=response.status,
response_data={"error": response_text}
)
if response_text:
try:
return await response.json()
except Exception:
return {"raw": response_text}
return {}
except aiohttp.ClientError as e:
logger.error(f"Ban System API connection error: {e}")
raise BanSystemAPIError(
message=f"Connection error: {str(e)}",
status_code=None,
response_data=None
)
except asyncio.TimeoutError:
logger.error("Ban System API request timeout")
raise BanSystemAPIError(
message="Request timeout",
status_code=None,
response_data=None
)
async def close(self):
"""Close the session."""
if self.session:
await self.session.close()
self.session = None
# === Stats ===
async def get_stats(self) -> Dict[str, Any]:
"""
Get overall system statistics.
GET /api/stats
"""
return await self._request("GET", "/api/stats")
async def get_stats_period(self, hours: int = 24) -> Dict[str, Any]:
"""
Get statistics for a specific period.
GET /api/stats/period?hours={hours}
"""
return await self._request("GET", "/api/stats/period", params={"hours": hours})
# === Users ===
async def get_users(
self,
offset: int = 0,
limit: int = 50,
status: Optional[str] = None,
) -> Dict[str, Any]:
"""
Get list of users with pagination.
GET /api/users
Args:
offset: Pagination offset
limit: Number of users per page (max 100)
status: Filter by status (over_limit, with_limit, unlimited)
"""
params = {"offset": offset, "limit": min(limit, 100)}
if status:
params["status"] = status
return await self._request("GET", "/api/users", params=params)
async def get_users_over_limit(self, limit: int = 50, window: bool = True) -> Dict[str, Any]:
"""
Get users who exceeded their device limit.
GET /api/users/over-limit
"""
return await self._request(
"GET",
"/api/users/over-limit",
params={"limit": limit, "window": str(window).lower()}
)
async def search_users(self, query: str) -> Dict[str, Any]:
"""
Search for a user.
GET /api/users/search/{query}
"""
return await self._request("GET", f"/api/users/search/{query}")
async def get_user(self, email: str) -> Dict[str, Any]:
"""
Get detailed user information.
GET /api/users/{email}
"""
return await self._request("GET", f"/api/users/{email}")
async def get_user_network(self, email: str) -> Dict[str, Any]:
"""
Get user network information (WiFi/Mobile detection).
GET /api/users/{email}/network
"""
return await self._request("GET", f"/api/users/{email}/network")
# === Punishments (Bans) ===
async def get_punishments(self) -> List[Dict[str, Any]]:
"""
Get list of active punishments (bans).
GET /api/punishments
"""
return await self._request("GET", "/api/punishments")
async def enable_user(self, user_id: str) -> Dict[str, Any]:
"""
Enable (unban) a user.
POST /api/punishments/{user_id}/enable
"""
return await self._request("POST", f"/api/punishments/{user_id}/enable")
async def ban_user(
self,
username: str,
minutes: int = 30,
reason: Optional[str] = None,
) -> Dict[str, Any]:
"""
Manually ban a user.
POST /api/ban
"""
params = {"username": username, "minutes": minutes}
if reason:
params["reason"] = reason
return await self._request("POST", "/api/ban", params=params)
async def get_punishment_history(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get punishment history for a user.
GET /api/history/{query}
"""
return await self._request(
"GET",
f"/api/history/{query}",
params={"limit": limit}
)
# === Nodes ===
async def get_nodes(self, include_agent_stats: bool = True) -> List[Dict[str, Any]]:
"""
Get list of connected nodes.
GET /api/nodes
"""
return await self._request(
"GET",
"/api/nodes",
params={"include_agent_stats": str(include_agent_stats).lower()}
)
# === Agents ===
async def get_agents(
self,
search: Optional[str] = None,
health: Optional[str] = None,
status: Optional[str] = None,
sort_by: str = "name",
sort_order: str = "asc",
) -> Dict[str, Any]:
"""
Get list of monitoring agents.
GET /api/agents
Args:
search: Search query
health: Filter by health (healthy, warning, critical)
status: Filter by status (online, offline)
sort_by: Sort by field (name, sent, dropped, health)
sort_order: Sort order (asc, desc)
"""
params = {"sort_by": sort_by, "sort_order": sort_order}
if search:
params["search"] = search
if health:
params["health"] = health
if status:
params["status"] = status
return await self._request("GET", "/api/agents", params=params)
async def get_agents_summary(self) -> Dict[str, Any]:
"""
Get summary statistics for all agents.
GET /api/agents/summary
"""
return await self._request("GET", "/api/agents/summary")
async def get_agent_history(
self,
node_name: str,
hours: int = 24,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""
Get agent statistics history.
GET /api/agents/{node_name}/history
"""
return await self._request(
"GET",
f"/api/agents/{node_name}/history",
params={"hours": hours, "limit": limit}
)
# === Traffic ===
async def get_traffic(self) -> Dict[str, Any]:
"""
Get overall traffic statistics.
GET /api/traffic
"""
return await self._request("GET", "/api/traffic")
async def get_traffic_top(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get top users by traffic.
GET /api/traffic/top
"""
return await self._request("GET", "/api/traffic/top", params={"limit": limit})
async def get_user_traffic(self, username: str) -> Dict[str, Any]:
"""
Get traffic information for a specific user.
GET /api/traffic/user/{username}
"""
return await self._request("GET", f"/api/traffic/user/{username}")
async def get_traffic_violations(self, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get list of traffic limit violations.
GET /api/traffic/violations
"""
return await self._request("GET", "/api/traffic/violations", params={"limit": limit})
# === Health ===
async def health_check(self) -> Dict[str, Any]:
"""
Check API health.
GET /api/health
"""
return await self._request("GET", "/api/health")