diff --git a/.env.example b/.env.example
index 73b5a2b0..9969cf43 100644
--- a/.env.example
+++ b/.env.example
@@ -19,6 +19,19 @@ ADMIN_REPORTS_ENABLED=false
ADMIN_REPORTS_CHAT_ID= # Опционально: чат для отчетов (по умолчанию ADMIN_NOTIFICATIONS_CHAT_ID)
ADMIN_REPORTS_TOPIC_ID= # ID топика для отчетов
ADMIN_REPORTS_SEND_TIME=10:00 # Время отправки (по МСК) ежедневного отчета
+
+# Мониторинг трафика
+TRAFFIC_MONITORING_ENABLED=false # Включить мониторинг трафика пользователей
+TRAFFIC_THRESHOLD_GB_PER_DAY=10.0 # Порог трафика в ГБ за сутки (превышение вызывает уведомление)
+TRAFFIC_MONITORING_INTERVAL_HOURS=24 # Интервал проверки трафика в часах (например: 1, 6, 12, 24)
+SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности (0 для отправки в основной чат)
+
+# Черный список
+BLACKLIST_CHECK_ENABLED=false # Включить проверку пользователей по черному списку
+BLACKLIST_GITHUB_URL=https://raw.githubusercontent.com/username/repository/main/blacklist.txt # URL к файлу черного списка на GitHub
+BLACKLIST_UPDATE_INTERVAL_HOURS=24 # Интервал обновления черного списка с GitHub (в часах)
+BLACKLIST_IGNORE_ADMINS=true # Игнорировать администраторов (из ADMIN_IDS) при проверке черного списка
+
# Обязательная подписка на канал
CHANNEL_SUB_ID= # Опционально ID твоего канала (-100)
CHANNEL_IS_REQUIRED_SUB=false # Обязательна ли подписка на канал
diff --git a/app/bot.py b/app/bot.py
index 4ecac835..e95cf2ef 100644
--- a/app/bot.py
+++ b/app/bot.py
@@ -31,6 +31,8 @@ from app.handlers import polls as user_polls
from app.handlers import simple_subscription
from app.handlers.admin import (
main as admin_main,
+ blacklist as admin_blacklist,
+ bulk_ban as admin_bulk_ban,
users as admin_users,
subscriptions as admin_subscriptions,
promocodes as admin_promocodes,
@@ -176,6 +178,8 @@ async def setup_bot() -> tuple[Bot, Dispatcher]:
admin_faq.register_handlers(dp)
admin_payments.register_handlers(dp)
admin_trials.register_handlers(dp)
+ admin_bulk_ban.register_bulk_ban_handlers(dp)
+ admin_blacklist.register_blacklist_handlers(dp)
common.register_handlers(dp)
register_stars_handlers(dp)
user_polls.register_handlers(dp)
diff --git a/app/config.py b/app/config.py
index f75e7f49..00321dd6 100644
--- a/app/config.py
+++ b/app/config.py
@@ -155,7 +155,18 @@ class Settings(BaseSettings):
REFERRAL_PROGRAM_ENABLED: bool = True
REFERRAL_NOTIFICATIONS_ENABLED: bool = True
REFERRAL_NOTIFICATION_RETRY_ATTEMPTS: int = 3
-
+
+ BLACKLIST_CHECK_ENABLED: bool = False
+ BLACKLIST_GITHUB_URL: Optional[str] = None
+ BLACKLIST_UPDATE_INTERVAL_HOURS: int = 24
+ BLACKLIST_IGNORE_ADMINS: bool = True
+
+ # Настройки мониторинга трафика
+ TRAFFIC_MONITORING_ENABLED: bool = False
+ TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки
+ TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (по умолчанию - раз в сутки)
+ SUSPICIOUS_NOTIFICATIONS_TOPIC_ID: Optional[int] = None
+
AUTOPAY_WARNING_DAYS: str = "3,1"
DEFAULT_AUTOPAY_ENABLED: bool = False
diff --git a/app/database/crud/user.py b/app/database/crud/user.py
index 2f944df2..27fa7c1d 100644
--- a/app/database/crud/user.py
+++ b/app/database/crud/user.py
@@ -119,6 +119,25 @@ async def get_user_by_referral_code(db: AsyncSession, referral_code: str) -> Opt
return user
+async def get_user_by_remnawave_uuid(db: AsyncSession, remnawave_uuid: str) -> Optional[User]:
+ result = await db.execute(
+ select(User)
+ .options(
+ selectinload(User.subscription),
+ selectinload(User.promo_group),
+ selectinload(User.referrer),
+ )
+ .where(User.remnawave_uuid == remnawave_uuid)
+ )
+ user = result.scalar_one_or_none()
+
+ if user and user.subscription:
+ # Загружаем дополнительные зависимости для subscription
+ _ = user.subscription.is_active
+
+ return user
+
+
async def create_unique_referral_code(db: AsyncSession) -> str:
max_attempts = 10
diff --git a/app/external/remnawave_api.py b/app/external/remnawave_api.py
index 2fccbeb4..8875437f 100644
--- a/app/external/remnawave_api.py
+++ b/app/external/remnawave_api.py
@@ -656,6 +656,25 @@ class RemnaWaveAPI:
async def get_nodes_realtime_usage(self) -> List[Dict[str, Any]]:
response = await self._make_request('GET', '/api/nodes/usage/realtime')
return response['response']
+
+ async def get_user_stats_usage(self, user_uuid: str, start_date: str, end_date: str) -> Dict[str, Any]:
+ """
+ Получает статистику использования трафика пользователем за указанный период
+
+ Args:
+ user_uuid: UUID пользователя
+ start_date: Начальная дата в формате ISO (например, "2025-12-09T00:00:00.000Z")
+ end_date: Конечная дата в формате ISO (например, "2025-12-09T23:59:59.999Z")
+
+ Returns:
+ Словарь с информацией о трафике пользователя за указанный период
+ """
+ params = {
+ 'start': start_date,
+ 'end': end_date
+ }
+ response = await self._make_request('GET', f'/api/users/stats/usage/{user_uuid}/range', params=params)
+ return response
async def get_user_devices(self, user_uuid: str) -> Dict[str, Any]:
diff --git a/app/handlers/admin/__init__.py b/app/handlers/admin/__init__.py
index f6894672..2b5aca75 100644
--- a/app/handlers/admin/__init__.py
+++ b/app/handlers/admin/__init__.py
@@ -1 +1,36 @@
-# Инициализация админских обработчиков
\ No newline at end of file
+# Инициализация админских обработчиков
+from . import (
+ backup,
+ blacklist,
+ bot_configuration,
+ bulk_ban,
+ campaigns,
+ faq,
+ main,
+ maintenance,
+ messages,
+ monitoring,
+ payments,
+ polls,
+ pricing,
+ privacy_policy,
+ promo_groups,
+ promo_offers,
+ promocodes,
+ public_offer,
+ referrals,
+ remnawave,
+ reports,
+ rules,
+ servers,
+ statistics,
+ subscriptions,
+ support_settings,
+ system_logs,
+ tickets,
+ trials,
+ updates,
+ user_messages,
+ users,
+ welcome_text,
+)
\ No newline at end of file
diff --git a/app/handlers/admin/blacklist.py b/app/handlers/admin/blacklist.py
new file mode 100644
index 00000000..38201916
--- /dev/null
+++ b/app/handlers/admin/blacklist.py
@@ -0,0 +1,363 @@
+"""
+Обработчики админ-панели для управления черным списком
+"""
+import logging
+from aiogram import types
+from aiogram.fsm.context import FSMContext
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.config import settings
+from app.database.models import User
+from app.services.blacklist_service import blacklist_service
+from app.utils.decorators import admin_required, error_handler
+from app.keyboards.admin import get_admin_users_keyboard
+
+logger = logging.getLogger(__name__)
+
+
+@admin_required
+@error_handler
+async def show_blacklist_settings(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Показывает настройки черного списка
+ """
+ is_enabled = blacklist_service.is_blacklist_check_enabled()
+ github_url = blacklist_service.get_blacklist_github_url()
+ blacklist_count = len(await blacklist_service.get_all_blacklisted_users())
+
+ status_text = "✅ Включена" if is_enabled else "❌ Отключена"
+ url_text = github_url if github_url else "Не задан"
+
+ text = f"""
+🔐 Настройки черного списка
+
+Статус: {status_text}
+URL к черному списку: {url_text}
+Количество записей: {blacklist_count}
+
+Действия:
+"""
+
+ keyboard = [
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Обновить список" if is_enabled else "🔄 Обновить (откл.)",
+ callback_data="admin_blacklist_update"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="📋 Просмотреть список" if is_enabled else "📋 Просмотр (откл.)",
+ callback_data="admin_blacklist_view"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="✏️ URL к GitHub" if not github_url else "✏️ Изменить URL",
+ callback_data="admin_blacklist_set_url"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="✅ Включить" if not is_enabled else "❌ Отключить",
+ callback_data="admin_blacklist_toggle"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад к пользователям",
+ callback_data="admin_users"
+ )
+ ]
+ ]
+
+ await callback.message.edit_text(
+ text,
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def toggle_blacklist(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Переключает статус проверки черного списка
+ """
+ # Текущая реализация использует настройки из .env
+ # Для полной реализации нужно будет создать сервис настроек
+ is_enabled = blacklist_service.is_blacklist_check_enabled()
+
+ # В реальной реализации нужно будет изменить настройку в базе данных
+ # или в системе настроек, но сейчас просто покажем статус
+ new_status = not is_enabled
+ status_text = "включена" if new_status else "отключена"
+
+ await callback.message.edit_text(
+ f"Статус проверки черного списка: {status_text}\n\n"
+ f"Для изменения статуса проверки черного списка измените значение\n"
+ f"BLACKLIST_CHECK_ENABLED в файле .env",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Обновить статус",
+ callback_data="admin_blacklist_settings"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def update_blacklist(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Обновляет черный список из GitHub
+ """
+ success, message = await blacklist_service.force_update_blacklist()
+
+ if success:
+ await callback.message.edit_text(
+ f"✅ {message}",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="📋 Просмотреть список",
+ callback_data="admin_blacklist_view"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Ручное обновление",
+ callback_data="admin_blacklist_update"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ else:
+ await callback.message.edit_text(
+ f"❌ Ошибка обновления: {message}",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Повторить",
+ callback_data="admin_blacklist_update"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def show_blacklist_users(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Показывает список пользователей в черном списке
+ """
+ blacklist_users = await blacklist_service.get_all_blacklisted_users()
+
+ if not blacklist_users:
+ text = "Черный список пуст"
+ else:
+ text = f"🔐 Черный список ({len(blacklist_users)} записей)\n\n"
+
+ # Показываем первые 20 записей
+ for i, (tg_id, username, reason) in enumerate(blacklist_users[:20], 1):
+ text += f"{i}. {tg_id} {username or ''} — {reason}\n"
+
+ if len(blacklist_users) > 20:
+ text += f"\n... и еще {len(blacklist_users) - 20} записей"
+
+ await callback.message.edit_text(
+ text,
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Обновить",
+ callback_data="admin_blacklist_view"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def start_set_blacklist_url(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Начинает процесс установки URL к черному списку
+ """
+ current_url = blacklist_service.get_blacklist_github_url() or "не задан"
+
+ await callback.message.edit_text(
+ f"Введите новый URL к файлу черного списка на GitHub\n\n"
+ f"Текущий URL: {current_url}\n\n"
+ f"Пример: https://raw.githubusercontent.com/username/repository/main/blacklist.txt\n\n"
+ f"Для отмены используйте команду /cancel",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="⬅️ Назад",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+
+ await state.set_state("waiting_for_blacklist_url")
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def process_blacklist_url(
+ message: types.Message,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Обрабатывает введенный URL к черному списку
+ """
+ url = message.text.strip()
+
+ # В реальной реализации нужно сохранить URL в систему настроек
+ # В текущей реализации просто выводим сообщение
+ if url.lower() in ['/cancel', 'отмена', 'cancel']:
+ await message.answer(
+ "Настройка URL отменена",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔐 Настройки черного списка",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ await state.clear()
+ return
+
+ # Проверяем, что URL выглядит корректно
+ if not url.startswith(('http://', 'https://')):
+ await message.answer(
+ "❌ Некорректный URL. URL должен начинаться с http:// или https://",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔐 Настройки черного списка",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ return
+
+ # В реальной системе здесь нужно сохранить URL в базу данных настроек
+ # или в систему конфигурации
+
+ await message.answer(
+ f"✅ URL к черному списку установлен:\n{url}\n\n"
+ f"Для применения изменений перезапустите бота или измените значение\n"
+ f"BLACKLIST_GITHUB_URL в файле .env",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [
+ types.InlineKeyboardButton(
+ text="🔄 Обновить список",
+ callback_data="admin_blacklist_update"
+ )
+ ],
+ [
+ types.InlineKeyboardButton(
+ text="🔐 Настройки черного списка",
+ callback_data="admin_blacklist_settings"
+ )
+ ]
+ ])
+ )
+ await state.clear()
+
+
+async def register_blacklist_handlers(dp):
+ """
+ Регистрация обработчиков черного списка
+ """
+ # Обработчик показа настроек черного списка
+ # Этот обработчик нужно будет вызывать из меню пользователей или отдельно
+ dp.callback_query.register(
+ show_blacklist_settings,
+ lambda c: c.data == "admin_blacklist_settings"
+ )
+
+ # Обработчики для взаимодействия с черным списком
+ dp.callback_query.register(
+ toggle_blacklist,
+ lambda c: c.data == "admin_blacklist_toggle"
+ )
+
+ dp.callback_query.register(
+ update_blacklist,
+ lambda c: c.data == "admin_blacklist_update"
+ )
+
+ dp.callback_query.register(
+ show_blacklist_users,
+ lambda c: c.data == "admin_blacklist_view"
+ )
+
+ dp.callback_query.register(
+ start_set_blacklist_url,
+ lambda c: c.data == "admin_blacklist_set_url"
+ )
+
+ # Обработчик сообщений для установки URL (работает только в нужном состоянии)
+ dp.message.register(
+ process_blacklist_url,
+ lambda m: True # Фильтр будет внутри функции
+ )
\ No newline at end of file
diff --git a/app/handlers/admin/bulk_ban.py b/app/handlers/admin/bulk_ban.py
new file mode 100644
index 00000000..37d82794
--- /dev/null
+++ b/app/handlers/admin/bulk_ban.py
@@ -0,0 +1,169 @@
+"""
+Обработчики команд для массовой блокировки пользователей
+"""
+import logging
+from aiogram import types
+from aiogram.fsm.context import FSMContext
+from sqlalchemy.ext.asyncio import AsyncSession
+from app.config import settings
+from app.database.models import User
+from app.services.bulk_ban_service import bulk_ban_service
+from app.states import AdminStates
+from app.utils.decorators import admin_required, error_handler
+from app.keyboards.admin import get_admin_users_keyboard
+
+logger = logging.getLogger(__name__)
+
+
+@admin_required
+@error_handler
+async def start_bulk_ban_process(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext
+):
+ """
+ Начало процесса массовой блокировки пользователей
+ """
+ await callback.message.edit_text(
+ "🛑 Массовая блокировка пользователей\n\n"
+ "Введите список Telegram ID для блокировки.\n\n"
+ "Форматы ввода:\n"
+ "• По одному ID на строку\n"
+ "• Через запятую\n"
+ "• Через пробел\n\n"
+ "Пример:\n"
+ "123456789\n"
+ "987654321\n"
+ "111222333\n\n"
+ "Или:\n"
+ "123456789, 987654321, 111222333\n\n"
+ "Для отмены используйте команду /cancel",
+ parse_mode="HTML",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_users")]
+ ])
+ )
+
+ await state.set_state(AdminStates.waiting_for_bulk_ban_list)
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def process_bulk_ban_list(
+ message: types.Message,
+ db_user: User,
+ state: FSMContext,
+ db: AsyncSession
+):
+ """
+ Обработка списка Telegram ID и выполнение массовой блокировки
+ """
+ input_text = message.text.strip()
+
+ if not input_text:
+ await message.answer(
+ "❌ Введите корректный список Telegram ID",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")]
+ ])
+ )
+ return
+
+ # Парсим ID из текста
+ try:
+ telegram_ids = await bulk_ban_service.parse_telegram_ids_from_text(input_text)
+ except Exception as e:
+ logger.error(f"Ошибка парсинга Telegram ID: {e}")
+ await message.answer(
+ "❌ Ошибка при обработке списка ID. Проверьте формат ввода.",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")]
+ ])
+ )
+ return
+
+ if not telegram_ids:
+ await message.answer(
+ "❌ Не найдено корректных Telegram ID в списке",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")]
+ ])
+ )
+ return
+
+ if len(telegram_ids) > 1000: # Ограничение на количество ID за раз
+ await message.answer(
+ f"❌ Слишком много ID в списке ({len(telegram_ids)}). Максимум: 1000",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")]
+ ])
+ )
+ return
+
+ # Выполняем массовую блокировку
+ try:
+ successfully_banned, not_found, error_ids = await bulk_ban_service.ban_users_by_telegram_ids(
+ db=db,
+ admin_user_id=db_user.id,
+ telegram_ids=telegram_ids,
+ reason="Массовая блокировка администратором",
+ bot=message.bot,
+ notify_admin=True,
+ admin_name=db_user.full_name
+ )
+
+ # Подготавливаем сообщение с результатами
+ result_text = f"✅ Массовая блокировка завершена\n\n"
+ result_text += f"📊 Результаты:\n"
+ result_text += f"✅ Успешно заблокировано: {successfully_banned}\n"
+ result_text += f"❌ Не найдено: {not_found}\n"
+ result_text += f"💥 Ошибок: {len(error_ids)}\n\n"
+ result_text += f"📈 Всего обработано: {len(telegram_ids)}"
+
+ if successfully_banned > 0:
+ result_text += f"\n🎯 Процент успеха: {round((successfully_banned/len(telegram_ids))*100, 1)}%"
+
+ # Добавляем информацию об ошибках, если есть
+ if error_ids:
+ result_text += f"\n\n⚠️ Telegram ID с ошибками:\n"
+ result_text += f"{', '.join(map(str, error_ids[:10]))}" # Показываем первые 10
+ if len(error_ids) > 10:
+ result_text += f" и еще {len(error_ids) - 10}..."
+
+ await message.answer(
+ result_text,
+ parse_mode="HTML",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="👥 К пользователям", callback_data="admin_users")]
+ ])
+ )
+
+ except Exception as e:
+ logger.error(f"Ошибка при выполнении массовой блокировки: {e}")
+ await message.answer(
+ "❌ Произошла ошибка при выполнении массовой блокировки",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")]
+ ])
+ )
+
+ await state.clear()
+
+
+async def register_bulk_ban_handlers(dp):
+ """
+ Регистрация обработчиков команд для массовой блокировки
+ """
+ # Обработчик команды начала массовой блокировки
+ dp.callback_query.register(
+ start_bulk_ban_process,
+ lambda c: c.data == "admin_bulk_ban_start"
+ )
+
+ # Обработчик текстового сообщения с ID для блокировки
+ dp.message.register(
+ process_bulk_ban_list,
+ lambda m: m.text and AdminStates.waiting_for_bulk_ban_list
+ )
\ No newline at end of file
diff --git a/app/handlers/start.py b/app/handlers/start.py
index 7c1c8a57..9fce80cc 100644
--- a/app/handlers/start.py
+++ b/app/handlers/start.py
@@ -42,6 +42,7 @@ from app.utils.promo_offer import (
from app.utils.timezone import format_local_datetime
from app.database.crud.user_message import get_random_active_message
from app.database.crud.subscription import decrement_subscription_server_counts
+from app.services.blacklist_service import blacklist_service
logger = logging.getLogger(__name__)
@@ -1000,13 +1001,33 @@ async def process_referral_code_skip(
async def complete_registration_from_callback(
callback: types.CallbackQuery,
- state: FSMContext,
+ state: FSMContext,
db: AsyncSession
):
logger.info(f"🎯 COMPLETE: Завершение регистрации для пользователя {callback.from_user.id}")
-
+
+ # Проверяем, находится ли пользователь в черном списке
+ is_blacklisted, blacklist_reason = await blacklist_service.is_user_blacklisted(
+ callback.from_user.id,
+ callback.from_user.username
+ )
+
+ if is_blacklisted:
+ logger.warning(f"🚫 Пользователь {callback.from_user.id} находится в черном списке: {blacklist_reason}")
+ try:
+ await callback.message.answer(
+ f"🚫 Регистрация невозможна\n\n"
+ f"Причина: {blacklist_reason}\n\n"
+ f"Если вы считаете, что это ошибка, обратитесь в поддержку."
+ )
+ except Exception as e:
+ logger.error(f"Ошибка при отправке сообщения о блокировке: {e}")
+
+ await state.clear()
+ return
+
from sqlalchemy.orm import selectinload
-
+
existing_user = await get_user_by_telegram_id(db, callback.from_user.id)
if existing_user and existing_user.status == UserStatus.ACTIVE.value:
@@ -1255,12 +1276,32 @@ async def complete_registration_from_callback(
async def complete_registration(
- message: types.Message,
- state: FSMContext,
+ message: types.Message,
+ state: FSMContext,
db: AsyncSession
):
logger.info(f"🎯 COMPLETE: Завершение регистрации для пользователя {message.from_user.id}")
-
+
+ # Проверяем, находится ли пользователь в черном списке
+ is_blacklisted, blacklist_reason = await blacklist_service.is_user_blacklisted(
+ message.from_user.id,
+ message.from_user.username
+ )
+
+ if is_blacklisted:
+ logger.warning(f"🚫 Пользователь {message.from_user.id} находится в черном списке: {blacklist_reason}")
+ try:
+ await message.answer(
+ f"🚫 Регистрация невозможна\n\n"
+ f"Причина: {blacklist_reason}\n\n"
+ f"Если вы считаете, что это ошибка, обратитесь в поддержку."
+ )
+ except Exception as e:
+ logger.error(f"Ошибка при отправке сообщения о блокировке: {e}")
+
+ await state.clear()
+ return
+
existing_user = await get_user_by_telegram_id(db, message.from_user.id)
if existing_user and existing_user.status == UserStatus.ACTIVE.value:
diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py
index 964984fb..9a08d6d3 100644
--- a/app/keyboards/admin.py
+++ b/app/keyboards/admin.py
@@ -320,6 +320,18 @@ def get_admin_users_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
callback_data="admin_users_filters"
)
],
+ [
+ InlineKeyboardButton(
+ text=_t(texts, "ADMIN_USERS_BLACKLIST", "🔐 Черный список"),
+ callback_data="admin_blacklist_settings"
+ )
+ ],
+ [
+ InlineKeyboardButton(
+ text=_t(texts, "ADMIN_USERS_BULK_BAN", "🛑 Массовый бан"),
+ callback_data="admin_bulk_ban_start"
+ )
+ ],
[
InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_users")
]
diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json
index ab1b9dcd..6ec6a478 100644
--- a/app/localization/locales/en.json
+++ b/app/localization/locales/en.json
@@ -1528,5 +1528,7 @@
"POLL_EMPTY": "Poll is not available yet.",
"POLL_ERROR": "Unable to process the poll. Please try again later.",
"POLL_COMPLETED": "🙏 Thanks for completing the poll!",
- "POLL_REWARD_GRANTED": "Reward {amount} has been credited to your balance."
+ "POLL_REWARD_GRANTED": "Reward {amount} has been credited to your balance.",
+ "ADMIN_USERS_BULK_BAN": "🛑 Bulk Ban",
+ "ADMIN_USERS_BLACKLIST": "🔐 Blacklist"
}
diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json
index 5a24611e..23304180 100644
--- a/app/localization/locales/ru.json
+++ b/app/localization/locales/ru.json
@@ -1540,5 +1540,7 @@
"POLL_EMPTY": "Опрос пока недоступен.",
"POLL_ERROR": "Не удалось обработать опрос. Попробуйте позже.",
"POLL_COMPLETED": "🙏 Спасибо за участие в опросе!",
- "POLL_REWARD_GRANTED": "Награда {amount} зачислена на ваш баланс."
+ "POLL_REWARD_GRANTED": "Награда {amount} зачислена на ваш баланс.",
+ "ADMIN_USERS_BULK_BAN": "🛑 Массовый бан",
+ "ADMIN_USERS_BLACKLIST": "🔐 Черный список"
}
diff --git a/app/services/admin_notification_service.py b/app/services/admin_notification_service.py
index f12e0664..dc9ab2d3 100644
--- a/app/services/admin_notification_service.py
+++ b/app/services/admin_notification_service.py
@@ -1451,6 +1451,50 @@ class AdminNotificationService:
return str(value)
return str(value)
+ async def send_bulk_ban_notification(
+ self,
+ admin_user_id: int,
+ successfully_banned: int,
+ not_found: int,
+ errors: int,
+ admin_name: str = "Администратор"
+ ) -> bool:
+ """Отправляет уведомление о массовой блокировке пользователей"""
+ if not self._is_enabled():
+ return False
+
+ try:
+ message_lines = [
+ "🛑 МАССОВАЯ БЛОКИРОВКА ПОЛЬЗОВАТЕЛЕЙ",
+ "",
+ f"👮 Администратор: {admin_name}",
+ f"🆔 ID администратора: {admin_user_id}",
+ "",
+ "📊 Результаты:",
+ f"✅ Успешно заблокировано: {successfully_banned}",
+ f"❌ Не найдено: {not_found}",
+ f"💥 Ошибок: {errors}"
+ ]
+
+ total_processed = successfully_banned + not_found + errors
+ if total_processed > 0:
+ success_rate = (successfully_banned / total_processed) * 100
+ message_lines.append(f"📈 Успешность: {success_rate:.1f}%")
+
+ message_lines.extend(
+ [
+ "",
+ f"⏰ {format_local_datetime(datetime.utcnow(), '%d.%m.%Y %H:%M:%S')}",
+ ]
+ )
+
+ message = "\n".join(message_lines)
+ return await self._send_message(message)
+
+ except Exception as e:
+ logger.error(f"Ошибка отправки уведомления о массовой блокировке: {e}")
+ return False
+
async def send_ticket_event_notification(
self,
text: str,
@@ -1468,3 +1512,49 @@ class AdminNotificationService:
if not (self._is_enabled() and runtime_enabled):
return False
return await self._send_message(text, reply_markup=keyboard, ticket_event=True)
+
+ async def send_suspicious_traffic_notification(
+ self,
+ message: str,
+ bot: Bot,
+ topic_id: Optional[int] = None
+ ) -> bool:
+ """
+ Отправляет уведомление о подозрительной активности трафика
+
+ Args:
+ message: текст уведомления
+ bot: экземпляр бота для отправки сообщения
+ topic_id: ID топика для отправки уведомления (если не указан, использует стандартный)
+ """
+ if not self.chat_id:
+ logger.warning("ADMIN_NOTIFICATIONS_CHAT_ID не настроен")
+ return False
+
+ # Используем специальный топик для подозрительной активности, если он задан
+ notification_topic_id = topic_id or self.topic_id
+
+ try:
+ message_kwargs = {
+ 'chat_id': self.chat_id,
+ 'text': message,
+ 'parse_mode': 'HTML',
+ 'disable_web_page_preview': True
+ }
+
+ if notification_topic_id:
+ message_kwargs['message_thread_id'] = notification_topic_id
+
+ await bot.send_message(**message_kwargs)
+ logger.info(f"Уведомление о подозрительной активности отправлено в чат {self.chat_id}, топик {notification_topic_id}")
+ return True
+
+ except TelegramForbiddenError:
+ logger.error(f"Бот не имеет прав для отправки в чат {self.chat_id}")
+ return False
+ except TelegramBadRequest as e:
+ logger.error(f"Ошибка отправки уведомления о подозрительной активности: {e}")
+ return False
+ except Exception as e:
+ logger.error(f"Неожиданная ошибка при отправке уведомления о подозрительной активности: {e}")
+ return False
diff --git a/app/services/blacklist_service.py b/app/services/blacklist_service.py
new file mode 100644
index 00000000..f21ec5dd
--- /dev/null
+++ b/app/services/blacklist_service.py
@@ -0,0 +1,230 @@
+"""
+Сервис для работы с черным списком пользователей
+Проверяет пользователей по списку из GitHub репозитория
+"""
+import asyncio
+import logging
+import re
+from typing import List, Dict, Optional, Tuple
+from datetime import datetime, timedelta
+import aiohttp
+from app.config import settings
+
+
+logger = logging.getLogger(__name__)
+
+
+class BlacklistService:
+ """
+ Сервис для проверки пользователей по черному списку
+ """
+
+ def __init__(self):
+ self.blacklist_data = [] # Список в формате [(telegram_id, username, reason), ...]
+ self.last_update = None
+ # Используем интервал из настроек, по умолчанию 24 часа
+ interval_hours = self.get_blacklist_update_interval_hours()
+ self.update_interval = timedelta(hours=interval_hours)
+ self.lock = asyncio.Lock() # Блокировка для предотвращения одновременных обновлений
+
+ def is_blacklist_check_enabled(self) -> bool:
+ """Проверяет, включена ли проверка черного списка"""
+ return getattr(settings, 'BLACKLIST_CHECK_ENABLED', False)
+
+ def get_blacklist_github_url(self) -> Optional[str]:
+ """Получает URL к файлу черного списка на GitHub"""
+ return getattr(settings, 'BLACKLIST_GITHUB_URL', None)
+
+ def get_blacklist_update_interval_hours(self) -> int:
+ """Получает интервал обновления черного списка в часах"""
+ return getattr(settings, 'BLACKLIST_UPDATE_INTERVAL_HOURS', 24)
+
+ def should_ignore_admins(self) -> bool:
+ """Проверяет, нужно ли игнорировать администраторов при проверке черного списка"""
+ return getattr(settings, 'BLACKLIST_IGNORE_ADMINS', True)
+
+ def is_admin(self, telegram_id: int) -> bool:
+ """Проверяет, является ли пользователь администратором"""
+ return settings.is_admin(telegram_id)
+
+ async def update_blacklist(self) -> bool:
+ """
+ Обновляет черный список из GitHub репозитория
+ """
+ async with self.lock:
+ github_url = self.get_blacklist_github_url()
+ if not github_url:
+ logger.warning("URL к черному списку не задан в настройках")
+ return False
+
+ try:
+ # Заменяем github.com на raw.githubusercontent.com для получения raw содержимого
+ if "github.com" in github_url:
+ raw_url = github_url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
+ else:
+ raw_url = github_url
+
+ # Получаем содержимое файла
+ async with aiohttp.ClientSession() as session:
+ async with session.get(raw_url) as response:
+ if response.status != 200:
+ logger.error(f"Ошибка при получении черного списка: статус {response.status}")
+ return False
+
+ content = await response.text()
+
+ # Разбираем содержимое файла
+ blacklist_data = []
+ lines = content.splitlines()
+
+ for line_num, line in enumerate(lines, 1):
+ line = line.strip()
+ if not line or line.startswith('#'):
+ continue # Пропускаем пустые строки и комментарии
+
+ # В формате '7021477105 #@MAMYT_PAXAL2016, перепродажа подписок'
+ # только первая часть до пробела - это Telegram ID, всё остальное комментарий
+ parts = line.split()
+ if not parts:
+ continue
+
+ try:
+ telegram_id = int(parts[0]) # Первое число - это Telegram ID
+ # Всё остальное - просто комментарий, не используем его для логики
+ # Но можем использовать первую часть после ID как username для отображения
+ username = ""
+ if len(parts) > 1:
+ # Берем вторую часть как username (если начинается с @)
+ if parts[1].startswith('@'):
+ username = parts[1]
+
+ # По умолчанию используем "Занесен в черный список", если нет другой информации
+ reason = "Занесен в черный список"
+
+ # Если есть запятая в строке, можем использовать часть после нее как причину
+ full_line_after_id = line[len(str(telegram_id)):].strip()
+ if ',' in full_line_after_id:
+ # Извлекаем причину после запятой
+ after_comma = full_line_after_id.split(',', 1)[1].strip()
+ reason = after_comma
+
+ blacklist_data.append((telegram_id, username, reason))
+ except ValueError:
+ # Если не удается преобразовать в число, это не ID
+ logger.warning(f"Неверный формат строки {line_num} в черном списке - первое значение не является числом: {line}")
+
+ self.blacklist_data = blacklist_data
+ self.last_update = datetime.utcnow()
+ logger.info(f"Черный список успешно обновлен. Найдено {len(blacklist_data)} записей")
+ return True
+
+ except ValueError as e:
+ logger.error(f"Ошибка при парсинге ID из черного списка: {e}")
+ return False
+ except Exception as e:
+ logger.error(f"Ошибка при обновлении черного списка: {e}")
+ return False
+
+ async def is_user_blacklisted(self, telegram_id: int, username: Optional[str] = None) -> Tuple[bool, Optional[str]]:
+ """
+ Проверяет, находится ли пользователь в черном списке
+
+ Args:
+ telegram_id: Telegram ID пользователя
+ username: Username пользователя (опционально)
+
+ Returns:
+ Кортеж (в черном списке, причина)
+ """
+ if not self.is_blacklist_check_enabled():
+ return False, None
+
+ # Проверяем, является ли пользователь администратором и нужно ли его игнорировать
+ if self.should_ignore_admins() and self.is_admin(telegram_id):
+ logger.info(f"Пользователь {telegram_id} является администратором, игнорируем проверку черного списка")
+ return False, None
+
+ # Если черный список пуст или устарел, обновляем его
+ interval_hours = self.get_blacklist_update_interval_hours()
+ required_interval = timedelta(hours=interval_hours)
+ if not self.blacklist_data or (self.last_update and
+ datetime.utcnow() - self.last_update > required_interval):
+ await self.update_blacklist()
+
+ # Проверяем по Telegram ID
+ for bl_id, bl_username, bl_reason in self.blacklist_data:
+ if bl_id == telegram_id:
+ logger.info(f"Пользователь {telegram_id} найден в черном списке по ID: {bl_reason}")
+ return True, bl_reason
+
+ # Проверяем по username, если он передан
+ if username:
+ for bl_id, bl_username, bl_reason in self.blacklist_data:
+ if bl_username and (bl_username == username or bl_username == f"@{username}"):
+ logger.info(f"Пользователь {username} ({telegram_id}) найден в черном списке по username: {bl_reason}")
+ return True, bl_reason
+
+ return False, None
+
+ async def get_all_blacklisted_users(self) -> List[Tuple[int, str, str]]:
+ """
+ Возвращает весь черный список
+ """
+ interval_hours = self.get_blacklist_update_interval_hours()
+ required_interval = timedelta(hours=interval_hours)
+ if not self.blacklist_data or (self.last_update and
+ datetime.utcnow() - self.last_update > required_interval):
+ await self.update_blacklist()
+
+ return self.blacklist_data.copy()
+
+ async def get_user_by_telegram_id(self, telegram_id: int) -> Optional[Tuple[int, str, str]]:
+ """
+ Возвращает информацию о пользователе из черного списка по Telegram ID
+
+ Args:
+ telegram_id: Telegram ID пользователя
+
+ Returns:
+ Кортеж (telegram_id, username, reason) или None, если не найден
+ """
+ for bl_id, bl_username, bl_reason in self.blacklist_data:
+ if bl_id == telegram_id:
+ return (bl_id, bl_username, bl_reason)
+ return None
+
+ async def get_user_by_username(self, username: str) -> Optional[Tuple[int, str, str]]:
+ """
+ Возвращает информацию о пользователе из черного списка по username
+
+ Args:
+ username: Username пользователя
+
+ Returns:
+ Кортеж (telegram_id, username, reason) или None, если не найден
+ """
+ # Проверяем как с @, так и без
+ username_with_at = f"@{username}" if not username.startswith('@') else username
+ username_without_at = username.lstrip('@')
+
+ for bl_id, bl_username, bl_reason in self.blacklist_data:
+ if bl_username == username_with_at or bl_username.lstrip('@') == username_without_at:
+ return (bl_id, bl_username, bl_reason)
+ return None
+
+ async def force_update_blacklist(self) -> Tuple[bool, str]:
+ """
+ Принудительно обновляет черный список
+
+ Returns:
+ Кортеж (успешно, сообщение)
+ """
+ success = await self.update_blacklist()
+ if success:
+ return True, f"Черный список обновлен успешно. Записей: {len(self.blacklist_data)}"
+ else:
+ return False, "Ошибка обновления черного списка"
+
+
+# Глобальный экземпляр сервиса
+blacklist_service = BlacklistService()
\ No newline at end of file
diff --git a/app/services/bulk_ban_service.py b/app/services/bulk_ban_service.py
new file mode 100644
index 00000000..e07f0386
--- /dev/null
+++ b/app/services/bulk_ban_service.py
@@ -0,0 +1,177 @@
+"""
+Модуль для массовой блокировки пользователей по списку Telegram ID
+"""
+
+import logging
+from typing import List, Tuple
+from sqlalchemy.ext.asyncio import AsyncSession
+from aiogram import Bot
+
+from app.database.crud.user import get_user_by_telegram_id
+from app.services.user_service import UserService
+from app.services.admin_notification_service import AdminNotificationService
+from app.config import settings
+from app.database.models import UserStatus
+
+
+logger = logging.getLogger(__name__)
+
+
+class BulkBanService:
+ """
+ Сервис для массовой блокировки пользователей по списку Telegram ID
+ """
+
+ def __init__(self):
+ self.user_service = UserService()
+
+ async def ban_users_by_telegram_ids(
+ self,
+ db: AsyncSession,
+ admin_user_id: int,
+ telegram_ids: List[int],
+ reason: str = "Заблокирован администратором по списку",
+ bot: Bot = None,
+ notify_admin: bool = True,
+ admin_name: str = "Администратор"
+ ) -> Tuple[int, int, List[int]]:
+ """
+ Массовая блокировка пользователей по Telegram ID
+
+ Args:
+ db: Асинхронная сессия базы данных
+ admin_user_id: ID администратора, который осуществляет блокировку
+ telegram_ids: Список Telegram ID для блокировки
+ reason: Причина блокировки
+ bot: Бот для отправки уведомлений
+ notify_admin: Отправлять ли уведомления администратору
+ admin_name: Имя администратора для логирования
+
+ Returns:
+ Кортеж из (успешно заблокированных, не найденных, список ID с ошибками)
+ """
+ successfully_banned = 0
+ not_found_users = []
+ error_ids = []
+
+ for telegram_id in telegram_ids:
+ try:
+ # Получаем пользователя по Telegram ID
+ user = await get_user_by_telegram_id(db, telegram_id)
+
+ if not user:
+ logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден")
+ not_found_users.append(telegram_id)
+ continue
+
+ # Проверяем, что пользователь не заблокирован уже
+ if user.status == UserStatus.BLOCKED.value:
+ logger.info(f"Пользователь {telegram_id} уже заблокирован")
+ continue
+
+ # Блокируем пользователя
+ ban_success = await self.user_service.block_user(
+ db, user.id, admin_user_id, reason
+ )
+
+ if ban_success:
+ successfully_banned += 1
+ logger.info(f"Пользователь {telegram_id} успешно заблокирован")
+
+ # Отправляем уведомление пользователю, если возможно
+ if bot:
+ try:
+ await bot.send_message(
+ chat_id=telegram_id,
+ text=(
+ f"🚫 Ваш аккаунт заблокирован\n\n"
+ f"Причина: {reason}\n\n"
+ f"Если вы считаете, что блокировка произошла ошибочно, "
+ f"обратитесь в поддержку."
+ ),
+ parse_mode="HTML"
+ )
+ except Exception as e:
+ logger.warning(f"Не удалось отправить уведомление пользователю {telegram_id}: {e}")
+ else:
+ logger.error(f"Не удалось заблокировать пользователя {telegram_id}")
+ error_ids.append(telegram_id)
+
+ except Exception as e:
+ logger.error(f"Ошибка при блокировке пользователя {telegram_id}: {e}")
+ error_ids.append(telegram_id)
+
+ # Отправляем уведомление администратору
+ if notify_admin and bot:
+ try:
+ admin_notification_service = AdminNotificationService(bot)
+ await admin_notification_service.send_bulk_ban_notification(
+ admin_user_id,
+ successfully_banned,
+ len(not_found_users),
+ len(error_ids),
+ admin_name
+ )
+ except Exception as e:
+ logger.error(f"Ошибка при отправке уведомления администратору: {e}")
+
+ logger.info(
+ f"Массовая блокировка завершена: успешно={successfully_banned}, "
+ f"не найдено={len(not_found_users)}, ошибки={len(error_ids)}"
+ )
+
+ return successfully_banned, len(not_found_users), error_ids
+
+ async def parse_telegram_ids_from_text(self, text: str) -> List[int]:
+ """
+ Парсит Telegram ID из текста. Поддерживает различные форматы:
+ - по одному ID на строку
+ - через запятую
+ - через пробелы
+ - с @username (если username соответствует формату ID)
+ """
+ if not text:
+ return []
+
+ # Удаляем лишние пробелы и разбиваем по переносам строк
+ lines = text.strip().split('\n')
+ ids = []
+
+ for line in lines:
+ # Убираем комментарии и лишние пробелы
+ line = line.strip()
+ if not line or line.startswith('#'):
+ continue
+
+ # Разбиваем строку по запятым или пробелам
+ tokens = line.replace(',', ' ').split()
+
+ for token in tokens:
+ token = token.strip()
+
+ # Убираем символ @ если присутствует
+ if token.startswith('@'):
+ token = token[1:]
+
+ # Проверяем, является ли токен числом (Telegram ID)
+ try:
+ telegram_id = int(token)
+ if telegram_id > 0: # Telegram ID должны быть положительными
+ ids.append(telegram_id)
+ except ValueError:
+ # Пропускаем, если не является числом
+ continue
+
+ # Убираем дубликаты, сохранив порядок
+ unique_ids = []
+ seen = set()
+ for tid in ids:
+ if tid not in seen:
+ unique_ids.append(tid)
+ seen.add(tid)
+
+ return unique_ids
+
+
+# Создаем глобальный экземпляр сервиса
+bulk_ban_service = BulkBanService()
\ No newline at end of file
diff --git a/app/services/traffic_monitoring_service.py b/app/services/traffic_monitoring_service.py
new file mode 100644
index 00000000..09e27262
--- /dev/null
+++ b/app/services/traffic_monitoring_service.py
@@ -0,0 +1,332 @@
+"""
+Сервис для мониторинга трафика пользователей
+Проверяет, не превышает ли пользователь заданный порог трафика за сутки
+"""
+import logging
+import asyncio
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Tuple
+from decimal import Decimal
+
+import aiohttp
+
+from app.config import settings
+from app.services.admin_notification_service import AdminNotificationService
+from app.services.remnawave_service import RemnaWaveService
+from app.database.crud.user import get_user_by_remnawave_uuid
+from app.database.models import User
+from sqlalchemy.ext.asyncio import AsyncSession
+
+
+logger = logging.getLogger(__name__)
+
+
+class TrafficMonitoringService:
+ """
+ Сервис для мониторинга трафика пользователей
+ """
+
+ def __init__(self):
+ self.remnawave_service = RemnaWaveService()
+ self.lock = asyncio.Lock() # Блокировка для предотвращения одновременных проверок
+
+ def is_traffic_monitoring_enabled(self) -> bool:
+ """Проверяет, включен ли мониторинг трафика"""
+ return getattr(settings, 'TRAFFIC_MONITORING_ENABLED', False)
+
+ def get_traffic_threshold_gb(self) -> float:
+ """Получает порог трафика в ГБ за сутки"""
+ return getattr(settings, 'TRAFFIC_THRESHOLD_GB_PER_DAY', 10.0)
+
+ def get_monitoring_interval_hours(self) -> int:
+ """Получает интервал мониторинга в часах"""
+ return getattr(settings, 'TRAFFIC_MONITORING_INTERVAL_HOURS', 24)
+
+ def get_suspicious_notifications_topic_id(self) -> Optional[int]:
+ """Получает ID топика для уведомлений о подозрительной активности"""
+ return getattr(settings, 'SUSPICIOUS_NOTIFICATIONS_TOPIC_ID', None)
+
+ async def get_user_daily_traffic(self, user_uuid: str) -> Dict:
+ """
+ Получает статистику трафика пользователя за последние 24 часа
+
+ Args:
+ user_uuid: UUID пользователя в Remnawave
+
+ Returns:
+ Словарь с информацией о трафике
+ """
+ try:
+ # Получаем время начала и конца суток (сегодня)
+ now = datetime.utcnow()
+ start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
+ end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
+
+ # Форматируем даты в ISO формат
+ start_date = start_of_day.strftime("%Y-%m-%dT%H:%M:%S.000Z")
+ end_date = end_of_day.strftime("%Y-%m-%dT%H:%M:%S.999Z")
+
+ # Получаем API клиент и вызываем метод получения статистики
+ async with self.remnawave_service.get_api_client() as api:
+ traffic_data = await api.get_user_stats_usage(user_uuid, start_date, end_date)
+
+ # Обрабатываем ответ API
+ if traffic_data and 'response' in traffic_data:
+ response = traffic_data['response']
+
+ # Вычисляем общий трафик
+ total_gb = 0
+ nodes_info = []
+
+ if isinstance(response, list):
+ for item in response:
+ node_name = item.get('nodeName', 'Unknown')
+ total_bytes = item.get('total', 0)
+ total_gb_item = round(total_bytes / (1024**3), 2) # Конвертируем в ГБ
+ total_gb += total_gb_item
+
+ nodes_info.append({
+ 'node': node_name,
+ 'gb': total_gb_item
+ })
+ else:
+ # Если response - это уже результат обработки (как в примере)
+ total_gb = response.get('total_gb', 0)
+ nodes_info = response.get('nodes', [])
+
+ return {
+ 'total_gb': total_gb,
+ 'nodes': nodes_info,
+ 'date_range': {
+ 'start': start_date,
+ 'end': end_date
+ }
+ }
+ else:
+ logger.warning(f"Нет данных о трафике для пользователя {user_uuid}")
+ return {
+ 'total_gb': 0,
+ 'nodes': [],
+ 'date_range': {
+ 'start': start_date,
+ 'end': end_date
+ }
+ }
+
+ except Exception as e:
+ logger.error(f"Ошибка при получении статистики трафика для {user_uuid}: {e}")
+ return {
+ 'total_gb': 0,
+ 'nodes': [],
+ 'date_range': {
+ 'start': None,
+ 'end': None
+ }
+ }
+
+ async def check_user_traffic_threshold(
+ self,
+ db: AsyncSession,
+ user_uuid: str,
+ user_telegram_id: int = None
+ ) -> Tuple[bool, Dict]:
+ """
+ Проверяет, превышает ли трафик пользователя заданный порог
+
+ Args:
+ db: Сессия базы данных
+ user_uuid: UUID пользователя в Remnawave
+ user_telegram_id: Telegram ID пользователя (для логирования)
+
+ Returns:
+ Кортеж (превышен ли порог, информация о трафике)
+ """
+ if not self.is_traffic_monitoring_enabled():
+ return False, {}
+
+ # Получаем статистику трафика
+ traffic_info = await self.get_user_daily_traffic(user_uuid)
+ total_gb = traffic_info.get('total_gb', 0)
+
+ # Получаем порог для сравнения
+ threshold_gb = self.get_traffic_threshold_gb()
+
+ # Проверяем, превышает ли трафик порог
+ is_exceeded = total_gb > threshold_gb
+
+ # Логируем проверку
+ user_id_info = f"telegram_id={user_telegram_id}" if user_telegram_id else f"uuid={user_uuid}"
+ status = "ПРЕВЫШЕНИЕ" if is_exceeded else "норма"
+ logger.info(
+ f"📊 Проверка трафика для {user_id_info}: {total_gb} ГБ, "
+ f"порог: {threshold_gb} ГБ, статус: {status}"
+ )
+
+ return is_exceeded, traffic_info
+
+ async def process_suspicious_traffic(
+ self,
+ db: AsyncSession,
+ user_uuid: str,
+ traffic_info: Dict,
+ bot
+ ):
+ """
+ Обрабатывает подозрительный трафик - отправляет уведомление администраторам
+ """
+ try:
+ # Получаем информацию о пользователе из базы данных
+ user = await get_user_by_remnawave_uuid(db, user_uuid)
+ if not user:
+ logger.warning(f"Пользователь с UUID {user_uuid} не найден в базе данных")
+ return
+
+ # Формируем сообщение для администраторов
+ total_gb = traffic_info.get('total_gb', 0)
+ threshold_gb = self.get_traffic_threshold_gb()
+
+ message = (
+ f"⚠️ Подозрительная активность трафика\n\n"
+ f"👤 Пользователь: {user.full_name} (ID: {user.telegram_id})\n"
+ f"🔑 UUID: {user_uuid}\n"
+ f"📊 Трафик за сутки: {total_gb} ГБ\n"
+ f"📈 Порог: {threshold_gb} ГБ\n"
+ f"🚨 Превышение: {total_gb - threshold_gb:.2f} ГБ\n\n"
+ )
+
+ # Добавляем информацию по нодам, если есть
+ nodes = traffic_info.get('nodes', [])
+ if nodes:
+ message += "Разбивка по нодам:\n"
+ for node_info in nodes[:5]: # Показываем первые 5 нод
+ message += f" • {node_info.get('node', 'Unknown')}: {node_info.get('gb', 0)} ГБ\n"
+ if len(nodes) > 5:
+ message += f" • и ещё {len(nodes) - 5} нод(ы)\n"
+
+ message += f"\n⏰ Время проверки: {datetime.utcnow().strftime('%d.%m.%Y %H:%M:%S UTC')}"
+
+ # Создаем AdminNotificationService с ботом
+ admin_notification_service = AdminNotificationService(bot)
+
+ # Отправляем уведомление администраторам
+ topic_id = self.get_suspicious_notifications_topic_id()
+
+ await admin_notification_service.send_suspicious_traffic_notification(
+ message,
+ bot,
+ topic_id
+ )
+
+ logger.info(
+ f"✅ Уведомление о подозрительном трафике отправлено для пользователя {user.telegram_id}"
+ )
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка при обработке подозрительного трафика для {user_uuid}: {e}")
+
+ async def check_all_users_traffic(self, db: AsyncSession, bot):
+ """
+ Проверяет трафик всех пользователей с активной подпиской
+ """
+ if not self.is_traffic_monitoring_enabled():
+ logger.info("Мониторинг трафика отключен, пропускаем проверку всех пользователей")
+ return
+
+ try:
+ from app.database.crud.user import get_users_with_active_subscriptions
+
+ # Получаем всех пользователей с активной подпиской
+ users = await get_users_with_active_subscriptions(db)
+
+ logger.info(f"Начинаем проверку трафика для {len(users)} пользователей")
+
+ # Проверяем трафик для каждого пользователя
+ for user in users:
+ if user.remnawave_uuid: # Проверяем только пользователей с UUID
+ is_exceeded, traffic_info = await self.check_user_traffic_threshold(
+ db,
+ user.remnawave_uuid,
+ user.telegram_id
+ )
+
+ if is_exceeded:
+ await self.process_suspicious_traffic(
+ db,
+ user.remnawave_uuid,
+ traffic_info,
+ bot
+ )
+
+ logger.info("Завершена проверка трафика всех пользователей")
+
+ except Exception as e:
+ logger.error(f"❌ Ошибка при проверке трафика всех пользователей: {e}")
+
+
+class TrafficMonitoringScheduler:
+ """
+ Класс для планирования периодических проверок трафика
+ """
+ def __init__(self, traffic_service: TrafficMonitoringService):
+ self.traffic_service = traffic_service
+ self.check_task = None
+ self.is_running = False
+
+ async def start_monitoring(self, db: AsyncSession, bot):
+ """
+ Запускает периодическую проверку трафика
+ """
+ if self.is_running:
+ logger.warning("Мониторинг трафика уже запущен")
+ return
+
+ if not self.traffic_service.is_traffic_monitoring_enabled():
+ logger.info("Мониторинг трафика отключен в настройках")
+ return
+
+ self.is_running = True
+ interval_hours = self.traffic_service.get_monitoring_interval_hours()
+ interval_seconds = interval_hours * 3600
+
+ logger.info(f"Запуск мониторинга трафика с интервалом {interval_hours} часов")
+
+ # Запускаем задачу с интервалом
+ self.check_task = asyncio.create_task(self._periodic_check(db, bot, interval_seconds))
+
+ async def stop_monitoring(self):
+ """
+ Останавливает периодическую проверку трафика
+ """
+ if self.check_task:
+ self.check_task.cancel()
+ try:
+ await self.check_task
+ except asyncio.CancelledError:
+ pass
+ self.is_running = False
+ logger.info("Мониторинг трафика остановлен")
+
+ async def _periodic_check(self, db: AsyncSession, bot, interval_seconds: int):
+ """
+ Выполняет периодическую проверку трафика
+ """
+ while self.is_running:
+ try:
+ logger.info("Запуск периодической проверки трафика")
+ await self.traffic_service.check_all_users_traffic(db, bot)
+
+ # Ждем указанный интервал перед следующей проверкой
+ await asyncio.sleep(interval_seconds)
+
+ except asyncio.CancelledError:
+ logger.info("Задача периодической проверки трафика отменена")
+ break
+ except Exception as e:
+ logger.error(f"Ошибка в периодической проверке трафика: {e}")
+ # Даже при ошибке продолжаем цикл, ждем интервал и пробуем снова
+ await asyncio.sleep(interval_seconds)
+
+
+# Глобальные экземпляры сервисов
+traffic_monitoring_service = TrafficMonitoringService()
+traffic_monitoring_scheduler = TrafficMonitoringScheduler(traffic_monitoring_service)
\ No newline at end of file