diff --git a/.env.example b/.env.example
index 51233971..c5071757 100644
--- a/.env.example
+++ b/.env.example
@@ -66,11 +66,33 @@ ADMIN_REPORTS_CHAT_ID= # Опционально: чат
ADMIN_REPORTS_TOPIC_ID= # ID топика для отчетов
ADMIN_REPORTS_SEND_TIME=10:00 # Время отправки (по МСК) ежедневного отчета
-# Мониторинг трафика
-TRAFFIC_MONITORING_ENABLED=false # Включить мониторинг трафика пользователей
-TRAFFIC_THRESHOLD_GB_PER_DAY=10.0 # Порог трафика в ГБ за сутки (превышение вызывает уведомление)
-TRAFFIC_MONITORING_INTERVAL_HOURS=24 # Интервал проверки трафика в часах (например: 1, 6, 12, 24)
-SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности (0 для отправки в основной чат)
+# ===== МОНИТОРИНГ ТРАФИКА =====
+# Логика: при запуске бота создаётся snapshot трафика всех пользователей.
+# Через указанный интервал проверяется дельта (разница) трафика.
+# Если дельта превышает порог — отправляется уведомление админам.
+
+# Быстрая проверка (дельта трафика за интервал)
+TRAFFIC_FAST_CHECK_ENABLED=false # Включить быструю проверку
+TRAFFIC_FAST_CHECK_INTERVAL_MINUTES=10 # Интервал проверки в минутах
+TRAFFIC_FAST_CHECK_THRESHOLD_GB=5.0 # Порог дельты в ГБ (сколько потрачено за интервал)
+
+# Суточная проверка (трафик за 24 часа через bandwidth API)
+TRAFFIC_DAILY_CHECK_ENABLED=false # Включить суточную проверку
+TRAFFIC_DAILY_CHECK_TIME=00:00 # Время суточной проверки (HH:MM по UTC)
+TRAFFIC_DAILY_THRESHOLD_GB=50.0 # Порог суточного трафика в ГБ
+
+# Куда отправлять уведомления
+SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности
+
+# Фильтрация по серверам (UUID нод через запятую)
+TRAFFIC_MONITORED_NODES= # Только эти ноды (пусто = все)
+TRAFFIC_IGNORED_NODES= # Исключить эти ноды
+
+# Производительность
+TRAFFIC_CHECK_BATCH_SIZE=1000 # Размер батча для получения пользователей
+TRAFFIC_CHECK_CONCURRENCY=10 # Параллельных запросов к API
+TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES=60 # Кулдаун уведомлений на пользователя (минуты)
+TRAFFIC_SNAPSHOT_TTL_HOURS=24 # TTL snapshot трафика в Redis (часы, сохраняется при рестарте)
# Черный список
BLACKLIST_CHECK_ENABLED=false # Включить проверку пользователей по черному списку
diff --git a/app/config.py b/app/config.py
index 58ff2239..2489c969 100644
--- a/app/config.py
+++ b/app/config.py
@@ -237,11 +237,32 @@ class Settings(BaseSettings):
MENU_LAYOUT_ENABLED: bool = False # Включить управление меню через API
# Настройки мониторинга трафика
- TRAFFIC_MONITORING_ENABLED: bool = False
- TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки
- TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (по умолчанию - раз в сутки)
+ TRAFFIC_MONITORING_ENABLED: bool = False # Глобальный переключатель (для обратной совместимости)
+ TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки (для обратной совместимости)
+ TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (для обратной совместимости)
SUSPICIOUS_NOTIFICATIONS_TOPIC_ID: Optional[int] = None
+ # Новый мониторинг трафика v2
+ # Быстрая проверка (текущий использованный трафик)
+ TRAFFIC_FAST_CHECK_ENABLED: bool = False
+ TRAFFIC_FAST_CHECK_INTERVAL_MINUTES: int = 10 # Интервал проверки в минутах
+ TRAFFIC_FAST_CHECK_THRESHOLD_GB: float = 5.0 # Порог в ГБ для быстрой проверки
+
+ # Суточная проверка (трафик за 24 часа)
+ TRAFFIC_DAILY_CHECK_ENABLED: bool = False
+ TRAFFIC_DAILY_CHECK_TIME: str = "00:00" # Время суточной проверки (HH:MM)
+ TRAFFIC_DAILY_THRESHOLD_GB: float = 50.0 # Порог суточного трафика в ГБ
+
+ # Фильтрация по серверам (UUID нод через запятую)
+ TRAFFIC_MONITORED_NODES: str = "" # Только эти ноды (пусто = все)
+ TRAFFIC_IGNORED_NODES: str = "" # Исключить эти ноды
+
+ # Параллельность и кулдаун
+ TRAFFIC_CHECK_BATCH_SIZE: int = 1000 # Размер батча для получения пользователей
+ TRAFFIC_CHECK_CONCURRENCY: int = 10 # Параллельных запросов
+ TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES: int = 60 # Кулдаун уведомлений (минуты)
+ TRAFFIC_SNAPSHOT_TTL_HOURS: int = 24 # TTL для snapshot трафика в Redis (часы)
+
AUTOPAY_WARNING_DAYS: str = "3,1"
ENABLE_AUTOPAY: bool = False
@@ -829,6 +850,23 @@ class Settings(BaseSettings):
def get_remnawave_auto_sync_times(self) -> List[time]:
return self.parse_daily_time_list(self.REMNAWAVE_AUTO_SYNC_TIMES)
+ def get_traffic_monitored_nodes(self) -> List[str]:
+ """Возвращает список UUID нод для мониторинга (пусто = все)"""
+ if not self.TRAFFIC_MONITORED_NODES:
+ return []
+ return [n.strip() for n in self.TRAFFIC_MONITORED_NODES.split(",") if n.strip()]
+
+ def get_traffic_ignored_nodes(self) -> List[str]:
+ """Возвращает список UUID нод для исключения из мониторинга"""
+ if not self.TRAFFIC_IGNORED_NODES:
+ return []
+ return [n.strip() for n in self.TRAFFIC_IGNORED_NODES.split(",") if n.strip()]
+
+ def get_traffic_daily_check_time(self) -> Optional[time]:
+ """Возвращает время суточной проверки трафика"""
+ times = self.parse_daily_time_list(self.TRAFFIC_DAILY_CHECK_TIME)
+ return times[0] if times else None
+
def get_display_name_banned_keywords(self) -> List[str]:
raw_value = self.DISPLAY_NAME_BANNED_KEYWORDS
if raw_value is None:
diff --git a/app/handlers/admin/monitoring.py b/app/handlers/admin/monitoring.py
index fff926ae..178d3e0c 100644
--- a/app/handlers/admin/monitoring.py
+++ b/app/handlers/admin/monitoring.py
@@ -774,81 +774,53 @@ async def force_check_callback(callback: CallbackQuery):
@router.callback_query(F.data == "admin_mon_traffic_check")
@admin_required
async def traffic_check_callback(callback: CallbackQuery):
- """Ручная проверка трафика всех пользователей."""
+ """Ручная проверка трафика — использует snapshot и дельту."""
try:
# Проверяем, включен ли мониторинг трафика
if not traffic_monitoring_scheduler.is_enabled():
await callback.answer(
"⚠️ Мониторинг трафика отключен в настройках\n"
- "Включите TRAFFIC_MONITORING_ENABLED=true в .env",
+ "Включите TRAFFIC_FAST_CHECK_ENABLED=true в .env",
show_alert=True
)
return
- await callback.answer("⏳ Запускаем проверку трафика...")
+ await callback.answer("⏳ Запускаем проверку трафика (дельта)...")
+
+ # Используем run_fast_check — он сравнивает с snapshot и отправляет уведомления
+ from app.services.traffic_monitoring_service import traffic_monitoring_scheduler_v2
# Устанавливаем бота, если не установлен
- if not traffic_monitoring_scheduler.bot:
- traffic_monitoring_scheduler.set_bot(callback.bot)
+ if not traffic_monitoring_scheduler_v2.bot:
+ traffic_monitoring_scheduler_v2.set_bot(callback.bot)
- checked_count = 0
- exceeded_count = 0
- exceeded_users = []
+ violations = await traffic_monitoring_scheduler_v2.run_fast_check_now()
- async for db in get_db():
- from app.database.crud.user import get_users_with_active_subscriptions
-
- users = await get_users_with_active_subscriptions(db)
-
- for user in users:
- if user.remnawave_uuid:
- is_exceeded, traffic_info = await traffic_monitoring_service.check_user_traffic_threshold(
- db,
- user.remnawave_uuid,
- user.telegram_id
- )
- checked_count += 1
-
- if is_exceeded:
- exceeded_count += 1
- total_gb = traffic_info.get('total_gb', 0)
- exceeded_users.append({
- 'telegram_id': user.telegram_id,
- 'name': user.full_name or str(user.telegram_id),
- 'traffic_gb': total_gb
- })
-
- # Отправляем уведомление админам
- if traffic_monitoring_scheduler._should_send_notification(user.remnawave_uuid):
- await traffic_monitoring_service.process_suspicious_traffic(
- db,
- user.remnawave_uuid,
- traffic_info,
- callback.bot
- )
- traffic_monitoring_scheduler._record_notification(user.remnawave_uuid)
-
- break
-
- threshold_gb = settings.TRAFFIC_THRESHOLD_GB_PER_DAY
+ # Получаем информацию о snapshot
+ snapshot_age = await traffic_monitoring_scheduler_v2.service.get_snapshot_age_minutes()
+ threshold_gb = traffic_monitoring_scheduler_v2.service.get_fast_check_threshold_gb()
text = f"""
📊 Проверка трафика завершена
-🔍 Результаты:
-• Проверено пользователей: {checked_count}
-• Превышений порога: {exceeded_count}
-• Порог: {threshold_gb} ГБ/сутки
+🔍 Результаты (дельта):
+• Превышений за интервал: {len(violations)}
+• Порог дельты: {threshold_gb} ГБ
+• Возраст snapshot: {snapshot_age:.1f} мин
🕐 Время проверки: {datetime.now().strftime('%H:%M:%S')}
"""
- if exceeded_users:
- text += "\n⚠️ Пользователи с превышением:\n"
- for u in exceeded_users[:10]:
- text += f"• {u['name']}: {u['traffic_gb']:.1f} ГБ\n"
- if len(exceeded_users) > 10:
- text += f"... и ещё {len(exceeded_users) - 10}\n"
+ if violations:
+ text += "\n⚠️ Превышения дельты:\n"
+ for v in violations[:10]:
+ name = v.full_name or v.user_uuid[:8]
+ text += f"• {name}: +{v.used_traffic_gb:.1f} ГБ\n"
+ if len(violations) > 10:
+ text += f"... и ещё {len(violations) - 10}\n"
+ text += "\n📨 Уведомления отправлены (с учётом кулдауна)"
+ else:
+ text += "\n✅ Превышений не обнаружено"
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
keyboard = InlineKeyboardMarkup(inline_keyboard=[
diff --git a/app/services/payment/cloudpayments.py b/app/services/payment/cloudpayments.py
index e0d9bca0..dfad134f 100644
--- a/app/services/payment/cloudpayments.py
+++ b/app/services/payment/cloudpayments.py
@@ -267,7 +267,10 @@ class CloudPaymentsPaymentMixin:
# Умная автоактивация если автопокупка не сработала
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ # Игнорируем notification_sent т.к. здесь нет дополнительных уведомлений
+ await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=amount_kopeks
+ )
except Exception as error:
logger.exception("Ошибка умной автоактивации после CloudPayments: %s", error)
diff --git a/app/services/payment/cryptobot.py b/app/services/payment/cryptobot.py
index 95855c56..2252b6a5 100644
--- a/app/services/payment/cryptobot.py
+++ b/app/services/payment/cryptobot.py
@@ -377,12 +377,14 @@ class CryptoBotPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
db,
user,
bot=bot_instance,
+ topup_amount=amount_kopeks,
)
except Exception as auto_activate_error:
logger.error(
@@ -392,7 +394,8 @@ class CryptoBotPaymentMixin:
exc_info=True,
)
- if has_saved_cart and bot_instance:
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and bot_instance and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)
diff --git a/app/services/payment/freekassa.py b/app/services/payment/freekassa.py
index 163b2734..95ad19d0 100644
--- a/app/services/payment/freekassa.py
+++ b/app/services/payment/freekassa.py
@@ -411,9 +411,12 @@ class FreekassaPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -422,7 +425,8 @@ class FreekassaPaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)
diff --git a/app/services/payment/mulenpay.py b/app/services/payment/mulenpay.py
index 3d1705d6..9dd3b8c7 100644
--- a/app/services/payment/mulenpay.py
+++ b/app/services/payment/mulenpay.py
@@ -396,9 +396,12 @@ class MulenPayPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -407,7 +410,8 @@ class MulenPayPaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts
diff --git a/app/services/payment/pal24.py b/app/services/payment/pal24.py
index 1086c63b..3000155c 100644
--- a/app/services/payment/pal24.py
+++ b/app/services/payment/pal24.py
@@ -499,9 +499,12 @@ class Pal24PaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -510,7 +513,8 @@ class Pal24PaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)
diff --git a/app/services/payment/platega.py b/app/services/payment/platega.py
index 3149d0bb..eb6d5501 100644
--- a/app/services/payment/platega.py
+++ b/app/services/payment/platega.py
@@ -485,9 +485,12 @@ class PlategaPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -496,7 +499,8 @@ class PlategaPaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)
diff --git a/app/services/payment/stars.py b/app/services/payment/stars.py
index a5cff2e7..5cf2841e 100644
--- a/app/services/payment/stars.py
+++ b/app/services/payment/stars.py
@@ -534,12 +534,14 @@ class TelegramStarsMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
db,
user,
bot=getattr(self, "bot", None),
+ topup_amount=amount_kopeks,
)
except Exception as auto_activate_error:
logger.error(
@@ -549,7 +551,8 @@ class TelegramStarsMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
texts = get_texts(user.language)
cart_message = texts.t(
"BALANCE_TOPUP_CART_REMINDER_DETAILED",
diff --git a/app/services/payment/wata.py b/app/services/payment/wata.py
index 44d03369..ebaf2db8 100644
--- a/app/services/payment/wata.py
+++ b/app/services/payment/wata.py
@@ -569,9 +569,12 @@ class WataPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(db, user, bot=getattr(self, "bot", None))
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ db, user, bot=getattr(self, "bot", None), topup_amount=payment.amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -580,7 +583,8 @@ class WataPaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
from app.localization.texts import get_texts
texts = get_texts(user.language)
diff --git a/app/services/payment/yookassa.py b/app/services/payment/yookassa.py
index 678837c9..6f25c28c 100644
--- a/app/services/payment/yookassa.py
+++ b/app/services/payment/yookassa.py
@@ -687,8 +687,9 @@ class YooKassaPaymentMixin:
exc_info=True, # Добавляем полный стек вызовов для отладки
)
- # Отправляем уведомление пользователю
- if getattr(self, "bot", None):
+ # Отправляем уведомление пользователю (если не включен режим SHOW_ACTIVATION_PROMPT_AFTER_TOPUP,
+ # т.к. в этом случае уведомление будет отправлено из auto_activate_subscription_after_topup)
+ if getattr(self, "bot", None) and not settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP:
try:
# Передаем только простые данные, чтобы избежать проблем с ленивой загрузкой
await self._send_payment_success_notification(
@@ -738,12 +739,14 @@ class YooKassaPaymentMixin:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
db,
user,
bot=getattr(self, "bot", None),
+ topup_amount=payment.amount_kopeks,
)
except Exception as auto_activate_error:
logger.error(
@@ -753,7 +756,8 @@ class YooKassaPaymentMixin:
exc_info=True,
)
- if has_saved_cart and getattr(self, "bot", None):
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and getattr(self, "bot", None) and not activation_notification_sent:
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts
diff --git a/app/services/subscription_auto_purchase_service.py b/app/services/subscription_auto_purchase_service.py
index 04628830..10fcec90 100644
--- a/app/services/subscription_auto_purchase_service.py
+++ b/app/services/subscription_auto_purchase_service.py
@@ -717,7 +717,8 @@ async def auto_activate_subscription_after_topup(
user: User,
*,
bot: Optional[Bot] = None,
-) -> bool:
+ topup_amount: Optional[int] = None,
+) -> tuple[bool, bool]:
"""
Умная автоактивация после пополнения баланса.
@@ -727,6 +728,14 @@ async def auto_activate_subscription_after_topup(
- Если подписки нет — создаёт новую с дефолтными параметрами
Выбирает максимальный период, который можно оплатить из баланса.
+
+ Args:
+ topup_amount: Сумма пополнения в копейках (для отображения в уведомлении)
+
+ Returns:
+ tuple[bool, bool]: (success, notification_sent)
+ - success: True если подписка активирована
+ - notification_sent: True если уведомление отправлено пользователю
"""
from datetime import datetime
from app.database.crud.subscription import get_subscription_by_user_id, create_paid_subscription
@@ -739,70 +748,98 @@ async def auto_activate_subscription_after_topup(
from app.services.admin_notification_service import AdminNotificationService
if not user or not getattr(user, "id", None):
- return False
+ return (False, False)
subscription = await get_subscription_by_user_id(db, user.id)
- # Если автоактивация отключена - только отправляем предупреждение
+ # Если автоактивация отключена - только отправляем уведомление
if not settings.is_auto_activate_after_topup_enabled():
- # Отправляем предупреждение если включен режим и нет активной подписки
- if (
- settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP
- and bot
- and (not subscription or subscription.status not in ("active", "ACTIVE"))
- ):
+ notification_sent = False
+ # Отправляем уведомление если включен режим
+ if settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP and bot:
try:
texts = get_texts(getattr(user, "language", "ru"))
- warning_message = (
- f"✅ Баланс пополнен!\n\n"
- f"💳 Текущий баланс: {settings.format_price(user.balance_kopeks)}\n\n"
- f"{'─' * 25}\n\n"
- f"⚠️ ВАЖНО! ⚠️\n\n"
- f"🔴 ПОДПИСКА НЕ АКТИВНА!\n\n"
- f"Пополнение баланса НЕ активирует подписку автоматически!\n\n"
- f"👇 Выберите действие:"
- )
- keyboard = InlineKeyboardMarkup(
- inline_keyboard=[
- [InlineKeyboardButton(
- text="🚀 АКТИВИРОВАТЬ ПОДПИСКУ",
- callback_data="subscription_buy",
- )],
- [InlineKeyboardButton(
- text="💎 ПРОДЛИТЬ ПОДПИСКУ",
- callback_data="subscription_extend",
- )],
- [InlineKeyboardButton(
- text="📱 ДОБАВИТЬ УСТРОЙСТВА",
- callback_data="subscription_add_devices",
- )],
- ]
+ has_active_subscription = (
+ subscription
+ and subscription.status in ("active", "ACTIVE")
)
+
+ # Формируем строку с суммой пополнения
+ topup_line = ""
+ if topup_amount:
+ topup_line = f"➕ Пополнено: {settings.format_price(topup_amount)}\n"
+
+ # Определяем состояние подписки
+ is_trial = subscription and getattr(subscription, 'is_trial', False)
+
+ if has_active_subscription and not is_trial:
+ # Активная платная подписка — 2 кнопки
+ warning_message = (
+ f"✅ Баланс пополнен!\n\n"
+ f"{topup_line}"
+ f"💳 Текущий баланс: {settings.format_price(user.balance_kopeks)}\n\n"
+ f"👇 Выберите действие:"
+ )
+ keyboard = InlineKeyboardMarkup(
+ inline_keyboard=[
+ [InlineKeyboardButton(
+ text="💎 Продлить подписку",
+ callback_data="subscription_extend",
+ )],
+ [InlineKeyboardButton(
+ text="📱 Изменить устройства",
+ callback_data="subscription_change_devices",
+ )],
+ ]
+ )
+ else:
+ # Триал или подписка закончилась — 1 кнопка
+ warning_message = (
+ f"✅ Баланс пополнен!\n\n"
+ f"{topup_line}"
+ f"💳 Текущий баланс: {settings.format_price(user.balance_kopeks)}\n\n"
+ f"{'━' * 20}\n\n"
+ f"🚨🚨🚨 ВНИМАНИЕ! 🚨🚨🚨\n\n"
+ f"🔴 ПОДПИСКА НЕ АКТИВНА!\n\n"
+ f"⚠️ Пополнение баланса НЕ активирует подписку автоматически!\n\n"
+ f"👇 Обязательно оформите подписку:"
+ )
+ keyboard = InlineKeyboardMarkup(
+ inline_keyboard=[
+ [InlineKeyboardButton(
+ text="🚀 КУПИТЬ ПОДПИСКУ",
+ callback_data="menu_buy",
+ )],
+ ]
+ )
+
await bot.send_message(
chat_id=user.telegram_id,
text=warning_message,
reply_markup=keyboard,
parse_mode="HTML",
)
+ notification_sent = True
logger.info(
- "⚠️ Отправлено предупреждение об активации подписки пользователю %s (автоактивация выключена)",
+ "⚠️ Отправлено уведомление о пополнении баланса пользователю %s (автоактивация выключена, подписка %s)",
user.telegram_id,
+ "активна" if has_active_subscription else "неактивна",
)
except Exception as notify_error:
logger.warning(
- "⚠️ Не удалось отправить предупреждение пользователю %s: %s",
+ "⚠️ Не удалось отправить уведомление пользователю %s: %s",
user.telegram_id,
notify_error,
)
- return False
+ return (False, notification_sent)
- # Если подписка активна — ничего не делаем
+ # Если подписка активна — ничего не делаем (автоактивация включена, но подписка уже есть)
if subscription and subscription.status == "ACTIVE" and subscription.end_date > datetime.utcnow():
logger.info(
"🔁 Автоактивация: у пользователя %s уже активная подписка, пропускаем",
user.telegram_id,
)
- return False
+ return (False, False)
# Определяем параметры подписки
if subscription:
@@ -839,7 +876,7 @@ async def auto_activate_subscription_after_topup(
if not available_periods:
logger.warning("🔁 Автоактивация: нет доступных периодов подписки")
- return False
+ return (False, False)
subscription_service = SubscriptionService()
@@ -875,56 +912,84 @@ async def auto_activate_subscription_after_topup(
user.telegram_id,
balance,
)
- # Отправляем предупреждение пользователю если включен режим и подписки нет
- if (
- settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP
- and bot
- and (not subscription or subscription.status not in ("active", "ACTIVE"))
- ):
+ notification_sent = False
+ # Отправляем уведомление если включен режим
+ if settings.SHOW_ACTIVATION_PROMPT_AFTER_TOPUP and bot:
try:
texts = get_texts(getattr(user, "language", "ru"))
- warning_message = (
- f"✅ Баланс пополнен!\n\n"
- f"💳 Текущий баланс: {settings.format_price(balance)}\n\n"
- f"{'─' * 25}\n\n"
- f"⚠️ ВАЖНО! ⚠️\n\n"
- f"🔴 ПОДПИСКА НЕ АКТИВНА!\n\n"
- f"Пополнение баланса НЕ активирует подписку автоматически!\n\n"
- f"👇 Выберите действие:"
- )
- keyboard = InlineKeyboardMarkup(
- inline_keyboard=[
- [InlineKeyboardButton(
- text="🚀 АКТИВИРОВАТЬ ПОДПИСКУ",
- callback_data="subscription_buy",
- )],
- [InlineKeyboardButton(
- text="💎 ПРОДЛИТЬ ПОДПИСКУ",
- callback_data="subscription_extend",
- )],
- [InlineKeyboardButton(
- text="📱 ДОБАВИТЬ УСТРОЙСТВА",
- callback_data="subscription_add_devices",
- )],
- ]
+ has_active_subscription = (
+ subscription
+ and subscription.status in ("active", "ACTIVE")
)
+
+ # Формируем строку с суммой пополнения
+ topup_line2 = ""
+ if topup_amount:
+ topup_line2 = f"➕ Пополнено: {settings.format_price(topup_amount)}\n"
+
+ # Определяем состояние подписки
+ is_trial2 = subscription and getattr(subscription, 'is_trial', False)
+
+ if has_active_subscription and not is_trial2:
+ # Активная платная подписка — 2 кнопки
+ warning_message = (
+ f"✅ Баланс пополнен!\n\n"
+ f"{topup_line2}"
+ f"💳 Текущий баланс: {settings.format_price(balance)}\n\n"
+ f"👇 Выберите действие:"
+ )
+ keyboard = InlineKeyboardMarkup(
+ inline_keyboard=[
+ [InlineKeyboardButton(
+ text="💎 Продлить подписку",
+ callback_data="subscription_extend",
+ )],
+ [InlineKeyboardButton(
+ text="📱 Изменить устройства",
+ callback_data="subscription_change_devices",
+ )],
+ ]
+ )
+ else:
+ # Триал или подписка закончилась — 1 кнопка
+ warning_message = (
+ f"✅ Баланс пополнен!\n\n"
+ f"{topup_line2}"
+ f"💳 Текущий баланс: {settings.format_price(balance)}\n\n"
+ f"{'━' * 20}\n\n"
+ f"🚨🚨🚨 ВНИМАНИЕ! 🚨🚨🚨\n\n"
+ f"🔴 ПОДПИСКА НЕ АКТИВНА!\n\n"
+ f"⚠️ Пополнение баланса НЕ активирует подписку автоматически!\n\n"
+ f"👇 Обязательно оформите подписку:"
+ )
+ keyboard = InlineKeyboardMarkup(
+ inline_keyboard=[
+ [InlineKeyboardButton(
+ text="🚀 КУПИТЬ ПОДПИСКУ",
+ callback_data="menu_buy",
+ )],
+ ]
+ )
+
await bot.send_message(
chat_id=user.telegram_id,
text=warning_message,
reply_markup=keyboard,
parse_mode="HTML",
)
+ notification_sent = True
logger.info(
- "⚠️ Отправлено предупреждение об активации подписки пользователю %s",
+ "⚠️ Отправлено уведомление о пополнении баланса пользователю %s (недостаточно средств, подписка %s)",
user.telegram_id,
+ "активна" if has_active_subscription else "неактивна",
)
except Exception as notify_error:
logger.warning(
- "⚠️ Не удалось отправить предупреждение пользователю %s: %s",
+ "⚠️ Не удалось отправить уведомление пользователю %s: %s",
user.telegram_id,
notify_error,
)
- return False
+ return (False, notification_sent)
texts = get_texts(getattr(user, "language", "ru"))
@@ -1085,7 +1150,7 @@ async def auto_activate_subscription_after_topup(
notify_error,
)
- return True
+ return (True, True) # success=True, notification_sent=True (об активации)
except Exception as e:
logger.error(
@@ -1094,7 +1159,7 @@ async def auto_activate_subscription_after_topup(
e,
exc_info=True,
)
- return False
+ return (False, False)
__all__ = ["auto_purchase_saved_cart_after_topup", "auto_activate_subscription_after_topup"]
diff --git a/app/services/traffic_monitoring_service.py b/app/services/traffic_monitoring_service.py
index e3fc2412..9b583128 100644
--- a/app/services/traffic_monitoring_service.py
+++ b/app/services/traffic_monitoring_service.py
@@ -1,432 +1,912 @@
"""
-Сервис для мониторинга трафика пользователей
-Проверяет, не превышает ли пользователь заданный порог трафика за сутки
+Сервис для мониторинга трафика пользователей v2
+Быстрая проверка текущего трафика + суточная проверка
"""
-import logging
import asyncio
-from datetime import datetime, timedelta
-from typing import Dict, List, Optional, Tuple, Set
+import logging
+from dataclasses import dataclass
+from datetime import datetime, timedelta, time
+from typing import Dict, List, Optional, Set
from app.config import settings
from app.services.admin_notification_service import AdminNotificationService
from app.services.remnawave_service import RemnaWaveService
from app.database.crud.user import get_user_by_remnawave_uuid
-from app.database.database import get_db
-from app.database.models import User
+from app.database.database import AsyncSessionLocal
+from app.utils.cache import cache, cache_key
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
+# Ключи для хранения snapshot в Redis
+TRAFFIC_SNAPSHOT_KEY = "traffic:snapshot"
+TRAFFIC_SNAPSHOT_TIME_KEY = "traffic:snapshot:time"
+TRAFFIC_NOTIFICATION_CACHE_KEY = "traffic:notifications"
-class TrafficMonitoringService:
+
+@dataclass
+class TrafficViolation:
+ """Информация о превышении трафика"""
+ user_uuid: str
+ telegram_id: Optional[int]
+ full_name: Optional[str]
+ username: Optional[str]
+ used_traffic_gb: float
+ threshold_gb: float
+ last_node_uuid: Optional[str]
+ last_node_name: Optional[str]
+ check_type: str # "fast" или "daily"
+
+
+class TrafficMonitoringServiceV2:
"""
- Сервис для мониторинга трафика пользователей
+ Улучшенный сервис мониторинга трафика
+ - Батчевая загрузка пользователей
+ - Параллельная обработка
+ - Быстрая проверка (каждые N минут) с дельтой
+ - Суточная проверка
+ - Фильтрация по нодам
+ - Хранение snapshot в Redis (персистентность при перезапуске)
"""
-
+
def __init__(self):
self.remnawave_service = RemnaWaveService()
- self.lock = asyncio.Lock() # Блокировка для предотвращения одновременных проверок
+ self._nodes_cache: Dict[str, str] = {} # {node_uuid: node_name}
+ # Fallback на память если Redis недоступен
+ self._memory_snapshot: Dict[str, float] = {}
+ self._memory_snapshot_time: Optional[datetime] = None
+ self._memory_notification_cache: Dict[str, datetime] = {}
- def is_traffic_monitoring_enabled(self) -> bool:
- """Проверяет, включен ли мониторинг трафика"""
- return getattr(settings, 'TRAFFIC_MONITORING_ENABLED', False)
+ # ============== Настройки ==============
- def get_traffic_threshold_gb(self) -> float:
- """Получает порог трафика в ГБ за сутки"""
- return getattr(settings, 'TRAFFIC_THRESHOLD_GB_PER_DAY', 10.0)
+ def is_fast_check_enabled(self) -> bool:
+ # Поддержка старого параметра TRAFFIC_MONITORING_ENABLED
+ return settings.TRAFFIC_FAST_CHECK_ENABLED or settings.TRAFFIC_MONITORING_ENABLED
- def get_monitoring_interval_hours(self) -> int:
- """Получает интервал мониторинга в часах"""
- return getattr(settings, 'TRAFFIC_MONITORING_INTERVAL_HOURS', 24)
+ def is_daily_check_enabled(self) -> bool:
+ return settings.TRAFFIC_DAILY_CHECK_ENABLED
- def get_suspicious_notifications_topic_id(self) -> Optional[int]:
- """Получает ID топика для уведомлений о подозрительной активности"""
- return getattr(settings, 'SUSPICIOUS_NOTIFICATIONS_TOPIC_ID', None)
+ def get_fast_check_interval_seconds(self) -> int:
+ # Если используется старый параметр — конвертируем часы в секунды
+ if settings.TRAFFIC_MONITORING_ENABLED and not settings.TRAFFIC_FAST_CHECK_ENABLED:
+ return settings.TRAFFIC_MONITORING_INTERVAL_HOURS * 3600
+ return settings.TRAFFIC_FAST_CHECK_INTERVAL_MINUTES * 60
- async def get_user_daily_traffic(self, user_uuid: str) -> Dict:
- """
- Получает статистику трафика пользователя за последние 24 часа
+ def get_fast_check_threshold_gb(self) -> float:
+ # Если используется старый параметр — используем старый порог
+ if settings.TRAFFIC_MONITORING_ENABLED and not settings.TRAFFIC_FAST_CHECK_ENABLED:
+ return settings.TRAFFIC_THRESHOLD_GB_PER_DAY
+ return settings.TRAFFIC_FAST_CHECK_THRESHOLD_GB
- Args:
- user_uuid: UUID пользователя в Remnawave
+ def get_daily_threshold_gb(self) -> float:
+ return settings.TRAFFIC_DAILY_THRESHOLD_GB
- Returns:
- Словарь с информацией о трафике
- """
+ def get_batch_size(self) -> int:
+ return settings.TRAFFIC_CHECK_BATCH_SIZE
+
+ def get_concurrency(self) -> int:
+ return settings.TRAFFIC_CHECK_CONCURRENCY
+
+ def get_notification_cooldown_seconds(self) -> int:
+ return settings.TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES * 60
+
+ def get_monitored_nodes(self) -> List[str]:
+ return settings.get_traffic_monitored_nodes()
+
+ def get_ignored_nodes(self) -> List[str]:
+ return settings.get_traffic_ignored_nodes()
+
+ def get_daily_check_time(self) -> Optional[time]:
+ return settings.get_traffic_daily_check_time()
+
+ def get_snapshot_ttl_seconds(self) -> int:
+ """TTL для snapshot в Redis (по умолчанию 24 часа)"""
+ return getattr(settings, 'TRAFFIC_SNAPSHOT_TTL_HOURS', 24) * 3600
+
+ # ============== Redis операции для snapshot ==============
+
+ async def _save_snapshot_to_redis(self, snapshot: Dict[str, float]) -> bool:
+ """Сохраняет snapshot трафика в Redis"""
try:
- # Получаем время начала и конца суток (сегодня)
- now = datetime.utcnow()
- start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
- end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
+ # Сохраняем snapshot как JSON
+ snapshot_data = {uuid: bytes_val for uuid, bytes_val in snapshot.items()}
+ ttl = self.get_snapshot_ttl_seconds()
- # Форматируем даты в ISO формат
- start_date = start_of_day.strftime("%Y-%m-%dT%H:%M:%S.000Z")
- end_date = end_of_day.strftime("%Y-%m-%dT%H:%M:%S.999Z")
-
- # Получаем API клиент и вызываем метод получения статистики
- async with self.remnawave_service.get_api_client() as api:
- traffic_data = await api.get_user_stats_usage(user_uuid, start_date, end_date)
-
- # Обрабатываем ответ API
- if traffic_data and 'response' in traffic_data:
- response = traffic_data['response']
-
- # Вычисляем общий трафик
- total_gb = 0
- nodes_info = []
-
- if isinstance(response, list):
- for item in response:
- node_name = item.get('nodeName', 'Unknown')
- total_bytes = item.get('total', 0)
- total_gb_item = round(total_bytes / (1024**3), 2) # Конвертируем в ГБ
- total_gb += total_gb_item
-
- nodes_info.append({
- 'node': node_name,
- 'gb': total_gb_item
- })
- else:
- # Если response - это уже результат обработки (как в примере)
- total_gb = response.get('total_gb', 0)
- nodes_info = response.get('nodes', [])
-
- return {
- 'total_gb': total_gb,
- 'nodes': nodes_info,
- 'date_range': {
- 'start': start_date,
- 'end': end_date
- }
- }
+ success = await cache.set(TRAFFIC_SNAPSHOT_KEY, snapshot_data, expire=ttl)
+ if success:
+ # Сохраняем время создания snapshot
+ await cache.set(
+ TRAFFIC_SNAPSHOT_TIME_KEY,
+ datetime.utcnow().isoformat(),
+ expire=ttl
+ )
+ logger.info(f"📦 Snapshot сохранён в Redis: {len(snapshot)} пользователей, TTL {ttl//3600}ч")
else:
- logger.warning(f"Нет данных о трафике для пользователя {user_uuid}")
- return {
- 'total_gb': 0,
- 'nodes': [],
- 'date_range': {
- 'start': start_date,
- 'end': end_date
- }
- }
+ logger.warning(f"⚠️ Не удалось сохранить snapshot в Redis")
+ return success
+ except Exception as e:
+ logger.error(f"❌ Ошибка сохранения snapshot в Redis: {e}")
+ return False
+
+ async def _load_snapshot_from_redis(self) -> Optional[Dict[str, float]]:
+ """Загружает snapshot трафика из Redis"""
+ try:
+ snapshot_data = await cache.get(TRAFFIC_SNAPSHOT_KEY)
+ if snapshot_data and isinstance(snapshot_data, dict):
+ # Конвертируем обратно в float
+ result = {uuid: float(bytes_val) for uuid, bytes_val in snapshot_data.items()}
+ logger.debug(f"📦 Snapshot загружен из Redis: {len(result)} пользователей")
+ return result
+ return None
+ except Exception as e:
+ logger.error(f"❌ Ошибка загрузки snapshot из Redis: {e}")
+ return None
+
+ async def _get_snapshot_time_from_redis(self) -> Optional[datetime]:
+ """Получает время создания snapshot из Redis"""
+ try:
+ time_str = await cache.get(TRAFFIC_SNAPSHOT_TIME_KEY)
+ if time_str:
+ return datetime.fromisoformat(time_str)
+ return None
+ except Exception as e:
+ logger.error(f"❌ Ошибка получения времени snapshot: {e}")
+ return None
+
+ async def _save_notification_to_redis(self, user_uuid: str) -> bool:
+ """Сохраняет время уведомления в Redis"""
+ try:
+ key = cache_key(TRAFFIC_NOTIFICATION_CACHE_KEY, user_uuid)
+ ttl = 24 * 3600 # 24 часа
+ return await cache.set(key, datetime.utcnow().isoformat(), expire=ttl)
+ except Exception as e:
+ logger.error(f"❌ Ошибка сохранения уведомления в Redis: {e}")
+ return False
+
+ async def _get_notification_time_from_redis(self, user_uuid: str) -> Optional[datetime]:
+ """Получает время последнего уведомления из Redis"""
+ try:
+ key = cache_key(TRAFFIC_NOTIFICATION_CACHE_KEY, user_uuid)
+ time_str = await cache.get(key)
+ if time_str:
+ return datetime.fromisoformat(time_str)
+ return None
+ except Exception as e:
+ logger.error(f"❌ Ошибка получения времени уведомления: {e}")
+ return None
+
+ # ============== Фильтрация по нодам ==============
+
+ def should_monitor_node(self, node_uuid: Optional[str]) -> bool:
+ """Проверяет, нужно ли мониторить пользователя с этой ноды"""
+ if not node_uuid:
+ return True # Если нода неизвестна, мониторим
+
+ monitored = self.get_monitored_nodes()
+ ignored = self.get_ignored_nodes()
+
+ # Если есть список мониторинга — только они
+ if monitored:
+ return node_uuid in monitored
+
+ # Если есть список игнорирования — все кроме них
+ if ignored:
+ return node_uuid not in ignored
+
+ # Иначе мониторим всех
+ return True
+
+ # ============== Кулдаун уведомлений ==============
+
+ async def should_send_notification(self, user_uuid: str) -> bool:
+ """Проверяет, прошёл ли кулдаун для уведомления (Redis + fallback на память)"""
+ # Пробуем Redis
+ last_notification = await self._get_notification_time_from_redis(user_uuid)
+
+ # Fallback на память
+ if last_notification is None:
+ last_notification = self._memory_notification_cache.get(user_uuid)
+
+ if not last_notification:
+ return True
+
+ cooldown = self.get_notification_cooldown_seconds()
+ return (datetime.utcnow() - last_notification).total_seconds() > cooldown
+
+ async def record_notification(self, user_uuid: str):
+ """Записывает время отправки уведомления (Redis + fallback на память)"""
+ # Сохраняем в Redis
+ saved = await self._save_notification_to_redis(user_uuid)
+
+ # Fallback на память
+ if not saved:
+ self._memory_notification_cache[user_uuid] = datetime.utcnow()
+
+ async def cleanup_notification_cache(self):
+ """Очищает старые записи из памяти (Redis очищается автоматически через TTL)"""
+ now = datetime.utcnow()
+ expired = [
+ uuid for uuid, dt in self._memory_notification_cache.items()
+ if (now - dt) > timedelta(hours=24)
+ ]
+ for uuid in expired:
+ del self._memory_notification_cache[uuid]
+ if expired:
+ logger.debug(f"🧹 Очищено {len(expired)} записей из памяти уведомлений о трафике")
+
+ # ============== Получение пользователей ==============
+
+ async def get_all_users_with_traffic(self) -> List[Dict]:
+ """
+ Получает всех пользователей с их трафиком через батчевые запросы
+ Возвращает список словарей с информацией о пользователях
+ """
+ all_users = []
+ batch_size = self.get_batch_size()
+ offset = 0
+
+ try:
+ async with self.remnawave_service.get_api_client() as api:
+ while True:
+ result = await api.get_all_users(start=offset, size=batch_size)
+ users = result.get('users', [])
+
+ if not users:
+ break
+
+ all_users.extend(users)
+ logger.debug(f"📊 Загружено {len(all_users)} пользователей...")
+
+ if len(users) < batch_size:
+ break
+
+ offset += batch_size
+
+ logger.info(f"✅ Всего загружено {len(all_users)} пользователей из Remnawave")
+ return all_users
except Exception as e:
- logger.error(f"Ошибка при получении статистики трафика для {user_uuid}: {e}")
- return {
- 'total_gb': 0,
- 'nodes': [],
- 'date_range': {
- 'start': None,
- 'end': None
- }
- }
+ logger.error(f"❌ Ошибка при получении пользователей: {e}")
+ return []
- async def check_user_traffic_threshold(
- self,
- db: AsyncSession,
- user_uuid: str,
- user_telegram_id: int = None
- ) -> Tuple[bool, Dict]:
+ # ============== Быстрая проверка ==============
+
+ async def has_snapshot(self) -> bool:
+ """Проверяет, есть ли сохранённый snapshot (Redis + fallback на память)"""
+ # Проверяем Redis
+ snapshot = await self._load_snapshot_from_redis()
+ if snapshot:
+ return True
+
+ # Fallback на память
+ return bool(self._memory_snapshot) and self._memory_snapshot_time is not None
+
+ async def get_snapshot_age_minutes(self) -> float:
+ """Возвращает возраст snapshot в минутах (Redis + fallback на память)"""
+ # Пробуем Redis
+ snapshot_time = await self._get_snapshot_time_from_redis()
+
+ # Fallback на память
+ if snapshot_time is None:
+ snapshot_time = self._memory_snapshot_time
+
+ if not snapshot_time:
+ return float('inf')
+ return (datetime.utcnow() - snapshot_time).total_seconds() / 60
+
+ async def _get_current_snapshot(self) -> Dict[str, float]:
+ """Получает текущий snapshot (Redis + fallback на память)"""
+ # Пробуем Redis
+ snapshot = await self._load_snapshot_from_redis()
+ if snapshot:
+ return snapshot
+
+ # Fallback на память
+ return self._memory_snapshot.copy()
+
+ async def _save_snapshot(self, snapshot: Dict[str, float]) -> bool:
+ """Сохраняет snapshot (Redis + fallback на память)"""
+ # Пробуем Redis
+ saved = await self._save_snapshot_to_redis(snapshot)
+
+ if saved:
+ # Очищаем память если Redis доступен
+ self._memory_snapshot.clear()
+ self._memory_snapshot_time = None
+ return True
+
+ # Fallback на память
+ self._memory_snapshot = snapshot.copy()
+ self._memory_snapshot_time = datetime.utcnow()
+ logger.warning("⚠️ Redis недоступен, snapshot сохранён в память")
+ return True
+
+ async def create_initial_snapshot(self) -> int:
"""
- Проверяет, превышает ли трафик пользователя заданный порог
-
- Args:
- db: Сессия базы данных
- user_uuid: UUID пользователя в Remnawave
- user_telegram_id: Telegram ID пользователя (для логирования)
-
- Returns:
- Кортеж (превышен ли порог, информация о трафике)
+ Создаёт начальный snapshot при запуске бота.
+ Если в Redis уже есть snapshot — использует его (персистентность).
+ Возвращает количество пользователей в snapshot.
"""
- if not self.is_traffic_monitoring_enabled():
- return False, {}
+ # Проверяем есть ли snapshot в Redis
+ existing_snapshot = await self._load_snapshot_from_redis()
+ if existing_snapshot:
+ age = await self.get_snapshot_age_minutes()
+ logger.info(
+ f"📦 Найден существующий snapshot в Redis: {len(existing_snapshot)} пользователей, "
+ f"возраст {age:.1f} мин"
+ )
+ return len(existing_snapshot)
- # Получаем статистику трафика
- traffic_info = await self.get_user_daily_traffic(user_uuid)
- total_gb = traffic_info.get('total_gb', 0)
+ logger.info("📸 Создание начального snapshot трафика...")
+ start_time = datetime.utcnow()
- # Получаем порог для сравнения
- threshold_gb = self.get_traffic_threshold_gb()
+ users = await self.get_all_users_with_traffic()
+ new_snapshot: Dict[str, float] = {}
- # Проверяем, превышает ли трафик порог
- is_exceeded = total_gb > threshold_gb
+ for user in users:
+ try:
+ if not user.uuid:
+ continue
- # Логируем проверку
- user_id_info = f"telegram_id={user_telegram_id}" if user_telegram_id else f"uuid={user_uuid}"
- status = "ПРЕВЫШЕНИЕ" if is_exceeded else "норма"
+ user_traffic = user.user_traffic
+ if not user_traffic:
+ continue
+
+ current_bytes = user_traffic.used_traffic_bytes or 0
+ new_snapshot[user.uuid] = current_bytes
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка при создании snapshot для {user.uuid}: {e}")
+
+ # Сохраняем в Redis (с fallback на память)
+ await self._save_snapshot(new_snapshot)
+
+ elapsed = (datetime.utcnow() - start_time).total_seconds()
+ logger.info(f"✅ Snapshot создан за {elapsed:.1f}с: {len(new_snapshot)} пользователей")
+
+ return len(new_snapshot)
+
+ async def run_fast_check(self, bot) -> List[TrafficViolation]:
+ """
+ Быстрая проверка трафика с дельтой
+
+ Логика:
+ 1. Первый запуск — сохраняем snapshot, не отправляем уведомления
+ 2. Следующие запуски — сравниваем с snapshot, ищем превышения дельты
+ 3. После проверки обновляем snapshot (в Redis с fallback на память)
+ """
+ if not self.is_fast_check_enabled():
+ return []
+
+ start_time = datetime.utcnow()
+ is_first_run = not await self.has_snapshot()
+
+ if is_first_run:
+ logger.info("🚀 Первый запуск быстрой проверки — создаём snapshot...")
+ else:
+ age = await self.get_snapshot_age_minutes()
+ logger.info(f"🚀 Быстрая проверка трафика (snapshot {age:.1f} мин назад, порог {self.get_fast_check_threshold_gb()} ГБ)...")
+
+ violations: List[TrafficViolation] = []
+ threshold_bytes = self.get_fast_check_threshold_gb() * (1024 ** 3)
+
+ users = await self.get_all_users_with_traffic()
+ new_snapshot: Dict[str, float] = {}
+
+ # Загружаем предыдущий snapshot (из Redis или памяти)
+ previous_snapshot = await self._get_current_snapshot()
+ logger.debug(f"📦 Предыдущий snapshot: {len(previous_snapshot)} пользователей")
+
+ checked_users = 0
+ users_with_delta = 0
+
+ for user in users:
+ try:
+ if not user.uuid:
+ continue
+
+ # Получаем трафик из user_traffic
+ user_traffic = user.user_traffic
+ if not user_traffic:
+ continue
+
+ current_bytes = user_traffic.used_traffic_bytes or 0
+ new_snapshot[user.uuid] = current_bytes
+
+ # Первый запуск — только сохраняем, не проверяем
+ if is_first_run:
+ continue
+
+ # Пользователя не было в предыдущем snapshot — пропускаем (новый пользователь)
+ if user.uuid not in previous_snapshot:
+ continue
+
+ # Получаем предыдущее значение
+ previous_bytes = previous_snapshot.get(user.uuid, 0)
+
+ # Вычисляем дельту (может быть отрицательной при сбросе трафика)
+ delta_bytes = current_bytes - previous_bytes
+ if delta_bytes <= 0:
+ continue # Трафик сбросился или не изменился
+
+ users_with_delta += 1
+ delta_gb = delta_bytes / (1024 ** 3)
+
+ # Проверяем превышение дельты
+ if delta_bytes < threshold_bytes:
+ continue
+
+ logger.info(f"⚠️ Превышение дельты: {user.uuid[:8]}... +{delta_gb:.2f} ГБ (порог {self.get_fast_check_threshold_gb()} ГБ)")
+
+ # Проверяем фильтр по нодам
+ last_node_uuid = user_traffic.last_connected_node_uuid
+ if not self.should_monitor_node(last_node_uuid):
+ continue
+
+ # Создаём violation
+ delta_gb = round(delta_bytes / (1024 ** 3), 2)
+ violation = TrafficViolation(
+ user_uuid=user.uuid,
+ telegram_id=user.telegram_id,
+ full_name=user.username,
+ username=None,
+ used_traffic_gb=delta_gb, # Это дельта, не общий трафик!
+ threshold_gb=self.get_fast_check_threshold_gb(),
+ last_node_uuid=last_node_uuid,
+ last_node_name=None,
+ check_type="fast"
+ )
+ violations.append(violation)
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка обработки пользователя {user.uuid}: {e}")
+
+ # Обновляем snapshot (в Redis с fallback на память)
+ await self._save_snapshot(new_snapshot)
+
+ elapsed = (datetime.utcnow() - start_time).total_seconds()
+
+ if is_first_run:
+ logger.info(
+ f"✅ Snapshot создан за {elapsed:.1f}с: {len(new_snapshot)} пользователей. "
+ f"Следующая проверка покажет превышения."
+ )
+ else:
+ logger.info(
+ f"✅ Быстрая проверка завершена за {elapsed:.1f}с: "
+ f"{len(users)} пользователей, {users_with_delta} с дельтой >0, {len(violations)} превышений"
+ )
+ # Отправляем уведомления только если это не первый запуск
+ await self._send_violation_notifications(violations, bot)
+
+ return violations
+
+ # ============== Суточная проверка ==============
+
+ async def run_daily_check(self, bot) -> List[TrafficViolation]:
+ """
+ Суточная проверка трафика за последние 24 часа
+ Использует bandwidth-stats API
+ """
+ if not self.is_daily_check_enabled():
+ return []
+
+ logger.info("🚀 Запуск суточной проверки трафика...")
+ start_time = datetime.utcnow()
+
+ violations: List[TrafficViolation] = []
+ threshold_bytes = self.get_daily_threshold_gb() * (1024 ** 3)
+
+ # Получаем период за последние 24 часа
+ now = datetime.utcnow()
+ start_date = (now - timedelta(hours=24)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
+ end_date = now.strftime("%Y-%m-%dT%H:%M:%S.999Z")
+
+ users = await self.get_all_users_with_traffic()
+ semaphore = asyncio.Semaphore(self.get_concurrency())
+
+ async def check_user_daily_traffic(user) -> Optional[TrafficViolation]:
+ async with semaphore:
+ try:
+ if not user.uuid:
+ return None
+
+ # Получаем статистику за период
+ async with self.remnawave_service.get_api_client() as api:
+ stats = await api.get_bandwidth_stats_user(user.uuid, start_date, end_date)
+
+ if not stats:
+ return None
+
+ # Суммируем трафик по нодам
+ total_bytes = 0
+ if isinstance(stats, list):
+ for item in stats:
+ total_bytes += item.get('total', 0)
+ elif isinstance(stats, dict):
+ total_bytes = stats.get('total', 0)
+
+ if total_bytes < threshold_bytes:
+ return None
+
+ # Проверяем фильтр по нодам
+ user_traffic = user.user_traffic
+ last_node_uuid = user_traffic.last_connected_node_uuid if user_traffic else None
+ if not self.should_monitor_node(last_node_uuid):
+ return None
+
+ used_gb = round(total_bytes / (1024 ** 3), 2)
+ return TrafficViolation(
+ user_uuid=user.uuid,
+ telegram_id=user.telegram_id,
+ full_name=user.username,
+ username=None,
+ used_traffic_gb=used_gb,
+ threshold_gb=self.get_daily_threshold_gb(),
+ last_node_uuid=last_node_uuid,
+ last_node_name=None,
+ check_type="daily"
+ )
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка суточной проверки для {user.uuid}: {e}")
+ return None
+
+ # Параллельная проверка
+ tasks = [check_user_daily_traffic(user) for user in users if user.uuid]
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ for result in results:
+ if isinstance(result, TrafficViolation):
+ violations.append(result)
+
+ elapsed = (datetime.utcnow() - start_time).total_seconds()
logger.info(
- f"📊 Проверка трафика для {user_id_info}: {total_gb} ГБ, "
- f"порог: {threshold_gb} ГБ, статус: {status}"
+ f"✅ Суточная проверка завершена за {elapsed:.1f}с: "
+ f"{len(users)} пользователей, {len(violations)} превышений"
)
- return is_exceeded, traffic_info
+ # Отправляем уведомления
+ await self._send_violation_notifications(violations, bot)
+
+ return violations
+
+ # ============== Уведомления ==============
+
+ async def _send_violation_notifications(self, violations: List[TrafficViolation], bot):
+ """Отправляет уведомления о превышениях"""
+ if not violations or not bot:
+ return
+
+ admin_service = AdminNotificationService(bot)
+ topic_id = settings.SUSPICIOUS_NOTIFICATIONS_TOPIC_ID
+
+ # Ограничиваем количество уведомлений за раз (защита от flood)
+ max_notifications = 10
+ if len(violations) > max_notifications:
+ logger.warning(
+ f"⚠️ Слишком много превышений ({len(violations)}), "
+ f"отправляем только первые {max_notifications}"
+ )
+ violations = violations[:max_notifications]
+
+ for i, violation in enumerate(violations):
+ try:
+ if not await self.should_send_notification(violation.user_uuid):
+ logger.debug(f"⏭️ Кулдаун для {violation.user_uuid}, пропускаем")
+ continue
+
+ # Получаем информацию о пользователе из БД
+ user_info = ""
+ async with AsyncSessionLocal() as db:
+ db_user = await get_user_by_remnawave_uuid(db, violation.user_uuid)
+ if db_user:
+ user_info = (
+ f"👤 {db_user.full_name or 'Без имени'}\n"
+ f"🆔 Telegram ID: {db_user.telegram_id}\n"
+ )
+ if db_user.username:
+ user_info += f"📱 Username: @{db_user.username}\n"
+
+ if violation.check_type == "fast":
+ check_type_emoji = "⚡"
+ check_type_name = "Быстрая проверка"
+ traffic_label = "За интервал"
+ elif violation.check_type == "daily":
+ check_type_emoji = "📅"
+ check_type_name = "Суточная проверка"
+ traffic_label = "За 24 часа"
+ else:
+ check_type_emoji = "🔍"
+ check_type_name = "Ручная проверка"
+ traffic_label = "Использовано"
+
+ message = (
+ f"⚠️ Превышение трафика\n\n"
+ f"{user_info}"
+ f"🔑 UUID: {violation.user_uuid}\n\n"
+ f"{check_type_emoji} {check_type_name}\n"
+ f"📊 {traffic_label}: {violation.used_traffic_gb} ГБ\n"
+ f"📈 Порог: {violation.threshold_gb} ГБ\n"
+ f"🚨 Превышение: {violation.used_traffic_gb - violation.threshold_gb:.2f} ГБ\n"
+ )
+
+ if violation.last_node_uuid:
+ message += f"\n🖥 Последняя нода: {violation.last_node_uuid}"
+
+ message += f"\n\n⏰ {datetime.utcnow().strftime('%d.%m.%Y %H:%M:%S')} UTC"
+
+ await admin_service.send_suspicious_traffic_notification(message, bot, topic_id)
+ await self.record_notification(violation.user_uuid)
+
+ logger.info(f"📨 Уведомление отправлено для {violation.user_uuid}")
+
+ # Задержка между отправками (защита от flood)
+ if i < len(violations) - 1:
+ await asyncio.sleep(0.5)
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка отправки уведомления для {violation.user_uuid}: {e}")
+
+
+class TrafficMonitoringSchedulerV2:
+ """
+ Планировщик проверок трафика v2
+ - Быстрая проверка каждые N минут
+ - Суточная проверка в заданное время
+ """
+
+ def __init__(self, service: TrafficMonitoringServiceV2):
+ self.service = service
+ self.bot = None
+ self._fast_check_task: Optional[asyncio.Task] = None
+ self._daily_check_task: Optional[asyncio.Task] = None
+ self._is_running = False
+
+ def set_bot(self, bot):
+ """Устанавливает экземпляр бота"""
+ self.bot = bot
+
+ async def start(self):
+ """Запускает планировщик"""
+ if self._is_running:
+ logger.warning("Планировщик мониторинга трафика уже запущен")
+ return
+
+ if not self.bot:
+ logger.error("Бот не установлен для планировщика мониторинга")
+ return
+
+ self._is_running = True
+
+ # Создаём начальный snapshot при старте (без уведомлений!)
+ if self.service.is_fast_check_enabled():
+ await self.service.create_initial_snapshot()
+
+ # Запускаем быструю проверку
+ if self.service.is_fast_check_enabled():
+ interval = self.service.get_fast_check_interval_seconds()
+ logger.info(f"🚀 Запуск быстрой проверки трафика каждые {interval // 60} мин")
+ self._fast_check_task = asyncio.create_task(self._run_fast_check_loop(interval))
+
+ # Запускаем суточную проверку
+ if self.service.is_daily_check_enabled():
+ check_time = self.service.get_daily_check_time()
+ if check_time:
+ logger.info(f"🚀 Запуск суточной проверки трафика в {check_time.strftime('%H:%M')}")
+ self._daily_check_task = asyncio.create_task(self._run_daily_check_loop(check_time))
+
+ async def stop(self):
+ """Останавливает планировщик"""
+ self._is_running = False
+
+ if self._fast_check_task:
+ self._fast_check_task.cancel()
+ try:
+ await self._fast_check_task
+ except asyncio.CancelledError:
+ pass
+ self._fast_check_task = None
+
+ if self._daily_check_task:
+ self._daily_check_task.cancel()
+ try:
+ await self._daily_check_task
+ except asyncio.CancelledError:
+ pass
+ self._daily_check_task = None
+
+ logger.info("ℹ️ Планировщик мониторинга трафика остановлен")
+
+ async def _run_fast_check_loop(self, interval_seconds: int):
+ """Цикл быстрой проверки"""
+ # Сначала ждём интервал (snapshot уже создан в start())
+ logger.info(f"⏳ Первая проверка через {interval_seconds // 60} минут...")
+ await asyncio.sleep(interval_seconds)
+
+ while self._is_running:
+ try:
+ await self.service.cleanup_notification_cache()
+ await self.service.run_fast_check(self.bot)
+ await asyncio.sleep(interval_seconds)
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error(f"❌ Ошибка в цикле быстрой проверки: {e}")
+ await asyncio.sleep(interval_seconds)
+
+ async def _run_daily_check_loop(self, check_time: time):
+ """Цикл суточной проверки"""
+ while self._is_running:
+ try:
+ # Вычисляем время до следующей проверки
+ now = datetime.utcnow()
+ next_run = datetime.combine(now.date(), check_time)
+ if next_run <= now:
+ next_run += timedelta(days=1)
+
+ delay = (next_run - now).total_seconds()
+ logger.debug(f"⏰ Следующая суточная проверка через {delay / 3600:.1f}ч")
+
+ await asyncio.sleep(delay)
+
+ if self._is_running:
+ await self.service.run_daily_check(self.bot)
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error(f"❌ Ошибка в цикле суточной проверки: {e}")
+ await asyncio.sleep(3600) # Ждём час при ошибке
+
+ async def run_fast_check_now(self) -> List[TrafficViolation]:
+ """Запускает быструю проверку немедленно"""
+ return await self.service.run_fast_check(self.bot)
+
+ async def run_daily_check_now(self) -> List[TrafficViolation]:
+ """Запускает суточную проверку немедленно"""
+ return await self.service.run_daily_check(self.bot)
+
+
+# ============== Обратная совместимость ==============
+
+class TrafficMonitoringService:
+ """Обёртка для обратной совместимости со старым API"""
+
+ def __init__(self):
+ self._v2 = TrafficMonitoringServiceV2()
+ self.remnawave_service = self._v2.remnawave_service
+
+ def is_traffic_monitoring_enabled(self) -> bool:
+ # Используем старый параметр или новые
+ return (
+ settings.TRAFFIC_MONITORING_ENABLED or
+ settings.TRAFFIC_FAST_CHECK_ENABLED or
+ settings.TRAFFIC_DAILY_CHECK_ENABLED
+ )
+
+ def get_traffic_threshold_gb(self) -> float:
+ """Возвращает порог трафика"""
+ if settings.TRAFFIC_FAST_CHECK_ENABLED:
+ return settings.TRAFFIC_FAST_CHECK_THRESHOLD_GB
+ return settings.TRAFFIC_THRESHOLD_GB_PER_DAY
+
+ async def check_user_traffic_threshold(
+ self,
+ db: AsyncSession,
+ user_uuid: str,
+ user_telegram_id: int = None
+ ) -> tuple:
+ """Проверяет трафик одного пользователя (для обратной совместимости)"""
+ try:
+ threshold_gb = self.get_traffic_threshold_gb()
+ threshold_bytes = threshold_gb * (1024 ** 3)
+
+ # Получаем пользователя из Remnawave
+ async with self.remnawave_service.get_api_client() as api:
+ user = await api.get_user_by_uuid(user_uuid)
+
+ if not user or not user.user_traffic:
+ return False, {'total_gb': 0, 'nodes': []}
+
+ used_bytes = user.user_traffic.used_traffic_bytes or 0
+ total_gb = round(used_bytes / (1024 ** 3), 2)
+
+ is_exceeded = used_bytes > threshold_bytes
+
+ traffic_info = {
+ 'total_gb': total_gb,
+ 'nodes': [],
+ 'threshold_gb': threshold_gb
+ }
+
+ return is_exceeded, traffic_info
+
+ except Exception as e:
+ logger.error(f"Ошибка проверки трафика для {user_uuid}: {e}")
+ return False, {'total_gb': 0, 'nodes': []}
async def process_suspicious_traffic(
self,
db: AsyncSession,
user_uuid: str,
- traffic_info: Dict,
+ traffic_info: dict,
bot
):
- """
- Обрабатывает подозрительный трафик - отправляет уведомление администраторам
- """
- try:
- # Получаем информацию о пользователе из базы данных
- user = await get_user_by_remnawave_uuid(db, user_uuid)
- if not user:
- logger.warning(f"Пользователь с UUID {user_uuid} не найден в базе данных")
- return
-
- # Формируем сообщение для администраторов
- total_gb = traffic_info.get('total_gb', 0)
- threshold_gb = self.get_traffic_threshold_gb()
-
- message = (
- f"⚠️ Подозрительная активность трафика\n\n"
- f"👤 Пользователь: {user.full_name} (ID: {user.telegram_id})\n"
- f"🔑 UUID: {user_uuid}\n"
- f"📊 Трафик за сутки: {total_gb} ГБ\n"
- f"📈 Порог: {threshold_gb} ГБ\n"
- f"🚨 Превышение: {total_gb - threshold_gb:.2f} ГБ\n\n"
- )
-
- # Добавляем информацию по нодам, если есть
- nodes = traffic_info.get('nodes', [])
- if nodes:
- message += "Разбивка по нодам:\n"
- for node_info in nodes[:5]: # Показываем первые 5 нод
- message += f" • {node_info.get('node', 'Unknown')}: {node_info.get('gb', 0)} ГБ\n"
- if len(nodes) > 5:
- message += f" • и ещё {len(nodes) - 5} нод(ы)\n"
-
- message += f"\n⏰ Время проверки: {datetime.utcnow().strftime('%d.%m.%Y %H:%M:%S UTC')}"
-
- # Создаем AdminNotificationService с ботом
- admin_notification_service = AdminNotificationService(bot)
-
- # Отправляем уведомление администраторам
- topic_id = self.get_suspicious_notifications_topic_id()
-
- await admin_notification_service.send_suspicious_traffic_notification(
- message,
- bot,
- topic_id
- )
-
- logger.info(
- f"✅ Уведомление о подозрительном трафике отправлено для пользователя {user.telegram_id}"
- )
-
- except Exception as e:
- logger.error(f"❌ Ошибка при обработке подозрительного трафика для {user_uuid}: {e}")
+ """Отправляет уведомление о подозрительном трафике"""
+ violation = TrafficViolation(
+ user_uuid=user_uuid,
+ telegram_id=None,
+ full_name=None,
+ username=None,
+ used_traffic_gb=traffic_info.get('total_gb', 0),
+ threshold_gb=traffic_info.get('threshold_gb', self.get_traffic_threshold_gb()),
+ last_node_uuid=None,
+ last_node_name=None,
+ check_type="manual"
+ )
+ await self._v2._send_violation_notifications([violation], bot)
async def check_all_users_traffic(self, db: AsyncSession, bot):
- """
- Проверяет трафик всех пользователей с активной подпиской
- """
- if not self.is_traffic_monitoring_enabled():
- logger.info("Мониторинг трафика отключен, пропускаем проверку всех пользователей")
- return
+ """Старый метод — теперь вызывает быструю проверку"""
+ await self._v2.run_fast_check(bot)
- try:
- from app.database.crud.user import get_users_with_active_subscriptions
- # Получаем всех пользователей с активной подпиской
- users = await get_users_with_active_subscriptions(db)
-
- logger.info(f"Начинаем проверку трафика для {len(users)} пользователей")
-
- # Проверяем трафик для каждого пользователя
- for user in users:
- if user.remnawave_uuid: # Проверяем только пользователей с UUID
- is_exceeded, traffic_info = await self.check_user_traffic_threshold(
- db,
- user.remnawave_uuid,
- user.telegram_id
- )
-
- if is_exceeded:
- await self.process_suspicious_traffic(
- db,
- user.remnawave_uuid,
- traffic_info,
- bot
- )
-
- logger.info("Завершена проверка трафика всех пользователей")
-
- except Exception as e:
- logger.error(f"❌ Ошибка при проверке трафика всех пользователей: {e}")
+# Глобальные экземпляры (создаём до класса-обёртки)
+traffic_monitoring_service_v2 = TrafficMonitoringServiceV2()
+traffic_monitoring_scheduler_v2 = TrafficMonitoringSchedulerV2(traffic_monitoring_service_v2)
class TrafficMonitoringScheduler:
- """
- Класс для планирования периодических проверок трафика
- """
- def __init__(self, traffic_service: TrafficMonitoringService):
- self.traffic_service = traffic_service
- self.check_task = None
- self.is_running = False
+ """Обёртка для обратной совместимости — использует глобальные v2 экземпляры"""
+
+ def __init__(self, traffic_service: TrafficMonitoringService = None):
+ # Используем глобальные экземпляры!
+ self._v2_service = traffic_monitoring_service_v2
+ self._v2_scheduler = traffic_monitoring_scheduler_v2
self.bot = None
- # Кэш уведомлений: {user_uuid: дата_последнего_уведомления}
- self._notification_cache: Dict[str, datetime] = {}
def set_bot(self, bot):
- """Устанавливает экземпляр бота для отправки уведомлений"""
self.bot = bot
+ self._v2_scheduler.set_bot(bot)
def is_enabled(self) -> bool:
- """Проверяет, включен ли мониторинг трафика"""
- return self.traffic_service.is_traffic_monitoring_enabled()
+ return self._v2_service.is_fast_check_enabled() or self._v2_service.is_daily_check_enabled()
def get_interval_hours(self) -> int:
- """Получает интервал проверки в часах"""
- return self.traffic_service.get_monitoring_interval_hours()
+ """Для обратной совместимости — возвращает интервал быстрой проверки в часах"""
+ return max(1, self._v2_service.get_fast_check_interval_seconds() // 3600)
+
+ def get_status_info(self) -> str:
+ """Возвращает информацию о статусе мониторинга"""
+ info = []
+ if self._v2_service.is_fast_check_enabled():
+ interval_min = self._v2_service.get_fast_check_interval_seconds() // 60
+ threshold = self._v2_service.get_fast_check_threshold_gb()
+ info.append(f"Быстрая: каждые {interval_min} мин, порог {threshold} ГБ")
+ if self._v2_service.is_daily_check_enabled():
+ check_time = self._v2_service.get_daily_check_time()
+ threshold = self._v2_service.get_daily_threshold_gb()
+ time_str = check_time.strftime('%H:%M') if check_time else "00:00"
+ info.append(f"Суточная: в {time_str}, порог {threshold} ГБ")
+ return "; ".join(info) if info else "Отключен"
+
+ async def _should_send_notification(self, user_uuid: str) -> bool:
+ """Для обратной совместимости"""
+ return await self._v2_service.should_send_notification(user_uuid)
+
+ async def _record_notification(self, user_uuid: str):
+ """Для обратной совместимости"""
+ await self._v2_service.record_notification(user_uuid)
async def start_monitoring(self):
- """
- Запускает периодическую проверку трафика
- """
- if self.is_running:
- logger.warning("Мониторинг трафика уже запущен")
- return
-
- if not self.is_enabled():
- logger.info("Мониторинг трафика отключен в настройках")
- return
-
- if not self.bot:
- logger.error("Бот не установлен для мониторинга трафика")
- return
-
- self.is_running = True
- interval_hours = self.get_interval_hours()
- interval_seconds = interval_hours * 3600
-
- logger.info(f"🚀 Запуск мониторинга трафика с интервалом {interval_hours} ч")
-
- # Запускаем задачу с интервалом
- self.check_task = asyncio.create_task(self._periodic_check(interval_seconds))
+ await self._v2_scheduler.start()
def stop_monitoring(self):
- """
- Останавливает периодическую проверку трафика
- """
- self.is_running = False
- if self.check_task:
- self.check_task.cancel()
- logger.info("ℹ️ Мониторинг трафика остановлен")
-
- def _should_send_notification(self, user_uuid: str) -> bool:
- """
- Проверяет, нужно ли отправлять уведомление для пользователя.
- Защита от спама: одно уведомление в сутки на пользователя.
- """
- now = datetime.utcnow()
- last_notification = self._notification_cache.get(user_uuid)
-
- if last_notification is None:
- return True
-
- # Если прошло больше 24 часов с последнего уведомления
- return (now - last_notification) > timedelta(hours=24)
-
- def _record_notification(self, user_uuid: str):
- """Записывает факт отправки уведомления"""
- self._notification_cache[user_uuid] = datetime.utcnow()
-
- def _cleanup_notification_cache(self):
- """Очищает старые записи из кэша (старше 48 часов)"""
- now = datetime.utcnow()
- expired = [
- uuid for uuid, dt in self._notification_cache.items()
- if (now - dt) > timedelta(hours=48)
- ]
- for uuid in expired:
- del self._notification_cache[uuid]
- if expired:
- logger.debug(f"🧹 Очищено {len(expired)} старых записей из кэша уведомлений о трафике")
-
- async def _periodic_check(self, interval_seconds: int):
- """
- Выполняет периодическую проверку трафика
- """
- while self.is_running:
- try:
- logger.info("📊 Запуск периодической проверки трафика")
-
- # Очищаем старый кэш
- self._cleanup_notification_cache()
-
- # Получаем сессию БД внутри цикла
- async for db in get_db():
- try:
- await self._check_all_users_traffic(db)
- finally:
- break
-
- # Ждем указанный интервал перед следующей проверкой
- await asyncio.sleep(interval_seconds)
-
- except asyncio.CancelledError:
- logger.info("Задача периодической проверки трафика отменена")
- break
- except Exception as e:
- logger.error(f"❌ Ошибка в периодической проверке трафика: {e}")
- # Даже при ошибке продолжаем цикл, ждем интервал и пробуем снова
- await asyncio.sleep(interval_seconds)
-
- async def _check_all_users_traffic(self, db: AsyncSession):
- """
- Проверяет трафик всех пользователей с активной подпиской
- """
- try:
- from app.database.crud.user import get_users_with_active_subscriptions
-
- # Получаем всех пользователей с активной подпиской
- users = await get_users_with_active_subscriptions(db)
-
- checked_count = 0
- exceeded_count = 0
-
- logger.info(f"📊 Начинаем проверку трафика для {len(users)} пользователей")
-
- # Проверяем трафик для каждого пользователя
- for user in users:
- if user.remnawave_uuid:
- is_exceeded, traffic_info = await self.traffic_service.check_user_traffic_threshold(
- db,
- user.remnawave_uuid,
- user.telegram_id
- )
- checked_count += 1
-
- if is_exceeded:
- exceeded_count += 1
- # Проверяем, не отправляли ли уже уведомление
- if self._should_send_notification(user.remnawave_uuid):
- await self.traffic_service.process_suspicious_traffic(
- db,
- user.remnawave_uuid,
- traffic_info,
- self.bot
- )
- self._record_notification(user.remnawave_uuid)
- else:
- logger.debug(
- f"⏭️ Пропуск уведомления для {user.telegram_id} — уже отправляли сегодня"
- )
-
- logger.info(
- f"✅ Проверка трафика завершена: проверено {checked_count}, превышений {exceeded_count}"
- )
-
- except Exception as e:
- logger.error(f"❌ Ошибка при проверке трафика всех пользователей: {e}")
+ asyncio.create_task(self._v2_scheduler.stop())
-# Глобальные экземпляры сервисов
+# Обратная совместимость
traffic_monitoring_service = TrafficMonitoringService()
-traffic_monitoring_scheduler = TrafficMonitoringScheduler(traffic_monitoring_service)
\ No newline at end of file
+traffic_monitoring_scheduler = TrafficMonitoringScheduler()
diff --git a/app/services/tribute_service.py b/app/services/tribute_service.py
index 7cba2ae0..fa4c3e54 100644
--- a/app/services/tribute_service.py
+++ b/app/services/tribute_service.py
@@ -316,9 +316,12 @@ class TributeService:
has_saved_cart = False
# Умная автоактивация если автопокупка не сработала
+ activation_notification_sent = False
if not auto_purchase_success:
try:
- await auto_activate_subscription_after_topup(session, user)
+ _, activation_notification_sent = await auto_activate_subscription_after_topup(
+ session, user, bot=self.bot, topup_amount=amount_kopeks
+ )
except Exception as auto_activate_error:
logger.error(
"Ошибка умной автоактивации для пользователя %s: %s",
@@ -327,7 +330,8 @@ class TributeService:
exc_info=True,
)
- if has_saved_cart and self.bot:
+ # Отправляем уведомление только если его ещё не отправили
+ if has_saved_cart and self.bot and not activation_notification_sent:
# Если у пользователя есть сохраненная корзина,
# отправляем ему уведомление с кнопкой вернуться к оформлению
from app.localization.texts import get_texts
diff --git a/main.py b/main.py
index 0d622271..d20cbcd4 100644
--- a/main.py
+++ b/main.py
@@ -589,10 +589,9 @@ async def main():
traffic_monitoring_task = asyncio.create_task(
traffic_monitoring_scheduler.start_monitoring()
)
- interval_hours = traffic_monitoring_scheduler.get_interval_hours()
- threshold_gb = settings.TRAFFIC_THRESHOLD_GB_PER_DAY
- stage.log(f"Интервал проверки: {interval_hours} ч")
- stage.log(f"Порог трафика: {threshold_gb} ГБ/сутки")
+ # Показываем информацию о новом мониторинге v2
+ status_info = traffic_monitoring_scheduler.get_status_info()
+ stage.log(status_info)
else:
traffic_monitoring_task = None
stage.skip("Мониторинг трафика отключен настройками")
diff --git a/tests/services/test_traffic_monitoring_redis.py b/tests/services/test_traffic_monitoring_redis.py
new file mode 100644
index 00000000..30993e31
--- /dev/null
+++ b/tests/services/test_traffic_monitoring_redis.py
@@ -0,0 +1,380 @@
+"""
+Тесты для хранения snapshot трафика в Redis.
+"""
+import pytest
+from datetime import datetime, timedelta
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from app.services.traffic_monitoring_service import (
+ TrafficMonitoringServiceV2,
+ TRAFFIC_SNAPSHOT_KEY,
+ TRAFFIC_SNAPSHOT_TIME_KEY,
+ TRAFFIC_NOTIFICATION_CACHE_KEY,
+)
+
+
+@pytest.fixture
+def service():
+ """Создаёт экземпляр сервиса для тестов."""
+ return TrafficMonitoringServiceV2()
+
+
+@pytest.fixture
+def mock_cache():
+ """Мок для cache сервиса."""
+ with patch('app.services.traffic_monitoring_service.cache') as mock:
+ mock.set = AsyncMock(return_value=True)
+ mock.get = AsyncMock(return_value=None)
+ yield mock
+
+
+@pytest.fixture
+def sample_snapshot():
+ """Пример snapshot данных."""
+ return {
+ "uuid-1": 1073741824.0, # 1 GB
+ "uuid-2": 2147483648.0, # 2 GB
+ "uuid-3": 5368709120.0, # 5 GB
+ }
+
+
+# ============== Тесты сохранения snapshot в Redis ==============
+
+async def test_save_snapshot_to_redis_success(service, mock_cache, sample_snapshot):
+ """Тест успешного сохранения snapshot в Redis."""
+ mock_cache.set = AsyncMock(return_value=True)
+
+ result = await service._save_snapshot_to_redis(sample_snapshot)
+
+ assert result is True
+ assert mock_cache.set.call_count == 2 # snapshot + time
+
+ # Проверяем что сохранён snapshot
+ first_call = mock_cache.set.call_args_list[0]
+ assert first_call[0][0] == TRAFFIC_SNAPSHOT_KEY
+ assert first_call[0][1] == sample_snapshot
+
+
+async def test_save_snapshot_to_redis_failure(service, mock_cache, sample_snapshot):
+ """Тест неудачного сохранения snapshot в Redis."""
+ mock_cache.set = AsyncMock(return_value=False)
+
+ result = await service._save_snapshot_to_redis(sample_snapshot)
+
+ assert result is False
+
+
+async def test_save_snapshot_to_redis_exception(service, mock_cache, sample_snapshot):
+ """Тест обработки исключения при сохранении."""
+ mock_cache.set = AsyncMock(side_effect=Exception("Redis error"))
+
+ result = await service._save_snapshot_to_redis(sample_snapshot)
+
+ assert result is False
+
+
+# ============== Тесты загрузки snapshot из Redis ==============
+
+async def test_load_snapshot_from_redis_success(service, mock_cache, sample_snapshot):
+ """Тест успешной загрузки snapshot из Redis."""
+ mock_cache.get = AsyncMock(return_value=sample_snapshot)
+
+ result = await service._load_snapshot_from_redis()
+
+ assert result == sample_snapshot
+ mock_cache.get.assert_called_once_with(TRAFFIC_SNAPSHOT_KEY)
+
+
+async def test_load_snapshot_from_redis_empty(service, mock_cache):
+ """Тест загрузки когда snapshot отсутствует."""
+ mock_cache.get = AsyncMock(return_value=None)
+
+ result = await service._load_snapshot_from_redis()
+
+ assert result is None
+
+
+async def test_load_snapshot_from_redis_invalid_data(service, mock_cache):
+ """Тест загрузки невалидных данных."""
+ mock_cache.get = AsyncMock(return_value="not a dict")
+
+ result = await service._load_snapshot_from_redis()
+
+ assert result is None
+
+
+async def test_load_snapshot_from_redis_exception(service, mock_cache):
+ """Тест обработки исключения при загрузке."""
+ mock_cache.get = AsyncMock(side_effect=Exception("Redis error"))
+
+ result = await service._load_snapshot_from_redis()
+
+ assert result is None
+
+
+# ============== Тесты времени snapshot ==============
+
+async def test_get_snapshot_time_from_redis_success(service, mock_cache):
+ """Тест получения времени snapshot."""
+ test_time = datetime(2024, 1, 15, 12, 30, 0)
+ mock_cache.get = AsyncMock(return_value=test_time.isoformat())
+
+ result = await service._get_snapshot_time_from_redis()
+
+ assert result == test_time
+ mock_cache.get.assert_called_once_with(TRAFFIC_SNAPSHOT_TIME_KEY)
+
+
+async def test_get_snapshot_time_from_redis_empty(service, mock_cache):
+ """Тест когда время отсутствует."""
+ mock_cache.get = AsyncMock(return_value=None)
+
+ result = await service._get_snapshot_time_from_redis()
+
+ assert result is None
+
+
+# ============== Тесты has_snapshot ==============
+
+async def test_has_snapshot_redis_exists(service, mock_cache, sample_snapshot):
+ """Тест has_snapshot когда snapshot есть в Redis."""
+ mock_cache.get = AsyncMock(return_value=sample_snapshot)
+
+ result = await service.has_snapshot()
+
+ assert result is True
+
+
+async def test_has_snapshot_memory_fallback(service, mock_cache):
+ """Тест has_snapshot с fallback на память."""
+ mock_cache.get = AsyncMock(return_value=None)
+
+ # Устанавливаем данные в память
+ service._memory_snapshot = {"uuid-1": 1000.0}
+ service._memory_snapshot_time = datetime.utcnow()
+
+ result = await service.has_snapshot()
+
+ assert result is True
+
+
+async def test_has_snapshot_none(service, mock_cache):
+ """Тест has_snapshot когда snapshot нет нигде."""
+ mock_cache.get = AsyncMock(return_value=None)
+ service._memory_snapshot = {}
+ service._memory_snapshot_time = None
+
+ result = await service.has_snapshot()
+
+ assert result is False
+
+
+# ============== Тесты get_snapshot_age_minutes ==============
+
+async def test_get_snapshot_age_minutes_from_redis(service, mock_cache):
+ """Тест возраста snapshot из Redis."""
+ # Snapshot создан 30 минут назад
+ past_time = datetime.utcnow() - timedelta(minutes=30)
+ mock_cache.get = AsyncMock(return_value=past_time.isoformat())
+
+ result = await service.get_snapshot_age_minutes()
+
+ assert 29 <= result <= 31 # Допуск на время выполнения
+
+
+async def test_get_snapshot_age_minutes_memory_fallback(service, mock_cache):
+ """Тест возраста snapshot из памяти."""
+ mock_cache.get = AsyncMock(return_value=None)
+ service._memory_snapshot_time = datetime.utcnow() - timedelta(minutes=15)
+
+ result = await service.get_snapshot_age_minutes()
+
+ assert 14 <= result <= 16
+
+
+async def test_get_snapshot_age_minutes_no_snapshot(service, mock_cache):
+ """Тест возраста когда snapshot нет."""
+ mock_cache.get = AsyncMock(return_value=None)
+ service._memory_snapshot_time = None
+
+ result = await service.get_snapshot_age_minutes()
+
+ assert result == float('inf')
+
+
+# ============== Тесты _save_snapshot (с fallback) ==============
+
+async def test_save_snapshot_redis_success(service, mock_cache, sample_snapshot):
+ """Тест сохранения snapshot в Redis успешно."""
+ mock_cache.set = AsyncMock(return_value=True)
+
+ # Заполняем память чтобы проверить что она очистится
+ service._memory_snapshot = {"old": 123.0}
+ service._memory_snapshot_time = datetime.utcnow()
+
+ result = await service._save_snapshot(sample_snapshot)
+
+ assert result is True
+ assert service._memory_snapshot == {} # Память очищена
+ assert service._memory_snapshot_time is None
+
+
+async def test_save_snapshot_fallback_to_memory(service, mock_cache, sample_snapshot):
+ """Тест fallback на память когда Redis недоступен."""
+ mock_cache.set = AsyncMock(return_value=False)
+
+ result = await service._save_snapshot(sample_snapshot)
+
+ assert result is True
+ assert service._memory_snapshot == sample_snapshot
+ assert service._memory_snapshot_time is not None
+
+
+# ============== Тесты _get_current_snapshot ==============
+
+async def test_get_current_snapshot_from_redis(service, mock_cache, sample_snapshot):
+ """Тест получения snapshot из Redis."""
+ mock_cache.get = AsyncMock(return_value=sample_snapshot)
+
+ result = await service._get_current_snapshot()
+
+ assert result == sample_snapshot
+
+
+async def test_get_current_snapshot_fallback_to_memory(service, mock_cache, sample_snapshot):
+ """Тест fallback на память."""
+ mock_cache.get = AsyncMock(return_value=None)
+ service._memory_snapshot = sample_snapshot
+
+ result = await service._get_current_snapshot()
+
+ assert result == sample_snapshot
+
+
+# ============== Тесты уведомлений ==============
+
+async def test_save_notification_to_redis(service, mock_cache):
+ """Тест сохранения времени уведомления."""
+ mock_cache.set = AsyncMock(return_value=True)
+
+ result = await service._save_notification_to_redis("uuid-123")
+
+ assert result is True
+ mock_cache.set.assert_called_once()
+ call_args = mock_cache.set.call_args
+ assert "traffic:notifications:uuid-123" in call_args[0][0]
+
+
+async def test_get_notification_time_from_redis(service, mock_cache):
+ """Тест получения времени уведомления."""
+ test_time = datetime(2024, 1, 15, 10, 0, 0)
+ mock_cache.get = AsyncMock(return_value=test_time.isoformat())
+
+ result = await service._get_notification_time_from_redis("uuid-123")
+
+ assert result == test_time
+
+
+async def test_should_send_notification_no_previous(service, mock_cache):
+ """Тест should_send_notification когда уведомлений не было."""
+ mock_cache.get = AsyncMock(return_value=None)
+ service._memory_notification_cache = {}
+
+ result = await service.should_send_notification("uuid-123")
+
+ assert result is True
+
+
+async def test_should_send_notification_cooldown_active(service, mock_cache):
+ """Тест should_send_notification когда кулдаун активен."""
+ # Уведомление было 5 минут назад, кулдаун 60 минут
+ recent_time = datetime.utcnow() - timedelta(minutes=5)
+ mock_cache.get = AsyncMock(return_value=recent_time.isoformat())
+
+ result = await service.should_send_notification("uuid-123")
+
+ assert result is False
+
+
+async def test_should_send_notification_cooldown_expired(service, mock_cache):
+ """Тест should_send_notification когда кулдаун истёк."""
+ # Уведомление было 120 минут назад, кулдаун 60 минут
+ old_time = datetime.utcnow() - timedelta(minutes=120)
+ mock_cache.get = AsyncMock(return_value=old_time.isoformat())
+
+ result = await service.should_send_notification("uuid-123")
+
+ assert result is True
+
+
+async def test_record_notification_redis(service, mock_cache):
+ """Тест record_notification сохраняет в Redis."""
+ mock_cache.set = AsyncMock(return_value=True)
+
+ await service.record_notification("uuid-123")
+
+ mock_cache.set.assert_called_once()
+
+
+async def test_record_notification_fallback_to_memory(service, mock_cache):
+ """Тест record_notification с fallback на память."""
+ mock_cache.set = AsyncMock(return_value=False)
+
+ await service.record_notification("uuid-123")
+
+ assert "uuid-123" in service._memory_notification_cache
+
+
+# ============== Тесты create_initial_snapshot ==============
+
+async def test_create_initial_snapshot_uses_existing_redis(service, mock_cache, sample_snapshot):
+ """Тест что create_initial_snapshot использует существующий snapshot из Redis."""
+ mock_cache.get = AsyncMock(side_effect=[
+ sample_snapshot, # _load_snapshot_from_redis
+ (datetime.utcnow() - timedelta(minutes=10)).isoformat(), # _get_snapshot_time_from_redis
+ ])
+
+ with patch.object(service, 'get_all_users_with_traffic', new_callable=AsyncMock) as mock_get_users:
+ result = await service.create_initial_snapshot()
+
+ # Не должен вызывать API - используем существующий snapshot
+ mock_get_users.assert_not_called()
+ assert result == len(sample_snapshot)
+
+
+async def test_create_initial_snapshot_creates_new(service, mock_cache):
+ """Тест создания нового snapshot когда в Redis пусто."""
+ mock_cache.get = AsyncMock(return_value=None)
+ mock_cache.set = AsyncMock(return_value=True)
+
+ # Мокаем пользователей из API
+ mock_user = MagicMock()
+ mock_user.uuid = "uuid-1"
+ mock_user.user_traffic = MagicMock()
+ mock_user.user_traffic.used_traffic_bytes = 1073741824 # 1 GB
+
+ with patch.object(service, 'get_all_users_with_traffic', new_callable=AsyncMock) as mock_get_users:
+ mock_get_users.return_value = [mock_user]
+
+ result = await service.create_initial_snapshot()
+
+ mock_get_users.assert_called_once()
+ assert result == 1
+
+
+# ============== Тесты cleanup_notification_cache ==============
+
+async def test_cleanup_notification_cache_removes_old(service, mock_cache):
+ """Тест очистки старых записей из памяти."""
+ old_time = datetime.utcnow() - timedelta(hours=25)
+ recent_time = datetime.utcnow() - timedelta(hours=1)
+
+ service._memory_notification_cache = {
+ "uuid-old": old_time,
+ "uuid-recent": recent_time,
+ }
+
+ await service.cleanup_notification_cache()
+
+ assert "uuid-old" not in service._memory_notification_cache
+ assert "uuid-recent" in service._memory_notification_cache