Merge pull request #2300 from BEDOLAGA-DEV/main

w
This commit is contained in:
Egor
2026-01-17 01:04:26 +03:00
committed by GitHub
26 changed files with 3531 additions and 639 deletions

View File

@@ -66,11 +66,36 @@ ADMIN_REPORTS_CHAT_ID= # Опционально: чат
ADMIN_REPORTS_TOPIC_ID= # ID топика для отчетов
ADMIN_REPORTS_SEND_TIME=10:00 # Время отправки (по МСК) ежедневного отчета
# Мониторинг трафика
TRAFFIC_MONITORING_ENABLED=false # Включить мониторинг трафика пользователей
TRAFFIC_THRESHOLD_GB_PER_DAY=10.0 # Порог трафика в ГБ за сутки (превышение вызывает уведомление)
TRAFFIC_MONITORING_INTERVAL_HOURS=24 # Интервал проверки трафика в часах (например: 1, 6, 12, 24)
SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности (0 для отправки в основной чат)
# ===== МОНИТОРИНГ ТРАФИКА =====
# Логика: при запуске бота создаётся snapshot трафика всех пользователей.
# Через указанный интервал проверяется дельта (разница) трафика.
# Если дельта превышает порог — отправляется уведомление админам.
# Быстрая проверка (дельта трафика за интервал)
TRAFFIC_FAST_CHECK_ENABLED=false # Включить быструю проверку
TRAFFIC_FAST_CHECK_INTERVAL_MINUTES=10 # Интервал проверки в минутах
TRAFFIC_FAST_CHECK_THRESHOLD_GB=5.0 # Порог дельты в ГБ (сколько потрачено за интервал)
# Суточная проверка (трафик за 24 часа через bandwidth API)
TRAFFIC_DAILY_CHECK_ENABLED=false # Включить суточную проверку
TRAFFIC_DAILY_CHECK_TIME=00:00 # Время суточной проверки (HH:MM по UTC)
TRAFFIC_DAILY_THRESHOLD_GB=50.0 # Порог суточного трафика в ГБ
# Куда отправлять уведомления
SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности
# Фильтрация по серверам (UUID нод через запятую)
TRAFFIC_MONITORED_NODES= # Только эти ноды (пусто = все)
TRAFFIC_IGNORED_NODES= # Исключить эти ноды
# Исключить пользователей (UUID через запятую)
TRAFFIC_EXCLUDED_USER_UUIDS= # Служебные/тунельные пользователи
# Производительность
TRAFFIC_CHECK_BATCH_SIZE=1000 # Размер батча для получения пользователей
TRAFFIC_CHECK_CONCURRENCY=10 # Параллельных запросов к API
TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES=60 # Кулдаун уведомлений на пользователя (минуты)
TRAFFIC_SNAPSHOT_TTL_HOURS=24 # TTL snapshot трафика в Redis (часы, сохраняется при рестарте)
# Черный список
BLACKLIST_CHECK_ENABLED=false # Включить проверку пользователей по черному списку
@@ -705,6 +730,17 @@ APP_CONFIG_PATH=app-config.json
ENABLE_DEEP_LINKS=true
APP_CONFIG_CACHE_TTL=3600
# ===== BAN SYSTEM INTEGRATION (BedolagaBan) =====
# Интеграция с системой мониторинга банов BedolagaBan
# Включить интеграцию с Ban системой
BAN_SYSTEM_ENABLED=false
# URL API сервера Ban системы (например: http://ban-server:8000)
BAN_SYSTEM_API_URL=
# API токен для авторизации в Ban системе
BAN_SYSTEM_API_TOKEN=
# Таймаут запросов к API (секунды)
BAN_SYSTEM_REQUEST_TIMEOUT=30
# ===== СИСТЕМА БЕКАПОВ =====
BACKUP_AUTO_ENABLED=true
BACKUP_INTERVAL_HOURS=24

View File

@@ -22,6 +22,7 @@ from .admin_wheel import router as admin_wheel_router
from .admin_tariffs import router as admin_tariffs_router
from .admin_servers import router as admin_servers_router
from .admin_stats import router as admin_stats_router
from .admin_ban_system import router as admin_ban_system_router
from .media import router as media_router
# Main cabinet router
@@ -53,5 +54,6 @@ router.include_router(admin_wheel_router)
router.include_router(admin_tariffs_router)
router.include_router(admin_servers_router)
router.include_router(admin_stats_router)
router.include_router(admin_ban_system_router)
__all__ = ["router"]

View File

@@ -0,0 +1,975 @@
"""Admin routes for Ban System monitoring in cabinet."""
import logging
from typing import Optional, List, Any
from fastapi import APIRouter, Depends, HTTPException, status, Query
from app.config import settings
from app.database.models import User
from app.external.ban_system_api import BanSystemAPI, BanSystemAPIError
from ..dependencies import get_current_admin_user
from ..schemas.ban_system import (
BanSystemStatusResponse,
BanSystemStatsResponse,
BanUsersListResponse,
BanUserListItem,
BanUserDetailResponse,
BanUserIPInfo,
BanUserRequestLog,
BanPunishmentsListResponse,
BanPunishmentItem,
BanHistoryResponse,
BanUserRequest,
UnbanResponse,
BanNodesListResponse,
BanNodeItem,
BanAgentsListResponse,
BanAgentItem,
BanAgentsSummary,
BanTrafficViolationsResponse,
BanTrafficViolationItem,
BanTrafficResponse,
BanTrafficTopItem,
BanSettingsResponse,
BanSettingDefinition,
BanWhitelistRequest,
BanReportResponse,
BanReportTopViolator,
BanHealthResponse,
BanHealthComponent,
BanHealthDetailedResponse,
BanAgentHistoryResponse,
BanAgentHistoryItem,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/admin/ban-system", tags=["Cabinet Admin Ban System"])
def _get_ban_api() -> BanSystemAPI:
"""Get Ban System API instance."""
logger.info(f"Ban System check - enabled: {settings.is_ban_system_enabled()}, configured: {settings.is_ban_system_configured()}")
logger.info(f"Ban System URL: {settings.get_ban_system_api_url()}")
if not settings.is_ban_system_enabled():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Ban System integration is disabled",
)
if not settings.is_ban_system_configured():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Ban System is not configured",
)
return BanSystemAPI(
base_url=settings.get_ban_system_api_url(),
api_token=settings.get_ban_system_api_token(),
timeout=settings.get_ban_system_request_timeout(),
)
async def _api_request(api: BanSystemAPI, method: str, *args, **kwargs) -> Any:
"""Execute API request with error handling."""
try:
async with api:
func = getattr(api, method)
return await func(*args, **kwargs)
except BanSystemAPIError as e:
logger.error(f"Ban System API error: {e}")
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Ban System API error: {e.message}",
)
except Exception as e:
logger.error(f"Ban System unexpected error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal error: {str(e)}",
)
# === Status ===
@router.get("/status", response_model=BanSystemStatusResponse)
async def get_ban_system_status(
admin: User = Depends(get_current_admin_user),
) -> BanSystemStatusResponse:
"""Get Ban System integration status."""
return BanSystemStatusResponse(
enabled=settings.is_ban_system_enabled(),
configured=settings.is_ban_system_configured(),
)
# === Stats ===
@router.get("/stats/raw")
async def get_stats_raw(
admin: User = Depends(get_current_admin_user),
) -> dict:
"""Get raw stats from Ban System API for debugging."""
api = _get_ban_api()
data = await _api_request(api, "get_stats")
return {"raw_response": data}
@router.get("/stats", response_model=BanSystemStatsResponse)
async def get_stats(
admin: User = Depends(get_current_admin_user),
) -> BanSystemStatsResponse:
"""Get overall Ban System statistics."""
from datetime import datetime
api = _get_ban_api()
data = await _api_request(api, "get_stats")
logger.info(f"Ban System raw stats: {data}")
# Extract punishment stats
punishment_stats = data.get("punishment_stats") or {}
# Extract connected nodes info
connected_nodes = data.get("connected_nodes", [])
# Count online nodes/agents
nodes_online = sum(1 for n in connected_nodes if n.get("is_online", False))
# Extract tcp_metrics for uptime
tcp_metrics = data.get("tcp_metrics") or {}
uptime_seconds = None
intake_started = tcp_metrics.get("intake_started_at")
if intake_started:
try:
start_time = datetime.fromisoformat(intake_started.replace("Z", "+00:00"))
uptime_seconds = int((datetime.now(start_time.tzinfo) - start_time).total_seconds())
except Exception:
pass
return BanSystemStatsResponse(
total_users=data.get("total_users", 0),
active_users=data.get("users_with_limit", 0),
users_over_limit=data.get("users_over_limit", 0),
total_requests=data.get("total_requests", 0),
total_punishments=punishment_stats.get("total_punishments", 0),
active_punishments=punishment_stats.get("active_punishments", 0),
nodes_online=nodes_online,
nodes_total=len(connected_nodes),
agents_online=nodes_online, # Agents = connected nodes with stats
agents_total=len(connected_nodes),
panel_connected=data.get("panel_loaded", False),
uptime_seconds=uptime_seconds,
)
# === Users ===
@router.get("/users", response_model=BanUsersListResponse)
async def get_users(
offset: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
status: Optional[str] = Query(None, description="Filter: over_limit, with_limit, unlimited"),
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Get list of users from Ban System."""
api = _get_ban_api()
data = await _api_request(api, "get_users", offset=offset, limit=limit, status=status)
users = []
for user_data in data.get("users", []):
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=user_data.get("is_over_limit", False),
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=data.get("total", len(users)),
offset=offset,
limit=limit,
)
@router.get("/users/over-limit", response_model=BanUsersListResponse)
async def get_users_over_limit(
limit: int = Query(50, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Get users who exceeded their device limit."""
api = _get_ban_api()
data = await _api_request(api, "get_users_over_limit", limit=limit)
users = []
for user_data in data.get("users", []):
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=True,
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=len(users),
offset=0,
limit=limit,
)
@router.get("/users/search/{query}")
async def search_users(
query: str,
admin: User = Depends(get_current_admin_user),
) -> BanUsersListResponse:
"""Search for users."""
api = _get_ban_api()
data = await _api_request(api, "search_users", query=query)
users = []
users_data = data.get("users", []) if isinstance(data, dict) else data
for user_data in users_data:
users.append(BanUserListItem(
email=user_data.get("email", ""),
unique_ip_count=user_data.get("unique_ip_count", 0),
total_requests=user_data.get("total_requests", 0),
limit=user_data.get("limit"),
is_over_limit=user_data.get("is_over_limit", False),
blocked_count=user_data.get("blocked_count", 0),
))
return BanUsersListResponse(
users=users,
total=len(users),
offset=0,
limit=100,
)
@router.get("/users/{email}", response_model=BanUserDetailResponse)
async def get_user_detail(
email: str,
admin: User = Depends(get_current_admin_user),
) -> BanUserDetailResponse:
"""Get detailed user information."""
api = _get_ban_api()
data = await _api_request(api, "get_user", email=email)
ips = []
for ip_data in data.get("ips", {}).values() if isinstance(data.get("ips"), dict) else data.get("ips", []):
ips.append(BanUserIPInfo(
ip=ip_data.get("ip", ""),
first_seen=ip_data.get("first_seen"),
last_seen=ip_data.get("last_seen"),
node=ip_data.get("node"),
request_count=ip_data.get("request_count", 0),
country_code=ip_data.get("country_code"),
country_name=ip_data.get("country_name"),
city=ip_data.get("city"),
))
recent_requests = []
for req_data in data.get("recent_requests", []):
recent_requests.append(BanUserRequestLog(
timestamp=req_data.get("timestamp"),
source_ip=req_data.get("source_ip", ""),
destination=req_data.get("destination"),
dest_port=req_data.get("dest_port"),
protocol=req_data.get("protocol"),
action=req_data.get("action"),
node=req_data.get("node"),
))
return BanUserDetailResponse(
email=data.get("email", email),
unique_ip_count=data.get("unique_ip_count", 0),
total_requests=data.get("total_requests", 0),
limit=data.get("limit"),
is_over_limit=data.get("is_over_limit", False),
blocked_count=data.get("blocked_count", 0),
ips=ips,
recent_requests=recent_requests,
network_type=data.get("network_type"),
)
# === Punishments ===
@router.get("/punishments", response_model=BanPunishmentsListResponse)
async def get_punishments(
admin: User = Depends(get_current_admin_user),
) -> BanPunishmentsListResponse:
"""Get list of active punishments (bans)."""
api = _get_ban_api()
data = await _api_request(api, "get_punishments")
punishments = []
punishments_data = data if isinstance(data, list) else data.get("punishments", [])
for p in punishments_data:
punishments.append(BanPunishmentItem(
id=p.get("id"),
user_id=p.get("user_id", ""),
uuid=p.get("uuid"),
username=p.get("username", ""),
reason=p.get("reason"),
punished_at=p.get("punished_at"),
enable_at=p.get("enable_at"),
ip_count=p.get("ip_count", 0),
limit=p.get("limit", 0),
enabled=p.get("enabled", False),
enabled_at=p.get("enabled_at"),
node_name=p.get("node_name"),
))
return BanPunishmentsListResponse(
punishments=punishments,
total=len(punishments),
)
@router.post("/punishments/{user_id}/unban", response_model=UnbanResponse)
async def unban_user(
user_id: str,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Unban (enable) a user."""
api = _get_ban_api()
try:
await _api_request(api, "enable_user", user_id=user_id)
logger.info(f"Admin {admin.id} unbanned user {user_id} in Ban System")
return UnbanResponse(success=True, message="User unbanned successfully")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
@router.post("/ban", response_model=UnbanResponse)
async def ban_user(
request: BanUserRequest,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Manually ban a user."""
api = _get_ban_api()
try:
await _api_request(
api,
"ban_user",
username=request.username,
minutes=request.minutes,
reason=request.reason,
)
logger.info(f"Admin {admin.id} banned user {request.username}: {request.reason}")
return UnbanResponse(success=True, message="User banned successfully")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
@router.get("/history/{query}", response_model=BanHistoryResponse)
async def get_punishment_history(
query: str,
limit: int = Query(20, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanHistoryResponse:
"""Get punishment history for a user."""
api = _get_ban_api()
data = await _api_request(api, "get_punishment_history", query=query, limit=limit)
items = []
history_data = data if isinstance(data, list) else data.get("items", [])
for p in history_data:
items.append(BanPunishmentItem(
id=p.get("id"),
user_id=p.get("user_id", ""),
uuid=p.get("uuid"),
username=p.get("username", ""),
reason=p.get("reason"),
punished_at=p.get("punished_at"),
enable_at=p.get("enable_at"),
ip_count=p.get("ip_count", 0),
limit=p.get("limit", 0),
enabled=p.get("enabled", False),
enabled_at=p.get("enabled_at"),
node_name=p.get("node_name"),
))
return BanHistoryResponse(
items=items,
total=len(items),
)
# === Nodes ===
@router.get("/nodes", response_model=BanNodesListResponse)
async def get_nodes(
admin: User = Depends(get_current_admin_user),
) -> BanNodesListResponse:
"""Get list of connected nodes."""
api = _get_ban_api()
data = await _api_request(api, "get_nodes")
nodes = []
nodes_data = data if isinstance(data, list) else data.get("nodes", [])
online_count = 0
for n in nodes_data:
# API returns is_online, not is_connected
is_connected = n.get("is_online", n.get("is_connected", False))
if is_connected:
online_count += 1
nodes.append(BanNodeItem(
name=n.get("name", ""),
address=n.get("address"),
is_connected=is_connected,
# API returns last_heartbeat, not last_seen
last_seen=n.get("last_heartbeat", n.get("last_seen")),
# API returns unique_users, not users_count
users_count=n.get("unique_users", n.get("users_count", 0)),
agent_stats=n.get("agent_stats"),
))
return BanNodesListResponse(
nodes=nodes,
total=len(nodes),
online=online_count,
)
# === Agents ===
@router.get("/agents", response_model=BanAgentsListResponse)
async def get_agents(
search: Optional[str] = Query(None),
health: Optional[str] = Query(None, description="healthy, warning, critical"),
agent_status: Optional[str] = Query(None, alias="status", description="online, offline"),
admin: User = Depends(get_current_admin_user),
) -> BanAgentsListResponse:
"""Get list of monitoring agents."""
api = _get_ban_api()
data = await _api_request(
api,
"get_agents",
search=search,
health=health,
status=agent_status,
)
agents = []
agents_data = data.get("agents", {}) if isinstance(data, dict) else data
online_count = 0
# API returns agents as dict: {"node_name": {stats...}, ...}
if isinstance(agents_data, dict):
for node_name, agent_info in agents_data.items():
# Extract metrics from nested structure
stats = agent_info.get("stats", {}) or {}
metrics = stats.get("metrics", {}) or {}
sent_info = metrics.get("sent", {}) or {}
queue_info = metrics.get("queue", {}) or {}
conn_info = metrics.get("connection", {}) or {}
is_online = agent_info.get("is_online", False)
if is_online:
online_count += 1
agents.append(BanAgentItem(
node_name=node_name,
sent_total=sent_info.get("total", 0),
dropped_total=sent_info.get("dropped", 0),
batches_total=sent_info.get("batches", 0),
reconnects=conn_info.get("reconnects", 0),
failures=conn_info.get("failures", sent_info.get("failed", 0)),
queue_size=queue_info.get("current", 0),
queue_max=queue_info.get("high_watermark", 0),
dedup_checked=0,
dedup_skipped=0,
filter_checked=0,
filter_filtered=0,
health=agent_info.get("health", "unknown"),
is_online=is_online,
last_report=agent_info.get("updated_at"),
))
else:
# Fallback for list format
for a in agents_data:
is_online = a.get("is_online", False)
if is_online:
online_count += 1
agents.append(BanAgentItem(
node_name=a.get("node_name", ""),
sent_total=a.get("sent_total", 0),
dropped_total=a.get("dropped_total", 0),
batches_total=a.get("batches_total", 0),
reconnects=a.get("reconnects", 0),
failures=a.get("failures", 0),
queue_size=a.get("queue_size", 0),
queue_max=a.get("queue_max", 0),
dedup_checked=a.get("dedup_checked", 0),
dedup_skipped=a.get("dedup_skipped", 0),
filter_checked=a.get("filter_checked", 0),
filter_filtered=a.get("filter_filtered", 0),
health=a.get("health", "unknown"),
is_online=is_online,
last_report=a.get("last_report"),
))
summary = None
if isinstance(data, dict) and "summary" in data:
s = data["summary"]
summary = BanAgentsSummary(
total_agents=s.get("total_agents", len(agents)),
online_agents=s.get("online_agents", online_count),
total_sent=s.get("total_sent", 0),
total_dropped=s.get("total_dropped", 0),
avg_queue_size=s.get("avg_queue_size", 0.0),
healthy_count=s.get("healthy_count", 0),
warning_count=s.get("warning_count", 0),
critical_count=s.get("critical_count", 0),
)
return BanAgentsListResponse(
agents=agents,
summary=summary,
total=len(agents),
online=online_count,
)
@router.get("/agents/summary", response_model=BanAgentsSummary)
async def get_agents_summary(
admin: User = Depends(get_current_admin_user),
) -> BanAgentsSummary:
"""Get agents summary statistics."""
api = _get_ban_api()
data = await _api_request(api, "get_agents_summary")
return BanAgentsSummary(
total_agents=data.get("total_agents", 0),
online_agents=data.get("online_agents", 0),
total_sent=data.get("total_sent", 0),
total_dropped=data.get("total_dropped", 0),
avg_queue_size=data.get("avg_queue_size", 0.0),
healthy_count=data.get("healthy_count", 0),
warning_count=data.get("warning_count", 0),
critical_count=data.get("critical_count", 0),
)
# === Traffic Violations ===
@router.get("/traffic/violations", response_model=BanTrafficViolationsResponse)
async def get_traffic_violations(
limit: int = Query(50, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanTrafficViolationsResponse:
"""Get list of traffic limit violations."""
api = _get_ban_api()
data = await _api_request(api, "get_traffic_violations", limit=limit)
violations = []
violations_data = data if isinstance(data, list) else data.get("violations", [])
for v in violations_data:
violations.append(BanTrafficViolationItem(
id=v.get("id"),
username=v.get("username", ""),
email=v.get("email"),
violation_type=v.get("violation_type", v.get("type", "")),
description=v.get("description"),
bytes_used=v.get("bytes_used", 0),
bytes_limit=v.get("bytes_limit", 0),
detected_at=v.get("detected_at"),
resolved=v.get("resolved", False),
))
return BanTrafficViolationsResponse(
violations=violations,
total=len(violations),
)
# === Full Traffic Stats ===
@router.get("/traffic", response_model=BanTrafficResponse)
async def get_traffic(
admin: User = Depends(get_current_admin_user),
) -> BanTrafficResponse:
"""Get full traffic statistics including top users."""
api = _get_ban_api()
data = await _api_request(api, "get_traffic")
top_users = []
for u in data.get("top_users", []):
top_users.append(BanTrafficTopItem(
username=u.get("username", ""),
bytes_total=u.get("bytes_total", u.get("total_bytes", 0)),
bytes_limit=u.get("bytes_limit"),
over_limit=u.get("over_limit", False),
))
violations = []
for v in data.get("recent_violations", []):
violations.append(BanTrafficViolationItem(
id=v.get("id"),
username=v.get("username", ""),
email=v.get("email"),
violation_type=v.get("violation_type", v.get("type", "")),
description=v.get("description"),
bytes_used=v.get("bytes_used", 0),
bytes_limit=v.get("bytes_limit", 0),
detected_at=v.get("detected_at"),
resolved=v.get("resolved", False),
))
return BanTrafficResponse(
enabled=data.get("enabled", False),
stats=data.get("stats"),
top_users=top_users,
recent_violations=violations,
)
@router.get("/traffic/top")
async def get_traffic_top(
limit: int = Query(20, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> List[BanTrafficTopItem]:
"""Get top users by traffic."""
api = _get_ban_api()
data = await _api_request(api, "get_traffic_top", limit=limit)
top_users = []
users_data = data if isinstance(data, list) else data.get("users", [])
for u in users_data:
top_users.append(BanTrafficTopItem(
username=u.get("username", ""),
bytes_total=u.get("bytes_total", u.get("total_bytes", 0)),
bytes_limit=u.get("bytes_limit"),
over_limit=u.get("over_limit", False),
))
return top_users
# === Settings ===
def _parse_setting_response(key: str, data: Any, default_type: str = "str") -> BanSettingDefinition:
"""Parse setting response from API."""
if isinstance(data, dict) and "value" in data:
return BanSettingDefinition(
key=key,
value=data.get("value"),
type=data.get("type", default_type),
min_value=data.get("min"),
max_value=data.get("max"),
editable=data.get("editable", True),
description=data.get("description"),
category=data.get("category"),
)
else:
# Простое значение или dict без "value"
value = data.get("value", data) if isinstance(data, dict) else data
value_type = default_type
if isinstance(value, bool):
value_type = "bool"
elif isinstance(value, int):
value_type = "int"
elif isinstance(value, float):
value_type = "float"
elif isinstance(value, list):
value_type = "list"
return BanSettingDefinition(
key=key,
value=value,
type=value_type,
min_value=None,
max_value=None,
editable=True,
description=None,
category=None,
)
@router.get("/settings", response_model=BanSettingsResponse)
async def get_settings(
admin: User = Depends(get_current_admin_user),
) -> BanSettingsResponse:
"""Get all Ban System settings."""
api = _get_ban_api()
data = await _api_request(api, "get_settings")
settings_list = []
settings_data = data.get("settings", {}) if isinstance(data, dict) else {}
for key, info in settings_data.items():
# API может возвращать настройки в двух форматах:
# 1. {"key": {"value": ..., "type": ...}} - с метаданными
# 2. {"key": value} - просто значение
if isinstance(info, dict) and "value" in info:
# Формат с метаданными
settings_list.append(BanSettingDefinition(
key=key,
value=info.get("value"),
type=info.get("type", "str"),
min_value=info.get("min"),
max_value=info.get("max"),
editable=info.get("editable", True),
description=info.get("description"),
category=info.get("category"),
))
else:
# Простой формат - определяем тип по значению
value_type = "str"
if isinstance(info, bool):
value_type = "bool"
elif isinstance(info, int):
value_type = "int"
elif isinstance(info, float):
value_type = "float"
elif isinstance(info, list):
value_type = "list"
settings_list.append(BanSettingDefinition(
key=key,
value=info,
type=value_type,
min_value=None,
max_value=None,
editable=True,
description=None,
category=None,
))
return BanSettingsResponse(settings=settings_list)
@router.get("/settings/{key}")
async def get_setting(
key: str,
admin: User = Depends(get_current_admin_user),
) -> BanSettingDefinition:
"""Get a specific setting."""
api = _get_ban_api()
data = await _api_request(api, "get_setting", key=key)
return _parse_setting_response(key, data)
@router.post("/settings/{key}")
async def set_setting(
key: str,
value: str = Query(...),
admin: User = Depends(get_current_admin_user),
) -> BanSettingDefinition:
"""Set a setting value."""
api = _get_ban_api()
data = await _api_request(api, "set_setting", key=key, value=value)
logger.info(f"Admin {admin.id} changed Ban System setting {key} to {value}")
return _parse_setting_response(key, data)
@router.post("/settings/{key}/toggle")
async def toggle_setting(
key: str,
admin: User = Depends(get_current_admin_user),
) -> BanSettingDefinition:
"""Toggle a boolean setting."""
api = _get_ban_api()
data = await _api_request(api, "toggle_setting", key=key)
logger.info(f"Admin {admin.id} toggled Ban System setting {key}")
return _parse_setting_response(key, data, default_type="bool")
# === Whitelist ===
@router.post("/settings/whitelist/add", response_model=UnbanResponse)
async def whitelist_add(
request: BanWhitelistRequest,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Add user to whitelist."""
api = _get_ban_api()
try:
await _api_request(api, "whitelist_add", username=request.username)
logger.info(f"Admin {admin.id} added {request.username} to Ban System whitelist")
return UnbanResponse(success=True, message=f"User {request.username} added to whitelist")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
@router.post("/settings/whitelist/remove", response_model=UnbanResponse)
async def whitelist_remove(
request: BanWhitelistRequest,
admin: User = Depends(get_current_admin_user),
) -> UnbanResponse:
"""Remove user from whitelist."""
api = _get_ban_api()
try:
await _api_request(api, "whitelist_remove", username=request.username)
logger.info(f"Admin {admin.id} removed {request.username} from Ban System whitelist")
return UnbanResponse(success=True, message=f"User {request.username} removed from whitelist")
except HTTPException:
raise
except Exception as e:
return UnbanResponse(success=False, message=str(e))
# === Reports ===
@router.get("/report", response_model=BanReportResponse)
async def get_report(
hours: int = Query(24, ge=1, le=168),
admin: User = Depends(get_current_admin_user),
) -> BanReportResponse:
"""Get period report."""
api = _get_ban_api()
data = await _api_request(api, "get_stats_period", hours=hours)
top_violators = []
punishment_stats = data.get("punishment_stats", {}) or {}
for v in punishment_stats.get("top_violators", []):
top_violators.append(BanReportTopViolator(
username=v.get("username", ""),
count=v.get("count", 0),
))
return BanReportResponse(
period_hours=hours,
current_users=data.get("current_users", 0),
current_ips=data.get("current_ips", 0),
punishment_stats=punishment_stats,
top_violators=top_violators,
)
# === Health ===
@router.get("/health", response_model=BanHealthResponse)
async def get_health(
admin: User = Depends(get_current_admin_user),
) -> BanHealthResponse:
"""Get Ban System health status."""
api = _get_ban_api()
data = await _api_request(api, "health_check")
components = []
for name, info in data.get("components", {}).items():
if isinstance(info, dict):
components.append(BanHealthComponent(
name=name,
status=info.get("status", "unknown"),
message=info.get("message"),
details=info.get("details"),
))
else:
components.append(BanHealthComponent(
name=name,
status=str(info) if info else "unknown",
))
return BanHealthResponse(
status=data.get("status", "unknown"),
uptime=data.get("uptime"),
components=components,
)
@router.get("/health/detailed", response_model=BanHealthDetailedResponse)
async def get_health_detailed(
admin: User = Depends(get_current_admin_user),
) -> BanHealthDetailedResponse:
"""Get detailed health information."""
api = _get_ban_api()
data = await _api_request(api, "health_detailed")
return BanHealthDetailedResponse(
status=data.get("status", "unknown"),
uptime=data.get("uptime"),
components=data.get("components", {}),
)
# === Agent History ===
@router.get("/agents/{node_name}/history", response_model=BanAgentHistoryResponse)
async def get_agent_history(
node_name: str,
hours: int = Query(24, ge=1, le=168),
admin: User = Depends(get_current_admin_user),
) -> BanAgentHistoryResponse:
"""Get agent statistics history."""
api = _get_ban_api()
data = await _api_request(api, "get_agent_history", node_name=node_name, hours=hours)
history = []
for item in data.get("history", []):
history.append(BanAgentHistoryItem(
timestamp=item.get("timestamp"),
sent_total=item.get("sent_total", 0),
dropped_total=item.get("dropped_total", 0),
queue_size=item.get("queue_size", 0),
batches_total=item.get("batches_total", 0),
))
return BanAgentHistoryResponse(
node=data.get("node", node_name),
hours=data.get("hours", hours),
records=data.get("records", len(history)),
delta=data.get("delta"),
first=data.get("first"),
last=data.get("last"),
history=history,
)
# === User Punishment History ===
@router.get("/users/{email}/history", response_model=BanHistoryResponse)
async def get_user_punishment_history(
email: str,
limit: int = Query(20, ge=1, le=100),
admin: User = Depends(get_current_admin_user),
) -> BanHistoryResponse:
"""Get punishment history for a specific user."""
api = _get_ban_api()
data = await _api_request(api, "get_punishment_history", query=email, limit=limit)
items = []
history_data = data if isinstance(data, list) else data.get("items", [])
for p in history_data:
items.append(BanPunishmentItem(
id=p.get("id"),
user_id=p.get("user_id", ""),
uuid=p.get("uuid"),
username=p.get("username", ""),
reason=p.get("reason"),
punished_at=p.get("punished_at"),
enable_at=p.get("enable_at"),
ip_count=p.get("ip_count", 0),
limit=p.get("limit", 0),
enabled=p.get("enabled", False),
enabled_at=p.get("enabled_at"),
node_name=p.get("node_name"),
))
return BanHistoryResponse(
items=items,
total=len(items),
)

View File

@@ -119,15 +119,19 @@ async def get_payment_methods():
"""Get available payment methods."""
methods = []
# YooKassa
# YooKassa - with card and SBP options
if settings.is_yookassa_enabled():
methods.append(PaymentMethodResponse(
id="yookassa",
name="YooKassa (Bank Card)",
description="Pay with bank card via YooKassa",
name="YooKassa",
description="Pay via YooKassa",
min_amount_kopeks=settings.YOOKASSA_MIN_AMOUNT_KOPEKS,
max_amount_kopeks=settings.YOOKASSA_MAX_AMOUNT_KOPEKS,
is_available=True,
options=[
{"id": "card", "name": "💳 Карта", "description": "Банковская карта"},
{"id": "sbp", "name": "🏦 СБП", "description": "Система быстрых платежей (QR)"},
],
))
# CryptoBot
@@ -378,19 +382,34 @@ async def create_topup(
try:
if request.payment_method == "yookassa":
yookassa_service = YooKassaService()
result = await yookassa_service.create_payment(
amount=amount_rubles,
currency="RUB",
description=f"Пополнение баланса на {amount_rubles:.2f}",
metadata={
"user_id": str(user.id),
"user_telegram_id": str(user.telegram_id) if user.telegram_id else "",
"user_username": user.username or "",
"amount_kopeks": str(request.amount_kopeks),
"type": "balance_topup",
"source": "cabinet",
},
)
yookassa_metadata = {
"user_id": str(user.id),
"user_telegram_id": str(user.telegram_id) if user.telegram_id else "",
"user_username": user.username or "",
"amount_kopeks": str(request.amount_kopeks),
"type": "balance_topup",
"source": "cabinet",
}
# Use payment_option to select card or sbp (default: card)
option = (request.payment_option or "").strip().lower()
if option == "sbp":
# Create SBP payment with QR code
result = await yookassa_service.create_sbp_payment(
amount=amount_rubles,
currency="RUB",
description=f"Пополнение баланса на {amount_rubles:.2f}",
metadata=yookassa_metadata,
)
else:
# Default: card payment
result = await yookassa_service.create_payment(
amount=amount_rubles,
currency="RUB",
description=f"Пополнение баланса на {amount_rubles:.2f}",
metadata=yookassa_metadata,
)
if result and not result.get("error"):
payment_url = result.get("confirmation_url")
payment_id = result.get("id")

View File

@@ -1661,18 +1661,48 @@ def _convert_remnawave_block_to_step(block: Dict[str, Any], url_scheme: str = ""
# Known app URL schemes (fallback if RemnaWave doesn't provide urlScheme)
KNOWN_APP_URL_SCHEMES = {
# iOS
"happ": "happ://add/",
"streisand": "streisand://import/",
"shadowrocket": "sub://",
"v2rayn": "v2rayng://install-config?url=",
"v2rayng": "v2rayng://install-config?url=",
"shadow rocket": "sub://",
"karing": "karing://install-config?url=",
"foxray": "foxray://yiguo.dev/sub/add/?url=",
"fox ray": "foxray://yiguo.dev/sub/add/?url=",
"v2box": "v2box://install-sub?url=",
"sing-box": "sing-box://import-remote-profile?url=",
"singbox": "sing-box://import-remote-profile?url=",
"quantumult x": "quantumult-x://add-resource?remote-resource=",
"quantumultx": "quantumult-x://add-resource?remote-resource=",
"quantumult": "quantumult-x://add-resource?remote-resource=",
"surge": "surge3://install-config?url=",
"loon": "loon://import?sub=",
"stash": "stash://install-config?url=",
# Android
"v2rayn": "v2rayng://install-sub?url=",
"v2rayng": "v2rayng://install-sub?url=",
"v2ray ng": "v2rayng://install-sub?url=",
"nekoray": "sn://subscription?url=",
"nekobox": "sn://subscription?url=",
"neko ray": "sn://subscription?url=",
"neko box": "sn://subscription?url=",
"surfboard": "surfboard://install-config?url=",
# PC (Windows/macOS/Linux)
"clash": "clash://install-config?url=",
"clash meta": "clash://install-config?url=",
"clash verge": "clash://install-config?url=",
"hiddify": "hiddify://import/",
"nekoray": "sn://subscription?url=",
"nekobox": "sn://subscription?url=",
"karing": "karing://add/",
"clash verge rev": "clash://install-config?url=",
"clashx": "clashx://install-config?url=",
"clashx meta": "clash://install-config?url=",
"clashx pro": "clash://install-config?url=",
"flclash": "clash://install-config?url=",
"flclashx": "clash://install-config?url=",
"koala clash": "clash://install-config?url=",
"koalaclash": "clash://install-config?url=",
"hiddify": "hiddify://install-config/?url=",
"hiddify next": "hiddify://install-config/?url=",
"mihomo party": "clash://install-config?url=",
"mihomo": "clash://install-config?url=",
}
@@ -1719,7 +1749,7 @@ def _convert_remnawave_app_to_cabinet(app: Dict[str, Any]) -> Dict[str, Any]:
"id": app.get("name", "").lower().replace(" ", "-"),
"name": app.get("name", ""),
"isFeatured": app.get("featured", False),
"urlScheme": app.get("urlScheme", ""),
"urlScheme": url_scheme, # Use resolved url_scheme (with fallback from app name)
"isNeedBase64Encoding": app.get("isNeedBase64Encoding", False),
"installationStep": installation_step,
"addSubscriptionStep": subscription_step,

View File

@@ -0,0 +1,340 @@
"""Schemas for Ban System integration in cabinet."""
from datetime import datetime
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
# === Status ===
class BanSystemStatusResponse(BaseModel):
"""Ban System integration status."""
enabled: bool
configured: bool
# === Stats ===
class BanSystemStatsResponse(BaseModel):
"""Overall Ban System statistics."""
total_users: int = 0
active_users: int = 0
users_over_limit: int = 0
total_requests: int = 0
total_punishments: int = 0
active_punishments: int = 0
nodes_online: int = 0
nodes_total: int = 0
agents_online: int = 0
agents_total: int = 0
panel_connected: bool = False
uptime_seconds: Optional[int] = None
# === Users ===
class BanUserIPInfo(BaseModel):
"""User IP address information."""
ip: str
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
node: Optional[str] = None
request_count: int = 0
country_code: Optional[str] = None
country_name: Optional[str] = None
city: Optional[str] = None
class BanUserRequestLog(BaseModel):
"""User request log entry."""
timestamp: datetime
source_ip: str
destination: Optional[str] = None
dest_port: Optional[int] = None
protocol: Optional[str] = None
action: Optional[str] = None
node: Optional[str] = None
class BanUserListItem(BaseModel):
"""User in the list."""
email: str
unique_ip_count: int = 0
total_requests: int = 0
limit: Optional[int] = None
is_over_limit: bool = False
blocked_count: int = 0
last_seen: Optional[datetime] = None
class BanUsersListResponse(BaseModel):
"""Paginated list of users."""
users: List[BanUserListItem] = []
total: int = 0
offset: int = 0
limit: int = 50
class BanUserDetailResponse(BaseModel):
"""Detailed user information."""
email: str
unique_ip_count: int = 0
total_requests: int = 0
limit: Optional[int] = None
is_over_limit: bool = False
blocked_count: int = 0
ips: List[BanUserIPInfo] = []
recent_requests: List[BanUserRequestLog] = []
network_type: Optional[str] = None # wifi, mobile, mixed
# === Punishments (Bans) ===
class BanPunishmentItem(BaseModel):
"""Punishment/ban entry."""
id: Optional[int] = None
user_id: str
uuid: Optional[str] = None
username: str
reason: Optional[str] = None
punished_at: datetime
enable_at: Optional[datetime] = None
ip_count: int = 0
limit: int = 0
enabled: bool = False
enabled_at: Optional[datetime] = None
node_name: Optional[str] = None
class BanPunishmentsListResponse(BaseModel):
"""List of active punishments."""
punishments: List[BanPunishmentItem] = []
total: int = 0
class BanHistoryResponse(BaseModel):
"""Punishment history."""
items: List[BanPunishmentItem] = []
total: int = 0
class BanUserRequest(BaseModel):
"""Request to ban a user."""
username: str = Field(..., min_length=1)
minutes: int = Field(default=30, ge=1)
reason: Optional[str] = Field(None, max_length=500)
class UnbanResponse(BaseModel):
"""Unban response."""
success: bool
message: str
# === Nodes ===
class BanNodeItem(BaseModel):
"""Node information."""
name: str
address: Optional[str] = None
is_connected: bool = False
last_seen: Optional[datetime] = None
users_count: int = 0
agent_stats: Optional[Dict[str, Any]] = None
class BanNodesListResponse(BaseModel):
"""List of nodes."""
nodes: List[BanNodeItem] = []
total: int = 0
online: int = 0
# === Agents ===
class BanAgentItem(BaseModel):
"""Monitoring agent information."""
node_name: str
sent_total: int = 0
dropped_total: int = 0
batches_total: int = 0
reconnects: int = 0
failures: int = 0
queue_size: int = 0
queue_max: int = 0
dedup_checked: int = 0
dedup_skipped: int = 0
filter_checked: int = 0
filter_filtered: int = 0
health: str = "unknown" # healthy, warning, critical
is_online: bool = False
last_report: Optional[datetime] = None
class BanAgentsSummary(BaseModel):
"""Agents summary statistics."""
total_agents: int = 0
online_agents: int = 0
total_sent: int = 0
total_dropped: int = 0
avg_queue_size: float = 0.0
healthy_count: int = 0
warning_count: int = 0
critical_count: int = 0
class BanAgentsListResponse(BaseModel):
"""List of agents."""
agents: List[BanAgentItem] = []
summary: Optional[BanAgentsSummary] = None
total: int = 0
online: int = 0
# === Traffic ===
class BanTrafficStats(BaseModel):
"""Traffic statistics."""
total_bytes: int = 0
upload_bytes: int = 0
download_bytes: int = 0
total_users: int = 0
violators_count: int = 0
class BanTrafficUserItem(BaseModel):
"""User traffic information."""
username: str
email: Optional[str] = None
total_bytes: int = 0
upload_bytes: int = 0
download_bytes: int = 0
limit_bytes: Optional[int] = None
is_over_limit: bool = False
class BanTrafficViolationItem(BaseModel):
"""Traffic limit violation entry."""
id: Optional[int] = None
username: str
email: Optional[str] = None
violation_type: str
description: Optional[str] = None
bytes_used: int = 0
bytes_limit: int = 0
detected_at: datetime
resolved: bool = False
class BanTrafficViolationsResponse(BaseModel):
"""List of traffic violations."""
violations: List[BanTrafficViolationItem] = []
total: int = 0
class BanTrafficTopItem(BaseModel):
"""Top user by traffic."""
username: str
bytes_total: int = 0
bytes_limit: Optional[int] = None
over_limit: bool = False
class BanTrafficResponse(BaseModel):
"""Full traffic statistics response."""
enabled: bool = False
stats: Optional[Dict[str, Any]] = None
top_users: List[BanTrafficTopItem] = []
recent_violations: List[BanTrafficViolationItem] = []
# === Settings ===
class BanSettingDefinition(BaseModel):
"""Setting definition with value."""
key: str
value: Any
type: str # bool, int, str, list
min_value: Optional[int] = None
max_value: Optional[int] = None
editable: bool = True
description: Optional[str] = None
category: Optional[str] = None
class BanSettingsResponse(BaseModel):
"""All settings response."""
settings: List[BanSettingDefinition] = []
class BanSettingUpdateRequest(BaseModel):
"""Request to update a setting."""
value: Any
class BanWhitelistRequest(BaseModel):
"""Request to add/remove from whitelist."""
username: str = Field(..., min_length=1)
# === Reports ===
class BanReportTopViolator(BaseModel):
"""Top violator in report."""
username: str
count: int = 0
class BanReportResponse(BaseModel):
"""Period report response."""
period_hours: int = 24
current_users: int = 0
current_ips: int = 0
punishment_stats: Optional[Dict[str, Any]] = None
top_violators: List[BanReportTopViolator] = []
# === Health ===
class BanHealthComponent(BaseModel):
"""Health component status."""
name: str
status: str # healthy, degraded, unhealthy
message: Optional[str] = None
details: Optional[Dict[str, Any]] = None
class BanHealthResponse(BaseModel):
"""Health status response."""
status: str # healthy, degraded, unhealthy
uptime: Optional[int] = None
components: List[BanHealthComponent] = []
class BanHealthDetailedResponse(BaseModel):
"""Detailed health response."""
status: str
uptime: Optional[int] = None
components: Dict[str, Any] = {}
# === Agent History ===
class BanAgentHistoryItem(BaseModel):
"""Agent history item."""
timestamp: datetime
sent_total: int = 0
dropped_total: int = 0
queue_size: int = 0
batches_total: int = 0
class BanAgentHistoryResponse(BaseModel):
"""Agent history response."""
node: str
hours: int = 24
records: int = 0
delta: Optional[Dict[str, Any]] = None
first: Optional[Dict[str, Any]] = None
last: Optional[Dict[str, Any]] = None
history: List[BanAgentHistoryItem] = []

View File

@@ -243,11 +243,32 @@ class Settings(BaseSettings):
MENU_LAYOUT_ENABLED: bool = False # Включить управление меню через API
# Настройки мониторинга трафика
TRAFFIC_MONITORING_ENABLED: bool = False
TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки
TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (по умолчанию - раз в сутки)
TRAFFIC_MONITORING_ENABLED: bool = False # Глобальный переключатель (для обратной совместимости)
TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки (для обратной совместимости)
TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (для обратной совместимости)
SUSPICIOUS_NOTIFICATIONS_TOPIC_ID: Optional[int] = None
# Новый мониторинг трафика v2
# Быстрая проверка (текущий использованный трафик)
TRAFFIC_FAST_CHECK_ENABLED: bool = False
TRAFFIC_FAST_CHECK_INTERVAL_MINUTES: int = 10 # Интервал проверки в минутах
TRAFFIC_FAST_CHECK_THRESHOLD_GB: float = 5.0 # Порог в ГБ для быстрой проверки
# Суточная проверка (трафик за 24 часа)
TRAFFIC_DAILY_CHECK_ENABLED: bool = False
TRAFFIC_DAILY_CHECK_TIME: str = "00:00" # Время суточной проверки (HH:MM)
TRAFFIC_DAILY_THRESHOLD_GB: float = 50.0 # Порог суточного трафика в ГБ
# Фильтрация по серверам (UUID нод через запятую)
TRAFFIC_MONITORED_NODES: str = "" # Только эти ноды (пусто = все)
TRAFFIC_IGNORED_NODES: str = "" # Исключить эти ноды
TRAFFIC_EXCLUDED_USER_UUIDS: str = "" # Исключить пользователей (UUID через запятую)
# Параллельность и кулдаун
TRAFFIC_CHECK_BATCH_SIZE: int = 1000 # Размер батча для получения пользователей
TRAFFIC_CHECK_CONCURRENCY: int = 10 # Параллельных запросов
TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES: int = 60 # Кулдаун уведомлений (минуты)
TRAFFIC_SNAPSHOT_TTL_HOURS: int = 24 # TTL для snapshot трафика в Redis (часы)
# Настройки суточных подписок
DAILY_SUBSCRIPTIONS_ENABLED: bool = True # Включить автоматическое списание для суточных тарифов
DAILY_SUBSCRIPTIONS_CHECK_INTERVAL_MINUTES: int = 30 # Интервал проверки в минутах
@@ -656,6 +677,12 @@ class Settings(BaseSettings):
SMTP_FROM_NAME: str = "VPN Service"
SMTP_USE_TLS: bool = True
# Ban System Integration (BedolagaBan monitoring)
BAN_SYSTEM_ENABLED: bool = False
BAN_SYSTEM_API_URL: Optional[str] = None # e.g., http://ban-server:8000
BAN_SYSTEM_API_TOKEN: Optional[str] = None
BAN_SYSTEM_REQUEST_TIMEOUT: int = 30
@field_validator('MAIN_MENU_MODE', mode='before')
@classmethod
def normalize_main_menu_mode(cls, value: Optional[str]) -> str:
@@ -923,6 +950,41 @@ class Settings(BaseSettings):
def get_remnawave_auto_sync_times(self) -> List[time]:
return self.parse_daily_time_list(self.REMNAWAVE_AUTO_SYNC_TIMES)
def get_traffic_monitored_nodes(self) -> List[str]:
"""Возвращает список UUID нод для мониторинга (пусто = все)"""
if not self.TRAFFIC_MONITORED_NODES:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_MONITORED_NODES.split("#")[0].strip()
if not value:
return []
return [n.strip() for n in value.split(",") if n.strip()]
def get_traffic_ignored_nodes(self) -> List[str]:
"""Возвращает список UUID нод для исключения из мониторинга"""
if not self.TRAFFIC_IGNORED_NODES:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_IGNORED_NODES.split("#")[0].strip()
if not value:
return []
return [n.strip() for n in value.split(",") if n.strip()]
def get_traffic_excluded_user_uuids(self) -> List[str]:
"""Возвращает список UUID пользователей для исключения из мониторинга (например, тунельные/служебные)"""
if not self.TRAFFIC_EXCLUDED_USER_UUIDS:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_EXCLUDED_USER_UUIDS.split("#")[0].strip()
if not value:
return []
return [uuid.strip().lower() for uuid in value.split(",") if uuid.strip()]
def get_traffic_daily_check_time(self) -> Optional[time]:
"""Возвращает время суточной проверки трафика"""
times = self.parse_daily_time_list(self.TRAFFIC_DAILY_CHECK_TIME)
return times[0] if times else None
def get_display_name_banned_keywords(self) -> List[str]:
raw_value = self.DISPLAY_NAME_BANNED_KEYWORDS
if raw_value is None:
@@ -2334,6 +2396,24 @@ class Settings(BaseSettings):
return self.SMTP_FROM_EMAIL
return self.SMTP_USER
# Ban System helpers
def is_ban_system_enabled(self) -> bool:
return bool(self.BAN_SYSTEM_ENABLED)
def is_ban_system_configured(self) -> bool:
return bool(self.BAN_SYSTEM_API_URL and self.BAN_SYSTEM_API_TOKEN)
def get_ban_system_api_url(self) -> Optional[str]:
if self.BAN_SYSTEM_API_URL:
return self.BAN_SYSTEM_API_URL.rstrip('/')
return None
def get_ban_system_api_token(self) -> Optional[str]:
return self.BAN_SYSTEM_API_TOKEN
def get_ban_system_request_timeout(self) -> int:
return max(1, self.BAN_SYSTEM_REQUEST_TIMEOUT)
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",

415
app/external/ban_system_api.py vendored Normal file
View File

@@ -0,0 +1,415 @@
"""
Ban System API Client.
Client for interacting with the BedolagaBan monitoring system.
"""
import asyncio
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
import aiohttp
logger = logging.getLogger(__name__)
class BanSystemAPIError(Exception):
"""Ban System API error."""
def __init__(self, message: str, status_code: Optional[int] = None, response_data: Optional[dict] = None):
self.message = message
self.status_code = status_code
self.response_data = response_data
super().__init__(self.message)
class BanSystemAPI:
"""HTTP client for Ban System API."""
def __init__(self, base_url: str, api_token: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.api_token = api_token
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session: Optional[aiohttp.ClientSession] = None
def _get_headers(self) -> Dict[str, str]:
"""Get request headers with authorization."""
return {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
async def __aenter__(self):
"""Async context manager entry."""
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers=self._get_headers()
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
self.session = None
async def _ensure_session(self):
"""Ensure session is created."""
if self.session is None:
self.session = aiohttp.ClientSession(
timeout=self.timeout,
headers=self._get_headers()
)
async def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
json_data: Optional[Dict] = None,
) -> Any:
"""Execute HTTP request."""
await self._ensure_session()
url = f"{self.base_url}{endpoint}"
try:
async with self.session.request(
method=method,
url=url,
params=params,
json=json_data,
) as response:
response_text = await response.text()
if response.status >= 400:
logger.error(f"Ban System API error: {response.status} - {response_text}")
raise BanSystemAPIError(
message=f"API error {response.status}: {response_text}",
status_code=response.status,
response_data={"error": response_text}
)
if response_text:
try:
return await response.json()
except Exception:
return {"raw": response_text}
return {}
except aiohttp.ClientError as e:
logger.error(f"Ban System API connection error: {e}")
raise BanSystemAPIError(
message=f"Connection error: {str(e)}",
status_code=None,
response_data=None
)
except asyncio.TimeoutError:
logger.error("Ban System API request timeout")
raise BanSystemAPIError(
message="Request timeout",
status_code=None,
response_data=None
)
async def close(self):
"""Close the session."""
if self.session:
await self.session.close()
self.session = None
# === Stats ===
async def get_stats(self) -> Dict[str, Any]:
"""
Get overall system statistics.
GET /api/stats
"""
return await self._request("GET", "/api/stats")
async def get_stats_period(self, hours: int = 24) -> Dict[str, Any]:
"""
Get statistics for a specific period.
GET /api/stats/period?hours={hours}
"""
return await self._request("GET", "/api/stats/period", params={"hours": hours})
# === Users ===
async def get_users(
self,
offset: int = 0,
limit: int = 50,
status: Optional[str] = None,
) -> Dict[str, Any]:
"""
Get list of users with pagination.
GET /api/users
Args:
offset: Pagination offset
limit: Number of users per page (max 100)
status: Filter by status (over_limit, with_limit, unlimited)
"""
params = {"offset": offset, "limit": min(limit, 100)}
if status:
params["status"] = status
return await self._request("GET", "/api/users", params=params)
async def get_users_over_limit(self, limit: int = 50, window: bool = True) -> Dict[str, Any]:
"""
Get users who exceeded their device limit.
GET /api/users/over-limit
"""
return await self._request(
"GET",
"/api/users/over-limit",
params={"limit": limit, "window": str(window).lower()}
)
async def search_users(self, query: str) -> Dict[str, Any]:
"""
Search for a user.
GET /api/users/search/{query}
"""
return await self._request("GET", f"/api/users/search/{query}")
async def get_user(self, email: str) -> Dict[str, Any]:
"""
Get detailed user information.
GET /api/users/{email}
"""
return await self._request("GET", f"/api/users/{email}")
async def get_user_network(self, email: str) -> Dict[str, Any]:
"""
Get user network information (WiFi/Mobile detection).
GET /api/users/{email}/network
"""
return await self._request("GET", f"/api/users/{email}/network")
# === Punishments (Bans) ===
async def get_punishments(self) -> List[Dict[str, Any]]:
"""
Get list of active punishments (bans).
GET /api/punishments
"""
return await self._request("GET", "/api/punishments")
async def enable_user(self, user_id: str) -> Dict[str, Any]:
"""
Enable (unban) a user.
POST /api/punishments/{user_id}/enable
"""
return await self._request("POST", f"/api/punishments/{user_id}/enable")
async def ban_user(
self,
username: str,
minutes: int = 30,
reason: Optional[str] = None,
) -> Dict[str, Any]:
"""
Manually ban a user.
POST /api/ban
"""
params = {"username": username, "minutes": minutes}
if reason:
params["reason"] = reason
return await self._request("POST", "/api/ban", params=params)
async def get_punishment_history(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get punishment history for a user.
GET /api/history/{query}
"""
return await self._request(
"GET",
f"/api/history/{query}",
params={"limit": limit}
)
# === Nodes ===
async def get_nodes(self, include_agent_stats: bool = True) -> List[Dict[str, Any]]:
"""
Get list of connected nodes.
GET /api/nodes
"""
return await self._request(
"GET",
"/api/nodes",
params={"include_agent_stats": str(include_agent_stats).lower()}
)
# === Agents ===
async def get_agents(
self,
search: Optional[str] = None,
health: Optional[str] = None,
status: Optional[str] = None,
sort_by: str = "name",
sort_order: str = "asc",
) -> Dict[str, Any]:
"""
Get list of monitoring agents.
GET /api/agents
Args:
search: Search query
health: Filter by health (healthy, warning, critical)
status: Filter by status (online, offline)
sort_by: Sort by field (name, sent, dropped, health)
sort_order: Sort order (asc, desc)
"""
params = {"sort_by": sort_by, "sort_order": sort_order}
if search:
params["search"] = search
if health:
params["health"] = health
if status:
params["status"] = status
return await self._request("GET", "/api/agents", params=params)
async def get_agents_summary(self) -> Dict[str, Any]:
"""
Get summary statistics for all agents.
GET /api/agents/summary
"""
return await self._request("GET", "/api/agents/summary")
async def get_agent_history(
self,
node_name: str,
hours: int = 24,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""
Get agent statistics history.
GET /api/agents/{node_name}/history
"""
return await self._request(
"GET",
f"/api/agents/{node_name}/history",
params={"hours": hours, "limit": limit}
)
# === Traffic ===
async def get_traffic(self) -> Dict[str, Any]:
"""
Get overall traffic statistics.
GET /api/traffic
"""
return await self._request("GET", "/api/traffic")
async def get_traffic_top(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get top users by traffic.
GET /api/traffic/top
"""
return await self._request("GET", "/api/traffic/top", params={"limit": limit})
async def get_user_traffic(self, username: str) -> Dict[str, Any]:
"""
Get traffic information for a specific user.
GET /api/traffic/user/{username}
"""
return await self._request("GET", f"/api/traffic/user/{username}")
async def get_traffic_violations(self, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get list of traffic limit violations.
GET /api/traffic/violations
"""
return await self._request("GET", "/api/traffic/violations", params={"limit": limit})
# === Health ===
async def health_check(self) -> Dict[str, Any]:
"""
Check API health.
GET /health
"""
return await self._request("GET", "/health")
async def health_detailed(self) -> Dict[str, Any]:
"""
Get detailed health information.
GET /health/detailed
"""
return await self._request("GET", "/health/detailed")
# === Settings ===
async def get_settings(self) -> Dict[str, Any]:
"""
Get all settings with their definitions.
GET /api/settings
"""
return await self._request("GET", "/api/settings")
async def get_setting(self, key: str) -> Dict[str, Any]:
"""
Get a specific setting value.
GET /api/settings/{key}
"""
return await self._request("GET", f"/api/settings/{key}")
async def set_setting(self, key: str, value: Any) -> Dict[str, Any]:
"""
Set a setting value.
POST /api/settings/{key}?value={value}
"""
return await self._request("POST", f"/api/settings/{key}", params={"value": value})
async def toggle_setting(self, key: str) -> Dict[str, Any]:
"""
Toggle a boolean setting.
POST /api/settings/{key}/toggle
"""
return await self._request("POST", f"/api/settings/{key}/toggle")
async def whitelist_add(self, username: str) -> Dict[str, Any]:
"""
Add user to whitelist.
POST /api/settings/whitelist/add?username={username}
"""
return await self._request("POST", "/api/settings/whitelist/add", params={"username": username})
async def whitelist_remove(self, username: str) -> Dict[str, Any]:
"""
Remove user from whitelist.
POST /api/settings/whitelist/remove?username={username}
"""
return await self._request("POST", "/api/settings/whitelist/remove", params={"username": username})

View File

@@ -774,81 +774,53 @@ async def force_check_callback(callback: CallbackQuery):
@router.callback_query(F.data == "admin_mon_traffic_check")
@admin_required
async def traffic_check_callback(callback: CallbackQuery):
"""Ручная проверка трафика всех пользователей."""
"""Ручная проверка трафика — использует snapshot и дельту."""
try:
# Проверяем, включен ли мониторинг трафика
if not traffic_monitoring_scheduler.is_enabled():
await callback.answer(
"⚠️ Мониторинг трафика отключен в настройках\n"
"Включите TRAFFIC_MONITORING_ENABLED=true в .env",
"Включите TRAFFIC_FAST_CHECK_ENABLED=true в .env",
show_alert=True
)
return
await callback.answer("⏳ Запускаем проверку трафика...")
await callback.answer("⏳ Запускаем проверку трафика (дельта)...")
# Используем run_fast_check — он сравнивает с snapshot и отправляет уведомления
from app.services.traffic_monitoring_service import traffic_monitoring_scheduler_v2
# Устанавливаем бота, если не установлен
if not traffic_monitoring_scheduler.bot:
traffic_monitoring_scheduler.set_bot(callback.bot)
if not traffic_monitoring_scheduler_v2.bot:
traffic_monitoring_scheduler_v2.set_bot(callback.bot)
checked_count = 0
exceeded_count = 0
exceeded_users = []
violations = await traffic_monitoring_scheduler_v2.run_fast_check_now()
async for db in get_db():
from app.database.crud.user import get_users_with_active_subscriptions
users = await get_users_with_active_subscriptions(db)
for user in users:
if user.remnawave_uuid:
is_exceeded, traffic_info = await traffic_monitoring_service.check_user_traffic_threshold(
db,
user.remnawave_uuid,
user.telegram_id
)
checked_count += 1
if is_exceeded:
exceeded_count += 1
total_gb = traffic_info.get('total_gb', 0)
exceeded_users.append({
'telegram_id': user.telegram_id,
'name': user.full_name or str(user.telegram_id),
'traffic_gb': total_gb
})
# Отправляем уведомление админам
if traffic_monitoring_scheduler._should_send_notification(user.remnawave_uuid):
await traffic_monitoring_service.process_suspicious_traffic(
db,
user.remnawave_uuid,
traffic_info,
callback.bot
)
traffic_monitoring_scheduler._record_notification(user.remnawave_uuid)
break
threshold_gb = settings.TRAFFIC_THRESHOLD_GB_PER_DAY
# Получаем информацию о snapshot
snapshot_age = await traffic_monitoring_scheduler_v2.service.get_snapshot_age_minutes()
threshold_gb = traffic_monitoring_scheduler_v2.service.get_fast_check_threshold_gb()
text = f"""
📊 <b>Проверка трафика завершена</b>
🔍 <b>Результаты:</b>
• Проверено пользователей: {checked_count}
• Превышений порога: {exceeded_count}
Порог: {threshold_gb} ГБ/сутки
🔍 <b>Результаты (дельта):</b>
• Превышений за интервал: {len(violations)}
• Порог дельты: {threshold_gb} ГБ
Возраст snapshot: {snapshot_age:.1f} мин
🕐 <b>Время проверки:</b> {datetime.now().strftime('%H:%M:%S')}
"""
if exceeded_users:
text += "\n⚠️ <b>Пользователи с превышением:</b>\n"
for u in exceeded_users[:10]:
text += f"{u['name']}: {u['traffic_gb']:.1f} ГБ\n"
if len(exceeded_users) > 10:
text += f"... и ещё {len(exceeded_users) - 10}\n"
if violations:
text += "\n⚠️ <b>Превышения дельты:</b>\n"
for v in violations[:10]:
name = v.full_name or v.user_uuid[:8]
text += f"{name}: +{v.used_traffic_gb:.1f} ГБ\n"
if len(violations) > 10:
text += f"... и ещё {len(violations) - 10}\n"
text += "\n📨 Уведомления отправлены (с учётом кулдауна)"
else:
text += "\n✅ Превышений не обнаружено"
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
keyboard = InlineKeyboardMarkup(inline_keyboard=[

View File

@@ -131,12 +131,36 @@ class DisplayNameRestrictionMiddleware(BaseMiddleware):
cleaned = ZERO_WIDTH_PATTERN.sub("", value)
lower_value = cleaned.lower()
# Убраны жёсткие проверки на @ и паттерны ссылок - слишком много ложных срабатываний
# Теперь проверяем только по настраиваемым ключевым словам из DISPLAY_NAME_BANNED_KEYWORDS
if "@" in cleaned or "" in cleaned:
return True
if any(pattern.search(lower_value) for pattern in LINK_PATTERNS):
return True
# Проверяем обфусцированные ссылки типа "t . m e" или "т м е"
# Но НЕ блокируем если это часть обычного слова/имени
domain_match = DOMAIN_OBFUSCATION_PATTERN.search(lower_value)
if domain_match:
# Проверяем контекст: если "tme" внутри слова (с буквами с обеих сторон) - пропускаем
start_pos = domain_match.start()
end_pos = domain_match.end()
# Проверяем символ ДО и ПОСЛЕ совпадения
has_letter_before = start_pos > 0 and lower_value[start_pos - 1].isalpha()
has_letter_after = end_pos < len(lower_value) and lower_value[end_pos].isalpha()
# Если с ОБЕИХ сторон буквы - скорее всего это просто имя/фамилия
if not (has_letter_before and has_letter_after):
return True
normalized = self._normalize_text(lower_value)
collapsed = COLLAPSE_PATTERN.sub("", normalized)
# Проверяем "tme" с контекстом (ловим t.me ссылки, но не случайные совпадения в именах)
# Ищем tme в начале, конце, или с пробелами/спецсимволами вокруг
if re.search(r"(?:^|[^a-zа-яё])tme(?:[^a-zа-яё]|$)", collapsed, re.IGNORECASE):
return True
banned_keywords = settings.get_display_name_banned_keywords()
# Если список пустой - не блокируем никого

View File

@@ -267,7 +267,10 @@ class CloudPaymentsPaymentMixin:
# Умная автоактивация если автопокупка не сработала
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
# Игнорируем notification_sent т.к. здесь нет дополнительных уведомлений
await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=amount_kopeks
)
except Exception as error:
logger.exception("Ошибка умной автоактивации после CloudPayments: %s", error)

View File

@@ -141,19 +141,75 @@ class PaymentCommonMixin:
)
try:
keyboard = await self.build_topup_success_keyboard(user_snapshot)
payment_method = payment_method_title or "Банковская карта (YooKassa)"
message = (
"✅ <b>Платеж успешно завершен!</b>\n\n"
f"💰 Сумма: {settings.format_price(amount_kopeks)}\n"
f"💳 Способ: {payment_method}\n\n"
"Средства зачислены на ваш баланс!\n\n"
"⚠️ <b>Важно:</b> Пополнение баланса не активирует подписку автоматически. "
"Обязательно активируйте подписку отдельно!\n\n"
f"🔄 При наличии сохранённой корзины подписки и включенной автопокупке, "
f"подписка будет приобретена автоматически после пополнения баланса."
)
# Проверяем, нужно ли показывать яркое предупреждение об активации
if settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP:
# Определяем статус подписки для выбора правильной кнопки
has_active_subscription = False
if user_snapshot:
try:
subscription = user_snapshot.subscription
has_active_subscription = bool(
subscription
and not getattr(subscription, "is_trial", False)
and getattr(subscription, "is_active", False)
)
except Exception:
pass
# Яркое сообщение с восклицательными знаками
message = (
"✅ <b>Платеж успешно завершен!</b>\n\n"
f"💰 Сумма: {settings.format_price(amount_kopeks)}\n"
f"💳 Способ: {payment_method}\n\n"
"💎 Средства зачислены на ваш баланс!\n\n"
"‼️ <b>ВНИМАНИЕ! ОБЯЗАТЕЛЬНО АКТИВИРУЙТЕ ПОДПИСКУ!</b> ‼️\n\n"
"⚠️ Пополнение баланса <b>НЕ АКТИВИРУЕТ</b> подписку автоматически!\n\n"
"👇 <b>НАЖМИТЕ КНОПКУ НИЖЕ ДЛЯ АКТИВАЦИИ</b> 👇"
)
# Формируем клавиатуру с кнопками действий
keyboard_rows: list[list[InlineKeyboardButton]] = []
# Кнопка активации или продления в зависимости от статуса
if has_active_subscription:
# Активная платная подписка - показываем продление и изменение устройств
keyboard_rows.append([
build_miniapp_or_callback_button(
text="🔄 ПРОДЛИТЬ ПОДПИСКУ",
callback_data="subscription_extend",
)
])
keyboard_rows.append([
build_miniapp_or_callback_button(
text="📱 Изменить количество устройств",
callback_data="subscription_change_devices",
)
])
else:
# Нет подписки или истекла - показываем только активацию
keyboard_rows.append([
build_miniapp_or_callback_button(
text="🔥 АКТИВИРОВАТЬ ПОДПИСКУ",
callback_data="menu_buy",
)
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
else:
# Стандартное сообщение с полной клавиатурой
keyboard = await self.build_topup_success_keyboard(user_snapshot)
message = (
"✅ <b>Платеж успешно завершен!</b>\n\n"
f"💰 Сумма: {settings.format_price(amount_kopeks)}\n"
f"💳 Способ: {payment_method}\n\n"
"Средства зачислены на ваш баланс!\n\n"
"⚠️ <b>Важно:</b> Пополнение баланса не активирует подписку автоматически. "
"Обязательно активируйте подписку отдельно!\n\n"
f"🔄 При наличии сохранённой корзины подписки и включенной автопокупке, "
f"подписка будет приобретена автоматически после пополнения баланса."
)
await self.bot.send_message(
chat_id=telegram_id,

View File

@@ -377,12 +377,14 @@ class CryptoBotPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db,
user,
bot=bot_instance,
topup_amount=amount_kopeks,
)
except Exception as auto_activate_error:
logger.error(
@@ -392,7 +394,8 @@ class CryptoBotPaymentMixin:
exc_info=True,
)
if has_saved_cart and bot_instance:
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and bot_instance and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)

View File

@@ -411,9 +411,12 @@ class FreekassaPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -422,7 +425,8 @@ class FreekassaPaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)

View File

@@ -396,9 +396,12 @@ class MulenPayPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -407,7 +410,8 @@ class MulenPayPaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts

View File

@@ -499,9 +499,12 @@ class Pal24PaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -510,7 +513,8 @@ class Pal24PaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)

View File

@@ -485,9 +485,12 @@ class PlategaPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -496,7 +499,8 @@ class PlategaPaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)

View File

@@ -534,12 +534,14 @@ class TelegramStarsMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db,
user,
bot=getattr(self, "bot", None),
topup_amount=amount_kopeks,
)
except Exception as auto_activate_error:
logger.error(
@@ -549,7 +551,8 @@ class TelegramStarsMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
texts = get_texts(user.language)
cart_message = texts.t(
"BALANCE_TOPUP_CART_REMINDER_DETAILED",

View File

@@ -569,9 +569,12 @@ class WataPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
_, activation_notification_sent = await auto_activate_subscription_after_topup(
db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -580,7 +583,8 @@ class WataPaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)

View File

@@ -848,58 +848,61 @@ class YooKassaPaymentMixin:
exc_info=True,
)
if has_saved_cart and getattr(self, "bot", None):
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts
from aiogram import types
# Если включен яркий промпт активации, пропускаем старое уведомление
# т.к. оно будет отправлено через _send_payment_success_notification
if not settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP:
if has_saved_cart and getattr(self, "bot", None):
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts
from aiogram import types
texts = get_texts(user.language)
cart_message = texts.BALANCE_TOPUP_CART_REMINDER_DETAILED.format(
total_amount=settings.format_price(payment.amount_kopeks)
)
texts = get_texts(user.language)
cart_message = texts.BALANCE_TOPUP_CART_REMINDER_DETAILED.format(
total_amount=settings.format_price(payment.amount_kopeks)
)
# Создаем клавиатуру с кнопками
keyboard = types.InlineKeyboardMarkup(
inline_keyboard=[
[
types.InlineKeyboardButton(
text=texts.RETURN_TO_SUBSCRIPTION_CHECKOUT,
callback_data="return_to_saved_cart",
)
],
[
types.InlineKeyboardButton(
text="💰 Мой баланс",
callback_data="menu_balance",
)
],
[
types.InlineKeyboardButton(
text="🏠 Главное меню",
callback_data="back_to_menu",
)
],
]
)
# Создаем клавиатуру с кнопками
keyboard = types.InlineKeyboardMarkup(
inline_keyboard=[
[
types.InlineKeyboardButton(
text=texts.RETURN_TO_SUBSCRIPTION_CHECKOUT,
callback_data="return_to_saved_cart",
)
],
[
types.InlineKeyboardButton(
text="💰 Мой баланс",
callback_data="menu_balance",
)
],
[
types.InlineKeyboardButton(
text="🏠 Главное меню",
callback_data="back_to_menu",
)
],
]
)
await self.bot.send_message(
chat_id=user.telegram_id,
text=f"✅ Баланс пополнен на {settings.format_price(payment.amount_kopeks)}!\n\n"
f"⚠️ <b>Важно:</b> Пополнение баланса не активирует подписку автоматически. "
f"Обязательно активируйте подписку отдельно!\n\n"
f"🔄 При наличии сохранённой корзины подписки и включенной автопокупке, "
f"подписка будет приобретена автоматически после пополнения баланса.\n\n{cart_message}",
reply_markup=keyboard,
)
logger.info(
f"Отправлено уведомление с кнопкой возврата к оформлению подписки пользователю {user.id}"
)
else:
logger.info(
"У пользователя %s нет сохраненной корзины, бот недоступен или покупка уже выполнена",
user.id,
)
await self.bot.send_message(
chat_id=user.telegram_id,
text=f"✅ Баланс пополнен на {settings.format_price(payment.amount_kopeks)}!\n\n"
f"⚠️ <b>Важно:</b> Пополнение баланса не активирует подписку автоматически. "
f"Обязательно активируйте подписку отдельно!\n\n"
f"🔄 При наличии сохранённой корзины подписки и включенной автопокупке, "
f"подписка будет приобретена автоматически после пополнения баланса.\n\n{cart_message}",
reply_markup=keyboard,
)
logger.info(
f"Отправлено уведомление с кнопкой возврата к оформлению подписки пользователю {user.id}"
)
else:
logger.info(
"У пользователя %s нет сохраненной корзины, бот недоступен или покупка уже выполнена",
user.id,
)
except Exception as e:
logger.error(
f"Критическая ошибка при работе с сохраненной корзиной для пользователя {user.id}: {e}",

View File

@@ -717,7 +717,8 @@ async def auto_activate_subscription_after_topup(
user: User,
*,
bot: Optional[Bot] = None,
) -> bool:
topup_amount: Optional[int] = None,
) -> tuple[bool, bool]:
"""
Умная автоактивация после пополнения баланса.
@@ -727,6 +728,14 @@ async def auto_activate_subscription_after_topup(
- Если подписки нет — создаёт новую с дефолтными параметрами
Выбирает максимальный период, который можно оплатить из баланса.
Args:
topup_amount: Сумма пополнения в копейках (для отображения в уведомлении)
Returns:
tuple[bool, bool]: (success, notification_sent)
- success: True если подписка активирована
- notification_sent: True если уведомление отправлено пользователю
"""
from datetime import datetime
from app.database.crud.subscription import get_subscription_by_user_id, create_paid_subscription
@@ -739,70 +748,25 @@ async def auto_activate_subscription_after_topup(
from app.services.admin_notification_service import AdminNotificationService
if not user or not getattr(user, "id", None):
return False
return (False, False)
subscription = await get_subscription_by_user_id(db, user.id)
# Если автоактивация отключена - только отправляем предупреждение
# Если автоактивация отключена - уведомление отправится из _send_payment_success_notification
if not settings.is_auto_activate_after_topup_enabled():
# Отправляем предупреждение если включен режим и нет активной подписки
if (
settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP
and bot
and (not subscription or subscription.status not in ("active", "ACTIVE"))
):
try:
texts = get_texts(getattr(user, "language", "ru"))
warning_message = (
f"✅ <b>Баланс пополнен!</b>\n\n"
f"💳 Текущий баланс: {settings.format_price(user.balance_kopeks)}\n\n"
f"{'' * 25}\n\n"
f"⚠️ <b>ВАЖНО!</b> ⚠️\n\n"
f"🔴 <b>ПОДПИСКА НЕ АКТИВНА!</b>\n\n"
f"Пополнение баланса <b>НЕ активирует</b> подписку автоматически!\n\n"
f"👇 <b>Выберите действие:</b>"
)
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(
text="🚀 АКТИВИРОВАТЬ ПОДПИСКУ",
callback_data="subscription_buy",
)],
[InlineKeyboardButton(
text="💎 ПРОДЛИТЬ ПОДПИСКУ",
callback_data="subscription_extend",
)],
[InlineKeyboardButton(
text="📱 ДОБАВИТЬ УСТРОЙСТВА",
callback_data="subscription_add_devices",
)],
]
)
await bot.send_message(
chat_id=user.telegram_id,
text=warning_message,
reply_markup=keyboard,
parse_mode="HTML",
)
logger.info(
"⚠️ Отправлено предупреждение об активации подписки пользователю %s (автоактивация выключена)",
user.telegram_id,
)
except Exception as notify_error:
logger.warning(
"⚠️ Не удалось отправить предупреждение пользователю %s: %s",
user.telegram_id,
notify_error,
)
return False
logger.info(
"⚠️ Автоактивация отключена для пользователя %s, уведомление будет отправлено из payment service",
user.telegram_id,
)
return (False, False)
# Если подписка активна — ничего не делаем
# Если подписка активна — ничего не делаем (автоактивация включена, но подписка уже есть)
if subscription and subscription.status == "ACTIVE" and subscription.end_date > datetime.utcnow():
logger.info(
"🔁 Автоактивация: у пользователя %s уже активная подписка, пропускаем",
user.telegram_id,
)
return False
return (False, False)
# Определяем параметры подписки
if subscription:
@@ -839,7 +803,7 @@ async def auto_activate_subscription_after_topup(
if not available_periods:
logger.warning("🔁 Автоактивация: нет доступных периодов подписки")
return False
return (False, False)
subscription_service = SubscriptionService()
@@ -875,56 +839,12 @@ async def auto_activate_subscription_after_topup(
user.telegram_id,
balance,
)
# Отправляем предупреждение пользователю если включен режим и подписки нет
if (
settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP
and bot
and (not subscription or subscription.status not in ("active", "ACTIVE"))
):
try:
texts = get_texts(getattr(user, "language", "ru"))
warning_message = (
f"✅ <b>Баланс пополнен!</b>\n\n"
f"💳 Текущий баланс: {settings.format_price(balance)}\n\n"
f"{'' * 25}\n\n"
f"⚠️ <b>ВАЖНО!</b> ⚠️\n\n"
f"🔴 <b>ПОДПИСКА НЕ АКТИВНА!</b>\n\n"
f"Пополнение баланса <b>НЕ активирует</b> подписку автоматически!\n\n"
f"👇 <b>Выберите действие:</b>"
)
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(
text="🚀 АКТИВИРОВАТЬ ПОДПИСКУ",
callback_data="subscription_buy",
)],
[InlineKeyboardButton(
text="💎 ПРОДЛИТЬ ПОДПИСКУ",
callback_data="subscription_extend",
)],
[InlineKeyboardButton(
text="📱 ДОБАВИТЬ УСТРОЙСТВА",
callback_data="subscription_add_devices",
)],
]
)
await bot.send_message(
chat_id=user.telegram_id,
text=warning_message,
reply_markup=keyboard,
parse_mode="HTML",
)
logger.info(
"⚠️ Отправлено предупреждение об активации подписки пользователю %s",
user.telegram_id,
)
except Exception as notify_error:
logger.warning(
"⚠️ Не удалось отправить предупреждение пользователю %s: %s",
user.telegram_id,
notify_error,
)
return False
# Уведомление отправится из _send_payment_success_notification
logger.info(
"⚠️ Недостаточно средств для автоактивации пользователя %s, уведомление будет отправлено из payment service",
user.telegram_id,
)
return (False, False)
texts = get_texts(getattr(user, "language", "ru"))
@@ -1085,7 +1005,7 @@ async def auto_activate_subscription_after_topup(
notify_error,
)
return True
return (True, True) # success=True, notification_sent=True (об активации)
except Exception as e:
logger.error(
@@ -1094,6 +1014,7 @@ async def auto_activate_subscription_after_topup(
e,
exc_info=True,
)
return (False, False)
await db.rollback()
return False

View File

@@ -259,6 +259,7 @@ class BotConfigurationService:
"PAYMENT_BALANCE_TEMPLATE": "PAYMENT",
"PAYMENT_SUBSCRIPTION_TEMPLATE": "PAYMENT",
"AUTO_PURCHASE_AFTER_TOPUP_ENABLED": "PAYMENT",
"SHOW_ACTIVATION_PROMPT_AFTER_TOPUP": "PAYMENT",
"SIMPLE_SUBSCRIPTION_ENABLED": "SIMPLE_SUBSCRIPTION",
"SIMPLE_SUBSCRIPTION_PERIOD_DAYS": "SIMPLE_SUBSCRIPTION",
"SIMPLE_SUBSCRIPTION_DEVICE_LIMIT": "SIMPLE_SUBSCRIPTION",
@@ -271,6 +272,10 @@ class BotConfigurationService:
"NOTIFICATION_CACHE_HOURS": "NOTIFICATIONS",
"MONITORING_LOGS_RETENTION_DAYS": "MONITORING",
"MONITORING_INTERVAL": "MONITORING",
"TRAFFIC_MONITORING_ENABLED": "MONITORING",
"TRAFFIC_MONITORING_INTERVAL_HOURS": "MONITORING",
"TRAFFIC_MONITORED_NODES": "MONITORING",
"TRAFFIC_SNAPSHOT_TTL_HOURS": "MONITORING",
"ENABLE_LOGO_MODE": "INTERFACE_BRANDING",
"LOGO_FILE": "INTERFACE_BRANDING",
"HIDE_SUBSCRIPTION_LINK": "INTERFACE_SUBSCRIPTION",
@@ -570,6 +575,19 @@ class BotConfigurationService:
"Используйте с осторожностью: средства будут списаны мгновенно, если корзина найдена."
),
},
"SHOW_ACTIVATION_PROMPT_AFTER_TOPUP": {
"description": (
"Включает режим яркого промпта активации подписки после пополнения баланса. "
"Вместо обычного уведомления пользователь получит яркое сообщение с восклицательными знаками "
"и кнопками для активации/продления подписки или изменения количества устройств."
),
"format": "Булево значение.",
"example": "true",
"warning": (
"При включении пользователи будут получать только яркое уведомление без кнопок баланса и главного меню. "
"Эти кнопки появятся после выполнения действия (активация/продление/изменение устройств)."
),
},
"SUPPORT_TICKET_SLA_MINUTES": {
"description": "Лимит времени для ответа модераторов на тикет в минутах.",
"format": "Целое число от 1 до 1440.",
@@ -710,6 +728,58 @@ class BotConfigurationService:
"warning": "Убедитесь, что конфигурация существует в панели и содержит нужные приложения.",
"dependencies": "Настроенное подключение к RemnaWave API",
},
"TRAFFIC_MONITORING_ENABLED": {
"description": (
"Включает автоматический мониторинг трафика пользователей. "
"Система отслеживает изменения трафика (дельту) и сохраняет snapshot в Redis. "
"При превышении порогов отправляются уведомления пользователям и админам."
),
"format": "Булево значение.",
"example": "true",
"warning": (
"Требует настроенного подключения к Redis. "
"При включении будет запущен фоновый мониторинг трафика по расписанию."
),
"dependencies": "Redis, TRAFFIC_MONITORING_INTERVAL_HOURS, TRAFFIC_SNAPSHOT_TTL_HOURS",
},
"TRAFFIC_MONITORING_INTERVAL_HOURS": {
"description": (
"Интервал проверки трафика в часах. "
"Каждые N часов система проверяет трафик всех активных пользователей и сравнивает с предыдущим snapshot."
),
"format": "Целое число часов (минимум 1).",
"example": "24",
"warning": (
"Слишком маленький интервал может создать большую нагрузку на RemnaWave API. "
"Рекомендуется 24 часа для ежедневного мониторинга."
),
"dependencies": "TRAFFIC_MONITORING_ENABLED",
},
"TRAFFIC_MONITORED_NODES": {
"description": (
"Список UUID нод для мониторинга трафика через запятую. "
"Если пусто - мониторятся все ноды. "
"Позволяет ограничить мониторинг только определенными серверами."
),
"format": "UUID через запятую или пусто для всех нод.",
"example": "d4aa2b8c-9a36-4f31-93a2-6f07dad05fba, a1b2c3d4-5678-90ab-cdef-1234567890ab",
"warning": "UUID должны существовать в RemnaWave, иначе мониторинг не будет работать.",
"dependencies": "TRAFFIC_MONITORING_ENABLED",
},
"TRAFFIC_SNAPSHOT_TTL_HOURS": {
"description": (
"Время жизни (TTL) snapshot трафика в Redis в часах. "
"Snapshot используется для вычисления дельты (изменения трафика) между проверками. "
"После истечения TTL snapshot удаляется и создается новый."
),
"format": "Целое число часов (минимум 1).",
"example": "24",
"warning": (
"TTL должен быть >= интервала мониторинга. "
"Если TTL меньше интервала, snapshot будет удален до следующей проверки."
),
"dependencies": "TRAFFIC_MONITORING_ENABLED, Redis",
},
}
@classmethod

File diff suppressed because it is too large Load Diff

View File

@@ -316,9 +316,12 @@ class TributeService:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
activation_notification_sent = False
if not auto_purchase_success:
try:
await auto_activate_subscription_after_topup(session, user)
_, activation_notification_sent = await auto_activate_subscription_after_topup(
session, user, bot=self.bot, topup_amount=amount_kopeks
)
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -327,7 +330,8 @@ class TributeService:
exc_info=True,
)
if has_saved_cart and self.bot:
# Отправляем уведомление только если его ещё не отправили
if has_saved_cart and self.bot and not activation_notification_sent:
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts

View File

@@ -622,10 +622,9 @@ async def main():
traffic_monitoring_task = asyncio.create_task(
traffic_monitoring_scheduler.start_monitoring()
)
interval_hours = traffic_monitoring_scheduler.get_interval_hours()
threshold_gb = settings.TRAFFIC_THRESHOLD_GB_PER_DAY
stage.log(f"Интервал проверки: {interval_hours} ч")
stage.log(f"Порог трафика: {threshold_gb} ГБ/сутки")
# Показываем информацию о новом мониторинге v2
status_info = traffic_monitoring_scheduler.get_status_info()
stage.log(status_info)
else:
traffic_monitoring_task = None
stage.skip("Мониторинг трафика отключен настройками")

View File

@@ -0,0 +1,380 @@
"""
Тесты для хранения snapshot трафика в Redis.
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from app.services.traffic_monitoring_service import (
TrafficMonitoringServiceV2,
TRAFFIC_SNAPSHOT_KEY,
TRAFFIC_SNAPSHOT_TIME_KEY,
TRAFFIC_NOTIFICATION_CACHE_KEY,
)
@pytest.fixture
def service():
"""Создаёт экземпляр сервиса для тестов."""
return TrafficMonitoringServiceV2()
@pytest.fixture
def mock_cache():
"""Мок для cache сервиса."""
with patch('app.services.traffic_monitoring_service.cache') as mock:
mock.set = AsyncMock(return_value=True)
mock.get = AsyncMock(return_value=None)
yield mock
@pytest.fixture
def sample_snapshot():
"""Пример snapshot данных."""
return {
"uuid-1": 1073741824.0, # 1 GB
"uuid-2": 2147483648.0, # 2 GB
"uuid-3": 5368709120.0, # 5 GB
}
# ============== Тесты сохранения snapshot в Redis ==============
async def test_save_snapshot_to_redis_success(service, mock_cache, sample_snapshot):
"""Тест успешного сохранения snapshot в Redis."""
mock_cache.set = AsyncMock(return_value=True)
result = await service._save_snapshot_to_redis(sample_snapshot)
assert result is True
assert mock_cache.set.call_count == 2 # snapshot + time
# Проверяем что сохранён snapshot
first_call = mock_cache.set.call_args_list[0]
assert first_call[0][0] == TRAFFIC_SNAPSHOT_KEY
assert first_call[0][1] == sample_snapshot
async def test_save_snapshot_to_redis_failure(service, mock_cache, sample_snapshot):
"""Тест неудачного сохранения snapshot в Redis."""
mock_cache.set = AsyncMock(return_value=False)
result = await service._save_snapshot_to_redis(sample_snapshot)
assert result is False
async def test_save_snapshot_to_redis_exception(service, mock_cache, sample_snapshot):
"""Тест обработки исключения при сохранении."""
mock_cache.set = AsyncMock(side_effect=Exception("Redis error"))
result = await service._save_snapshot_to_redis(sample_snapshot)
assert result is False
# ============== Тесты загрузки snapshot из Redis ==============
async def test_load_snapshot_from_redis_success(service, mock_cache, sample_snapshot):
"""Тест успешной загрузки snapshot из Redis."""
mock_cache.get = AsyncMock(return_value=sample_snapshot)
result = await service._load_snapshot_from_redis()
assert result == sample_snapshot
mock_cache.get.assert_called_once_with(TRAFFIC_SNAPSHOT_KEY)
async def test_load_snapshot_from_redis_empty(service, mock_cache):
"""Тест загрузки когда snapshot отсутствует."""
mock_cache.get = AsyncMock(return_value=None)
result = await service._load_snapshot_from_redis()
assert result is None
async def test_load_snapshot_from_redis_invalid_data(service, mock_cache):
"""Тест загрузки невалидных данных."""
mock_cache.get = AsyncMock(return_value="not a dict")
result = await service._load_snapshot_from_redis()
assert result is None
async def test_load_snapshot_from_redis_exception(service, mock_cache):
"""Тест обработки исключения при загрузке."""
mock_cache.get = AsyncMock(side_effect=Exception("Redis error"))
result = await service._load_snapshot_from_redis()
assert result is None
# ============== Тесты времени snapshot ==============
async def test_get_snapshot_time_from_redis_success(service, mock_cache):
"""Тест получения времени snapshot."""
test_time = datetime(2024, 1, 15, 12, 30, 0)
mock_cache.get = AsyncMock(return_value=test_time.isoformat())
result = await service._get_snapshot_time_from_redis()
assert result == test_time
mock_cache.get.assert_called_once_with(TRAFFIC_SNAPSHOT_TIME_KEY)
async def test_get_snapshot_time_from_redis_empty(service, mock_cache):
"""Тест когда время отсутствует."""
mock_cache.get = AsyncMock(return_value=None)
result = await service._get_snapshot_time_from_redis()
assert result is None
# ============== Тесты has_snapshot ==============
async def test_has_snapshot_redis_exists(service, mock_cache, sample_snapshot):
"""Тест has_snapshot когда snapshot есть в Redis."""
mock_cache.get = AsyncMock(return_value=sample_snapshot)
result = await service.has_snapshot()
assert result is True
async def test_has_snapshot_memory_fallback(service, mock_cache):
"""Тест has_snapshot с fallback на память."""
mock_cache.get = AsyncMock(return_value=None)
# Устанавливаем данные в память
service._memory_snapshot = {"uuid-1": 1000.0}
service._memory_snapshot_time = datetime.utcnow()
result = await service.has_snapshot()
assert result is True
async def test_has_snapshot_none(service, mock_cache):
"""Тест has_snapshot когда snapshot нет нигде."""
mock_cache.get = AsyncMock(return_value=None)
service._memory_snapshot = {}
service._memory_snapshot_time = None
result = await service.has_snapshot()
assert result is False
# ============== Тесты get_snapshot_age_minutes ==============
async def test_get_snapshot_age_minutes_from_redis(service, mock_cache):
"""Тест возраста snapshot из Redis."""
# Snapshot создан 30 минут назад
past_time = datetime.utcnow() - timedelta(minutes=30)
mock_cache.get = AsyncMock(return_value=past_time.isoformat())
result = await service.get_snapshot_age_minutes()
assert 29 <= result <= 31 # Допуск на время выполнения
async def test_get_snapshot_age_minutes_memory_fallback(service, mock_cache):
"""Тест возраста snapshot из памяти."""
mock_cache.get = AsyncMock(return_value=None)
service._memory_snapshot_time = datetime.utcnow() - timedelta(minutes=15)
result = await service.get_snapshot_age_minutes()
assert 14 <= result <= 16
async def test_get_snapshot_age_minutes_no_snapshot(service, mock_cache):
"""Тест возраста когда snapshot нет."""
mock_cache.get = AsyncMock(return_value=None)
service._memory_snapshot_time = None
result = await service.get_snapshot_age_minutes()
assert result == float('inf')
# ============== Тесты _save_snapshot (с fallback) ==============
async def test_save_snapshot_redis_success(service, mock_cache, sample_snapshot):
"""Тест сохранения snapshot в Redis успешно."""
mock_cache.set = AsyncMock(return_value=True)
# Заполняем память чтобы проверить что она очистится
service._memory_snapshot = {"old": 123.0}
service._memory_snapshot_time = datetime.utcnow()
result = await service._save_snapshot(sample_snapshot)
assert result is True
assert service._memory_snapshot == {} # Память очищена
assert service._memory_snapshot_time is None
async def test_save_snapshot_fallback_to_memory(service, mock_cache, sample_snapshot):
"""Тест fallback на память когда Redis недоступен."""
mock_cache.set = AsyncMock(return_value=False)
result = await service._save_snapshot(sample_snapshot)
assert result is True
assert service._memory_snapshot == sample_snapshot
assert service._memory_snapshot_time is not None
# ============== Тесты _get_current_snapshot ==============
async def test_get_current_snapshot_from_redis(service, mock_cache, sample_snapshot):
"""Тест получения snapshot из Redis."""
mock_cache.get = AsyncMock(return_value=sample_snapshot)
result = await service._get_current_snapshot()
assert result == sample_snapshot
async def test_get_current_snapshot_fallback_to_memory(service, mock_cache, sample_snapshot):
"""Тест fallback на память."""
mock_cache.get = AsyncMock(return_value=None)
service._memory_snapshot = sample_snapshot
result = await service._get_current_snapshot()
assert result == sample_snapshot
# ============== Тесты уведомлений ==============
async def test_save_notification_to_redis(service, mock_cache):
"""Тест сохранения времени уведомления."""
mock_cache.set = AsyncMock(return_value=True)
result = await service._save_notification_to_redis("uuid-123")
assert result is True
mock_cache.set.assert_called_once()
call_args = mock_cache.set.call_args
assert "traffic:notifications:uuid-123" in call_args[0][0]
async def test_get_notification_time_from_redis(service, mock_cache):
"""Тест получения времени уведомления."""
test_time = datetime(2024, 1, 15, 10, 0, 0)
mock_cache.get = AsyncMock(return_value=test_time.isoformat())
result = await service._get_notification_time_from_redis("uuid-123")
assert result == test_time
async def test_should_send_notification_no_previous(service, mock_cache):
"""Тест should_send_notification когда уведомлений не было."""
mock_cache.get = AsyncMock(return_value=None)
service._memory_notification_cache = {}
result = await service.should_send_notification("uuid-123")
assert result is True
async def test_should_send_notification_cooldown_active(service, mock_cache):
"""Тест should_send_notification когда кулдаун активен."""
# Уведомление было 5 минут назад, кулдаун 60 минут
recent_time = datetime.utcnow() - timedelta(minutes=5)
mock_cache.get = AsyncMock(return_value=recent_time.isoformat())
result = await service.should_send_notification("uuid-123")
assert result is False
async def test_should_send_notification_cooldown_expired(service, mock_cache):
"""Тест should_send_notification когда кулдаун истёк."""
# Уведомление было 120 минут назад, кулдаун 60 минут
old_time = datetime.utcnow() - timedelta(minutes=120)
mock_cache.get = AsyncMock(return_value=old_time.isoformat())
result = await service.should_send_notification("uuid-123")
assert result is True
async def test_record_notification_redis(service, mock_cache):
"""Тест record_notification сохраняет в Redis."""
mock_cache.set = AsyncMock(return_value=True)
await service.record_notification("uuid-123")
mock_cache.set.assert_called_once()
async def test_record_notification_fallback_to_memory(service, mock_cache):
"""Тест record_notification с fallback на память."""
mock_cache.set = AsyncMock(return_value=False)
await service.record_notification("uuid-123")
assert "uuid-123" in service._memory_notification_cache
# ============== Тесты create_initial_snapshot ==============
async def test_create_initial_snapshot_uses_existing_redis(service, mock_cache, sample_snapshot):
"""Тест что create_initial_snapshot использует существующий snapshot из Redis."""
mock_cache.get = AsyncMock(side_effect=[
sample_snapshot, # _load_snapshot_from_redis
(datetime.utcnow() - timedelta(minutes=10)).isoformat(), # _get_snapshot_time_from_redis
])
with patch.object(service, 'get_all_users_with_traffic', new_callable=AsyncMock) as mock_get_users:
result = await service.create_initial_snapshot()
# Не должен вызывать API - используем существующий snapshot
mock_get_users.assert_not_called()
assert result == len(sample_snapshot)
async def test_create_initial_snapshot_creates_new(service, mock_cache):
"""Тест создания нового snapshot когда в Redis пусто."""
mock_cache.get = AsyncMock(return_value=None)
mock_cache.set = AsyncMock(return_value=True)
# Мокаем пользователей из API
mock_user = MagicMock()
mock_user.uuid = "uuid-1"
mock_user.user_traffic = MagicMock()
mock_user.user_traffic.used_traffic_bytes = 1073741824 # 1 GB
with patch.object(service, 'get_all_users_with_traffic', new_callable=AsyncMock) as mock_get_users:
mock_get_users.return_value = [mock_user]
result = await service.create_initial_snapshot()
mock_get_users.assert_called_once()
assert result == 1
# ============== Тесты cleanup_notification_cache ==============
async def test_cleanup_notification_cache_removes_old(service, mock_cache):
"""Тест очистки старых записей из памяти."""
old_time = datetime.utcnow() - timedelta(hours=25)
recent_time = datetime.utcnow() - timedelta(hours=1)
service._memory_notification_cache = {
"uuid-old": old_time,
"uuid-recent": recent_time,
}
await service.cleanup_notification_cache()
assert "uuid-old" not in service._memory_notification_cache
assert "uuid-recent" in service._memory_notification_cache