From 80785f22b08dd0b264eb9b887b5a13ce08272a17 Mon Sep 17 00:00:00 2001 From: gy9vin Date: Wed, 10 Dec 2025 19:13:52 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A7=D0=B5=D1=80=D0=BD=D1=8B=D0=B9=20=D1=81?= =?UTF-8?q?=D0=BF=D0=B8=D1=81=D0=BE=D0=BA,=20=D0=BC=D0=BE=D0=BD=D0=B8?= =?UTF-8?q?=D1=82=D0=BE=D1=80=D0=B8=D0=BD=D0=B3=20=D1=81=D1=83=D1=82=D0=BE?= =?UTF-8?q?=D1=87=D0=BD=D0=BE=20=D0=B3=D1=80=D0=B0=D1=84=D0=B8=D0=BA=D0=B0?= =?UTF-8?q?=20=D0=BF=D0=BE=20=D1=80=D0=B5=D0=B3=D0=BB=D0=B0=D0=BC=D0=B5?= =?UTF-8?q?=D0=BD=D1=82=D1=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 13 + app/bot.py | 4 + app/config.py | 13 +- app/database/crud/user.py | 19 ++ app/external/remnawave_api.py | 19 ++ app/handlers/admin/__init__.py | 37 ++- app/handlers/admin/blacklist.py | 363 +++++++++++++++++++++ app/handlers/admin/bulk_ban.py | 169 ++++++++++ app/handlers/start.py | 53 ++- app/keyboards/admin.py | 12 + app/localization/locales/en.json | 4 +- app/localization/locales/ru.json | 4 +- app/services/admin_notification_service.py | 90 +++++ app/services/blacklist_service.py | 230 +++++++++++++ app/services/bulk_ban_service.py | 177 ++++++++++ app/services/traffic_monitoring_service.py | 332 +++++++++++++++++++ 16 files changed, 1529 insertions(+), 10 deletions(-) create mode 100644 app/handlers/admin/blacklist.py create mode 100644 app/handlers/admin/bulk_ban.py create mode 100644 app/services/blacklist_service.py create mode 100644 app/services/bulk_ban_service.py create mode 100644 app/services/traffic_monitoring_service.py diff --git a/.env.example b/.env.example index 73b5a2b0..9969cf43 100644 --- a/.env.example +++ b/.env.example @@ -19,6 +19,19 @@ ADMIN_REPORTS_ENABLED=false ADMIN_REPORTS_CHAT_ID= # Опционально: чат для отчетов (по умолчанию ADMIN_NOTIFICATIONS_CHAT_ID) ADMIN_REPORTS_TOPIC_ID= # ID топика для отчетов ADMIN_REPORTS_SEND_TIME=10:00 # Время отправки (по МСК) ежедневного отчета + +# Мониторинг трафика +TRAFFIC_MONITORING_ENABLED=false # Включить мониторинг трафика пользователей +TRAFFIC_THRESHOLD_GB_PER_DAY=10.0 # Порог трафика в ГБ за сутки (превышение вызывает уведомление) +TRAFFIC_MONITORING_INTERVAL_HOURS=24 # Интервал проверки трафика в часах (например: 1, 6, 12, 24) +SUSPICIOUS_NOTIFICATIONS_TOPIC_ID=14 # ID топика для уведомлений о подозрительной активности (0 для отправки в основной чат) + +# Черный список +BLACKLIST_CHECK_ENABLED=false # Включить проверку пользователей по черному списку +BLACKLIST_GITHUB_URL=https://raw.githubusercontent.com/username/repository/main/blacklist.txt # URL к файлу черного списка на GitHub +BLACKLIST_UPDATE_INTERVAL_HOURS=24 # Интервал обновления черного списка с GitHub (в часах) +BLACKLIST_IGNORE_ADMINS=true # Игнорировать администраторов (из ADMIN_IDS) при проверке черного списка + # Обязательная подписка на канал CHANNEL_SUB_ID= # Опционально ID твоего канала (-100) CHANNEL_IS_REQUIRED_SUB=false # Обязательна ли подписка на канал diff --git a/app/bot.py b/app/bot.py index 4ecac835..e95cf2ef 100644 --- a/app/bot.py +++ b/app/bot.py @@ -31,6 +31,8 @@ from app.handlers import polls as user_polls from app.handlers import simple_subscription from app.handlers.admin import ( main as admin_main, + blacklist as admin_blacklist, + bulk_ban as admin_bulk_ban, users as admin_users, subscriptions as admin_subscriptions, promocodes as admin_promocodes, @@ -176,6 +178,8 @@ async def setup_bot() -> tuple[Bot, Dispatcher]: admin_faq.register_handlers(dp) admin_payments.register_handlers(dp) admin_trials.register_handlers(dp) + admin_bulk_ban.register_bulk_ban_handlers(dp) + admin_blacklist.register_blacklist_handlers(dp) common.register_handlers(dp) register_stars_handlers(dp) user_polls.register_handlers(dp) diff --git a/app/config.py b/app/config.py index f75e7f49..00321dd6 100644 --- a/app/config.py +++ b/app/config.py @@ -155,7 +155,18 @@ class Settings(BaseSettings): REFERRAL_PROGRAM_ENABLED: bool = True REFERRAL_NOTIFICATIONS_ENABLED: bool = True REFERRAL_NOTIFICATION_RETRY_ATTEMPTS: int = 3 - + + BLACKLIST_CHECK_ENABLED: bool = False + BLACKLIST_GITHUB_URL: Optional[str] = None + BLACKLIST_UPDATE_INTERVAL_HOURS: int = 24 + BLACKLIST_IGNORE_ADMINS: bool = True + + # Настройки мониторинга трафика + TRAFFIC_MONITORING_ENABLED: bool = False + TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки + TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (по умолчанию - раз в сутки) + SUSPICIOUS_NOTIFICATIONS_TOPIC_ID: Optional[int] = None + AUTOPAY_WARNING_DAYS: str = "3,1" DEFAULT_AUTOPAY_ENABLED: bool = False diff --git a/app/database/crud/user.py b/app/database/crud/user.py index 2f944df2..27fa7c1d 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -119,6 +119,25 @@ async def get_user_by_referral_code(db: AsyncSession, referral_code: str) -> Opt return user +async def get_user_by_remnawave_uuid(db: AsyncSession, remnawave_uuid: str) -> Optional[User]: + result = await db.execute( + select(User) + .options( + selectinload(User.subscription), + selectinload(User.promo_group), + selectinload(User.referrer), + ) + .where(User.remnawave_uuid == remnawave_uuid) + ) + user = result.scalar_one_or_none() + + if user and user.subscription: + # Загружаем дополнительные зависимости для subscription + _ = user.subscription.is_active + + return user + + async def create_unique_referral_code(db: AsyncSession) -> str: max_attempts = 10 diff --git a/app/external/remnawave_api.py b/app/external/remnawave_api.py index 2fccbeb4..8875437f 100644 --- a/app/external/remnawave_api.py +++ b/app/external/remnawave_api.py @@ -656,6 +656,25 @@ class RemnaWaveAPI: async def get_nodes_realtime_usage(self) -> List[Dict[str, Any]]: response = await self._make_request('GET', '/api/nodes/usage/realtime') return response['response'] + + async def get_user_stats_usage(self, user_uuid: str, start_date: str, end_date: str) -> Dict[str, Any]: + """ + Получает статистику использования трафика пользователем за указанный период + + Args: + user_uuid: UUID пользователя + start_date: Начальная дата в формате ISO (например, "2025-12-09T00:00:00.000Z") + end_date: Конечная дата в формате ISO (например, "2025-12-09T23:59:59.999Z") + + Returns: + Словарь с информацией о трафике пользователя за указанный период + """ + params = { + 'start': start_date, + 'end': end_date + } + response = await self._make_request('GET', f'/api/users/stats/usage/{user_uuid}/range', params=params) + return response async def get_user_devices(self, user_uuid: str) -> Dict[str, Any]: diff --git a/app/handlers/admin/__init__.py b/app/handlers/admin/__init__.py index f6894672..2b5aca75 100644 --- a/app/handlers/admin/__init__.py +++ b/app/handlers/admin/__init__.py @@ -1 +1,36 @@ -# Инициализация админских обработчиков \ No newline at end of file +# Инициализация админских обработчиков +from . import ( + backup, + blacklist, + bot_configuration, + bulk_ban, + campaigns, + faq, + main, + maintenance, + messages, + monitoring, + payments, + polls, + pricing, + privacy_policy, + promo_groups, + promo_offers, + promocodes, + public_offer, + referrals, + remnawave, + reports, + rules, + servers, + statistics, + subscriptions, + support_settings, + system_logs, + tickets, + trials, + updates, + user_messages, + users, + welcome_text, +) \ No newline at end of file diff --git a/app/handlers/admin/blacklist.py b/app/handlers/admin/blacklist.py new file mode 100644 index 00000000..38201916 --- /dev/null +++ b/app/handlers/admin/blacklist.py @@ -0,0 +1,363 @@ +""" +Обработчики админ-панели для управления черным списком +""" +import logging +from aiogram import types +from aiogram.fsm.context import FSMContext +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.database.models import User +from app.services.blacklist_service import blacklist_service +from app.utils.decorators import admin_required, error_handler +from app.keyboards.admin import get_admin_users_keyboard + +logger = logging.getLogger(__name__) + + +@admin_required +@error_handler +async def show_blacklist_settings( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Показывает настройки черного списка + """ + is_enabled = blacklist_service.is_blacklist_check_enabled() + github_url = blacklist_service.get_blacklist_github_url() + blacklist_count = len(await blacklist_service.get_all_blacklisted_users()) + + status_text = "✅ Включена" if is_enabled else "❌ Отключена" + url_text = github_url if github_url else "Не задан" + + text = f""" +🔐 Настройки черного списка + +Статус: {status_text} +URL к черному списку: {url_text} +Количество записей: {blacklist_count} + +Действия: +""" + + keyboard = [ + [ + types.InlineKeyboardButton( + text="🔄 Обновить список" if is_enabled else "🔄 Обновить (откл.)", + callback_data="admin_blacklist_update" + ) + ], + [ + types.InlineKeyboardButton( + text="📋 Просмотреть список" if is_enabled else "📋 Просмотр (откл.)", + callback_data="admin_blacklist_view" + ) + ], + [ + types.InlineKeyboardButton( + text="✏️ URL к GitHub" if not github_url else "✏️ Изменить URL", + callback_data="admin_blacklist_set_url" + ) + ], + [ + types.InlineKeyboardButton( + text="✅ Включить" if not is_enabled else "❌ Отключить", + callback_data="admin_blacklist_toggle" + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад к пользователям", + callback_data="admin_users" + ) + ] + ] + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard) + ) + await callback.answer() + + +@admin_required +@error_handler +async def toggle_blacklist( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Переключает статус проверки черного списка + """ + # Текущая реализация использует настройки из .env + # Для полной реализации нужно будет создать сервис настроек + is_enabled = blacklist_service.is_blacklist_check_enabled() + + # В реальной реализации нужно будет изменить настройку в базе данных + # или в системе настроек, но сейчас просто покажем статус + new_status = not is_enabled + status_text = "включена" if new_status else "отключена" + + await callback.message.edit_text( + f"Статус проверки черного списка: {status_text}\n\n" + f"Для изменения статуса проверки черного списка измените значение\n" + f"BLACKLIST_CHECK_ENABLED в файле .env", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔄 Обновить статус", + callback_data="admin_blacklist_settings" + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + await callback.answer() + + +@admin_required +@error_handler +async def update_blacklist( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Обновляет черный список из GitHub + """ + success, message = await blacklist_service.force_update_blacklist() + + if success: + await callback.message.edit_text( + f"✅ {message}", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="📋 Просмотреть список", + callback_data="admin_blacklist_view" + ) + ], + [ + types.InlineKeyboardButton( + text="🔄 Ручное обновление", + callback_data="admin_blacklist_update" + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + else: + await callback.message.edit_text( + f"❌ Ошибка обновления: {message}", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔄 Повторить", + callback_data="admin_blacklist_update" + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + await callback.answer() + + +@admin_required +@error_handler +async def show_blacklist_users( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Показывает список пользователей в черном списке + """ + blacklist_users = await blacklist_service.get_all_blacklisted_users() + + if not blacklist_users: + text = "Черный список пуст" + else: + text = f"🔐 Черный список ({len(blacklist_users)} записей)\n\n" + + # Показываем первые 20 записей + for i, (tg_id, username, reason) in enumerate(blacklist_users[:20], 1): + text += f"{i}. {tg_id} {username or ''} — {reason}\n" + + if len(blacklist_users) > 20: + text += f"\n... и еще {len(blacklist_users) - 20} записей" + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔄 Обновить", + callback_data="admin_blacklist_view" + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + await callback.answer() + + +@admin_required +@error_handler +async def start_set_blacklist_url( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Начинает процесс установки URL к черному списку + """ + current_url = blacklist_service.get_blacklist_github_url() or "не задан" + + await callback.message.edit_text( + f"Введите новый URL к файлу черного списка на GitHub\n\n" + f"Текущий URL: {current_url}\n\n" + f"Пример: https://raw.githubusercontent.com/username/repository/main/blacklist.txt\n\n" + f"Для отмены используйте команду /cancel", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + + await state.set_state("waiting_for_blacklist_url") + await callback.answer() + + +@admin_required +@error_handler +async def process_blacklist_url( + message: types.Message, + db_user: User, + state: FSMContext +): + """ + Обрабатывает введенный URL к черному списку + """ + url = message.text.strip() + + # В реальной реализации нужно сохранить URL в систему настроек + # В текущей реализации просто выводим сообщение + if url.lower() in ['/cancel', 'отмена', 'cancel']: + await message.answer( + "Настройка URL отменена", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔐 Настройки черного списка", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + await state.clear() + return + + # Проверяем, что URL выглядит корректно + if not url.startswith(('http://', 'https://')): + await message.answer( + "❌ Некорректный URL. URL должен начинаться с http:// или https://", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔐 Настройки черного списка", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + return + + # В реальной системе здесь нужно сохранить URL в базу данных настроек + # или в систему конфигурации + + await message.answer( + f"✅ URL к черному списку установлен:\n{url}\n\n" + f"Для применения изменений перезапустите бота или измените значение\n" + f"BLACKLIST_GITHUB_URL в файле .env", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔄 Обновить список", + callback_data="admin_blacklist_update" + ) + ], + [ + types.InlineKeyboardButton( + text="🔐 Настройки черного списка", + callback_data="admin_blacklist_settings" + ) + ] + ]) + ) + await state.clear() + + +async def register_blacklist_handlers(dp): + """ + Регистрация обработчиков черного списка + """ + # Обработчик показа настроек черного списка + # Этот обработчик нужно будет вызывать из меню пользователей или отдельно + dp.callback_query.register( + show_blacklist_settings, + lambda c: c.data == "admin_blacklist_settings" + ) + + # Обработчики для взаимодействия с черным списком + dp.callback_query.register( + toggle_blacklist, + lambda c: c.data == "admin_blacklist_toggle" + ) + + dp.callback_query.register( + update_blacklist, + lambda c: c.data == "admin_blacklist_update" + ) + + dp.callback_query.register( + show_blacklist_users, + lambda c: c.data == "admin_blacklist_view" + ) + + dp.callback_query.register( + start_set_blacklist_url, + lambda c: c.data == "admin_blacklist_set_url" + ) + + # Обработчик сообщений для установки URL (работает только в нужном состоянии) + dp.message.register( + process_blacklist_url, + lambda m: True # Фильтр будет внутри функции + ) \ No newline at end of file diff --git a/app/handlers/admin/bulk_ban.py b/app/handlers/admin/bulk_ban.py new file mode 100644 index 00000000..37d82794 --- /dev/null +++ b/app/handlers/admin/bulk_ban.py @@ -0,0 +1,169 @@ +""" +Обработчики команд для массовой блокировки пользователей +""" +import logging +from aiogram import types +from aiogram.fsm.context import FSMContext +from sqlalchemy.ext.asyncio import AsyncSession +from app.config import settings +from app.database.models import User +from app.services.bulk_ban_service import bulk_ban_service +from app.states import AdminStates +from app.utils.decorators import admin_required, error_handler +from app.keyboards.admin import get_admin_users_keyboard + +logger = logging.getLogger(__name__) + + +@admin_required +@error_handler +async def start_bulk_ban_process( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext +): + """ + Начало процесса массовой блокировки пользователей + """ + await callback.message.edit_text( + "🛑 Массовая блокировка пользователей\n\n" + "Введите список Telegram ID для блокировки.\n\n" + "Форматы ввода:\n" + "• По одному ID на строку\n" + "• Через запятую\n" + "• Через пробел\n\n" + "Пример:\n" + "123456789\n" + "987654321\n" + "111222333\n\n" + "Или:\n" + "123456789, 987654321, 111222333\n\n" + "Для отмены используйте команду /cancel", + parse_mode="HTML", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_users")] + ]) + ) + + await state.set_state(AdminStates.waiting_for_bulk_ban_list) + await callback.answer() + + +@admin_required +@error_handler +async def process_bulk_ban_list( + message: types.Message, + db_user: User, + state: FSMContext, + db: AsyncSession +): + """ + Обработка списка Telegram ID и выполнение массовой блокировки + """ + input_text = message.text.strip() + + if not input_text: + await message.answer( + "❌ Введите корректный список Telegram ID", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")] + ]) + ) + return + + # Парсим ID из текста + try: + telegram_ids = await bulk_ban_service.parse_telegram_ids_from_text(input_text) + except Exception as e: + logger.error(f"Ошибка парсинга Telegram ID: {e}") + await message.answer( + "❌ Ошибка при обработке списка ID. Проверьте формат ввода.", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")] + ]) + ) + return + + if not telegram_ids: + await message.answer( + "❌ Не найдено корректных Telegram ID в списке", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")] + ]) + ) + return + + if len(telegram_ids) > 1000: # Ограничение на количество ID за раз + await message.answer( + f"❌ Слишком много ID в списке ({len(telegram_ids)}). Максимум: 1000", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")] + ]) + ) + return + + # Выполняем массовую блокировку + try: + successfully_banned, not_found, error_ids = await bulk_ban_service.ban_users_by_telegram_ids( + db=db, + admin_user_id=db_user.id, + telegram_ids=telegram_ids, + reason="Массовая блокировка администратором", + bot=message.bot, + notify_admin=True, + admin_name=db_user.full_name + ) + + # Подготавливаем сообщение с результатами + result_text = f"✅ Массовая блокировка завершена\n\n" + result_text += f"📊 Результаты:\n" + result_text += f"✅ Успешно заблокировано: {successfully_banned}\n" + result_text += f"❌ Не найдено: {not_found}\n" + result_text += f"💥 Ошибок: {len(error_ids)}\n\n" + result_text += f"📈 Всего обработано: {len(telegram_ids)}" + + if successfully_banned > 0: + result_text += f"\n🎯 Процент успеха: {round((successfully_banned/len(telegram_ids))*100, 1)}%" + + # Добавляем информацию об ошибках, если есть + if error_ids: + result_text += f"\n\n⚠️ Telegram ID с ошибками:\n" + result_text += f"{', '.join(map(str, error_ids[:10]))}" # Показываем первые 10 + if len(error_ids) > 10: + result_text += f" и еще {len(error_ids) - 10}..." + + await message.answer( + result_text, + parse_mode="HTML", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="👥 К пользователям", callback_data="admin_users")] + ]) + ) + + except Exception as e: + logger.error(f"Ошибка при выполнении массовой блокировки: {e}") + await message.answer( + "❌ Произошла ошибка при выполнении массовой блокировки", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="🔙 Назад", callback_data="admin_users")] + ]) + ) + + await state.clear() + + +async def register_bulk_ban_handlers(dp): + """ + Регистрация обработчиков команд для массовой блокировки + """ + # Обработчик команды начала массовой блокировки + dp.callback_query.register( + start_bulk_ban_process, + lambda c: c.data == "admin_bulk_ban_start" + ) + + # Обработчик текстового сообщения с ID для блокировки + dp.message.register( + process_bulk_ban_list, + lambda m: m.text and AdminStates.waiting_for_bulk_ban_list + ) \ No newline at end of file diff --git a/app/handlers/start.py b/app/handlers/start.py index 7c1c8a57..9fce80cc 100644 --- a/app/handlers/start.py +++ b/app/handlers/start.py @@ -42,6 +42,7 @@ from app.utils.promo_offer import ( from app.utils.timezone import format_local_datetime from app.database.crud.user_message import get_random_active_message from app.database.crud.subscription import decrement_subscription_server_counts +from app.services.blacklist_service import blacklist_service logger = logging.getLogger(__name__) @@ -1000,13 +1001,33 @@ async def process_referral_code_skip( async def complete_registration_from_callback( callback: types.CallbackQuery, - state: FSMContext, + state: FSMContext, db: AsyncSession ): logger.info(f"🎯 COMPLETE: Завершение регистрации для пользователя {callback.from_user.id}") - + + # Проверяем, находится ли пользователь в черном списке + is_blacklisted, blacklist_reason = await blacklist_service.is_user_blacklisted( + callback.from_user.id, + callback.from_user.username + ) + + if is_blacklisted: + logger.warning(f"🚫 Пользователь {callback.from_user.id} находится в черном списке: {blacklist_reason}") + try: + await callback.message.answer( + f"🚫 Регистрация невозможна\n\n" + f"Причина: {blacklist_reason}\n\n" + f"Если вы считаете, что это ошибка, обратитесь в поддержку." + ) + except Exception as e: + logger.error(f"Ошибка при отправке сообщения о блокировке: {e}") + + await state.clear() + return + from sqlalchemy.orm import selectinload - + existing_user = await get_user_by_telegram_id(db, callback.from_user.id) if existing_user and existing_user.status == UserStatus.ACTIVE.value: @@ -1255,12 +1276,32 @@ async def complete_registration_from_callback( async def complete_registration( - message: types.Message, - state: FSMContext, + message: types.Message, + state: FSMContext, db: AsyncSession ): logger.info(f"🎯 COMPLETE: Завершение регистрации для пользователя {message.from_user.id}") - + + # Проверяем, находится ли пользователь в черном списке + is_blacklisted, blacklist_reason = await blacklist_service.is_user_blacklisted( + message.from_user.id, + message.from_user.username + ) + + if is_blacklisted: + logger.warning(f"🚫 Пользователь {message.from_user.id} находится в черном списке: {blacklist_reason}") + try: + await message.answer( + f"🚫 Регистрация невозможна\n\n" + f"Причина: {blacklist_reason}\n\n" + f"Если вы считаете, что это ошибка, обратитесь в поддержку." + ) + except Exception as e: + logger.error(f"Ошибка при отправке сообщения о блокировке: {e}") + + await state.clear() + return + existing_user = await get_user_by_telegram_id(db, message.from_user.id) if existing_user and existing_user.status == UserStatus.ACTIVE.value: diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 964984fb..9a08d6d3 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -320,6 +320,18 @@ def get_admin_users_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="admin_users_filters" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_USERS_BLACKLIST", "🔐 Черный список"), + callback_data="admin_blacklist_settings" + ) + ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_USERS_BULK_BAN", "🛑 Массовый бан"), + callback_data="admin_bulk_ban_start" + ) + ], [ InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_users") ] diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json index ab1b9dcd..6ec6a478 100644 --- a/app/localization/locales/en.json +++ b/app/localization/locales/en.json @@ -1528,5 +1528,7 @@ "POLL_EMPTY": "Poll is not available yet.", "POLL_ERROR": "Unable to process the poll. Please try again later.", "POLL_COMPLETED": "🙏 Thanks for completing the poll!", - "POLL_REWARD_GRANTED": "Reward {amount} has been credited to your balance." + "POLL_REWARD_GRANTED": "Reward {amount} has been credited to your balance.", + "ADMIN_USERS_BULK_BAN": "🛑 Bulk Ban", + "ADMIN_USERS_BLACKLIST": "🔐 Blacklist" } diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 5a24611e..23304180 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -1540,5 +1540,7 @@ "POLL_EMPTY": "Опрос пока недоступен.", "POLL_ERROR": "Не удалось обработать опрос. Попробуйте позже.", "POLL_COMPLETED": "🙏 Спасибо за участие в опросе!", - "POLL_REWARD_GRANTED": "Награда {amount} зачислена на ваш баланс." + "POLL_REWARD_GRANTED": "Награда {amount} зачислена на ваш баланс.", + "ADMIN_USERS_BULK_BAN": "🛑 Массовый бан", + "ADMIN_USERS_BLACKLIST": "🔐 Черный список" } diff --git a/app/services/admin_notification_service.py b/app/services/admin_notification_service.py index f12e0664..dc9ab2d3 100644 --- a/app/services/admin_notification_service.py +++ b/app/services/admin_notification_service.py @@ -1451,6 +1451,50 @@ class AdminNotificationService: return str(value) return str(value) + async def send_bulk_ban_notification( + self, + admin_user_id: int, + successfully_banned: int, + not_found: int, + errors: int, + admin_name: str = "Администратор" + ) -> bool: + """Отправляет уведомление о массовой блокировке пользователей""" + if not self._is_enabled(): + return False + + try: + message_lines = [ + "🛑 МАССОВАЯ БЛОКИРОВКА ПОЛЬЗОВАТЕЛЕЙ", + "", + f"👮 Администратор: {admin_name}", + f"🆔 ID администратора: {admin_user_id}", + "", + "📊 Результаты:", + f"✅ Успешно заблокировано: {successfully_banned}", + f"❌ Не найдено: {not_found}", + f"💥 Ошибок: {errors}" + ] + + total_processed = successfully_banned + not_found + errors + if total_processed > 0: + success_rate = (successfully_banned / total_processed) * 100 + message_lines.append(f"📈 Успешность: {success_rate:.1f}%") + + message_lines.extend( + [ + "", + f"⏰ {format_local_datetime(datetime.utcnow(), '%d.%m.%Y %H:%M:%S')}", + ] + ) + + message = "\n".join(message_lines) + return await self._send_message(message) + + except Exception as e: + logger.error(f"Ошибка отправки уведомления о массовой блокировке: {e}") + return False + async def send_ticket_event_notification( self, text: str, @@ -1468,3 +1512,49 @@ class AdminNotificationService: if not (self._is_enabled() and runtime_enabled): return False return await self._send_message(text, reply_markup=keyboard, ticket_event=True) + + async def send_suspicious_traffic_notification( + self, + message: str, + bot: Bot, + topic_id: Optional[int] = None + ) -> bool: + """ + Отправляет уведомление о подозрительной активности трафика + + Args: + message: текст уведомления + bot: экземпляр бота для отправки сообщения + topic_id: ID топика для отправки уведомления (если не указан, использует стандартный) + """ + if not self.chat_id: + logger.warning("ADMIN_NOTIFICATIONS_CHAT_ID не настроен") + return False + + # Используем специальный топик для подозрительной активности, если он задан + notification_topic_id = topic_id or self.topic_id + + try: + message_kwargs = { + 'chat_id': self.chat_id, + 'text': message, + 'parse_mode': 'HTML', + 'disable_web_page_preview': True + } + + if notification_topic_id: + message_kwargs['message_thread_id'] = notification_topic_id + + await bot.send_message(**message_kwargs) + logger.info(f"Уведомление о подозрительной активности отправлено в чат {self.chat_id}, топик {notification_topic_id}") + return True + + except TelegramForbiddenError: + logger.error(f"Бот не имеет прав для отправки в чат {self.chat_id}") + return False + except TelegramBadRequest as e: + logger.error(f"Ошибка отправки уведомления о подозрительной активности: {e}") + return False + except Exception as e: + logger.error(f"Неожиданная ошибка при отправке уведомления о подозрительной активности: {e}") + return False diff --git a/app/services/blacklist_service.py b/app/services/blacklist_service.py new file mode 100644 index 00000000..f21ec5dd --- /dev/null +++ b/app/services/blacklist_service.py @@ -0,0 +1,230 @@ +""" +Сервис для работы с черным списком пользователей +Проверяет пользователей по списку из GitHub репозитория +""" +import asyncio +import logging +import re +from typing import List, Dict, Optional, Tuple +from datetime import datetime, timedelta +import aiohttp +from app.config import settings + + +logger = logging.getLogger(__name__) + + +class BlacklistService: + """ + Сервис для проверки пользователей по черному списку + """ + + def __init__(self): + self.blacklist_data = [] # Список в формате [(telegram_id, username, reason), ...] + self.last_update = None + # Используем интервал из настроек, по умолчанию 24 часа + interval_hours = self.get_blacklist_update_interval_hours() + self.update_interval = timedelta(hours=interval_hours) + self.lock = asyncio.Lock() # Блокировка для предотвращения одновременных обновлений + + def is_blacklist_check_enabled(self) -> bool: + """Проверяет, включена ли проверка черного списка""" + return getattr(settings, 'BLACKLIST_CHECK_ENABLED', False) + + def get_blacklist_github_url(self) -> Optional[str]: + """Получает URL к файлу черного списка на GitHub""" + return getattr(settings, 'BLACKLIST_GITHUB_URL', None) + + def get_blacklist_update_interval_hours(self) -> int: + """Получает интервал обновления черного списка в часах""" + return getattr(settings, 'BLACKLIST_UPDATE_INTERVAL_HOURS', 24) + + def should_ignore_admins(self) -> bool: + """Проверяет, нужно ли игнорировать администраторов при проверке черного списка""" + return getattr(settings, 'BLACKLIST_IGNORE_ADMINS', True) + + def is_admin(self, telegram_id: int) -> bool: + """Проверяет, является ли пользователь администратором""" + return settings.is_admin(telegram_id) + + async def update_blacklist(self) -> bool: + """ + Обновляет черный список из GitHub репозитория + """ + async with self.lock: + github_url = self.get_blacklist_github_url() + if not github_url: + logger.warning("URL к черному списку не задан в настройках") + return False + + try: + # Заменяем github.com на raw.githubusercontent.com для получения raw содержимого + if "github.com" in github_url: + raw_url = github_url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/") + else: + raw_url = github_url + + # Получаем содержимое файла + async with aiohttp.ClientSession() as session: + async with session.get(raw_url) as response: + if response.status != 200: + logger.error(f"Ошибка при получении черного списка: статус {response.status}") + return False + + content = await response.text() + + # Разбираем содержимое файла + blacklist_data = [] + lines = content.splitlines() + + for line_num, line in enumerate(lines, 1): + line = line.strip() + if not line or line.startswith('#'): + continue # Пропускаем пустые строки и комментарии + + # В формате '7021477105 #@MAMYT_PAXAL2016, перепродажа подписок' + # только первая часть до пробела - это Telegram ID, всё остальное комментарий + parts = line.split() + if not parts: + continue + + try: + telegram_id = int(parts[0]) # Первое число - это Telegram ID + # Всё остальное - просто комментарий, не используем его для логики + # Но можем использовать первую часть после ID как username для отображения + username = "" + if len(parts) > 1: + # Берем вторую часть как username (если начинается с @) + if parts[1].startswith('@'): + username = parts[1] + + # По умолчанию используем "Занесен в черный список", если нет другой информации + reason = "Занесен в черный список" + + # Если есть запятая в строке, можем использовать часть после нее как причину + full_line_after_id = line[len(str(telegram_id)):].strip() + if ',' in full_line_after_id: + # Извлекаем причину после запятой + after_comma = full_line_after_id.split(',', 1)[1].strip() + reason = after_comma + + blacklist_data.append((telegram_id, username, reason)) + except ValueError: + # Если не удается преобразовать в число, это не ID + logger.warning(f"Неверный формат строки {line_num} в черном списке - первое значение не является числом: {line}") + + self.blacklist_data = blacklist_data + self.last_update = datetime.utcnow() + logger.info(f"Черный список успешно обновлен. Найдено {len(blacklist_data)} записей") + return True + + except ValueError as e: + logger.error(f"Ошибка при парсинге ID из черного списка: {e}") + return False + except Exception as e: + logger.error(f"Ошибка при обновлении черного списка: {e}") + return False + + async def is_user_blacklisted(self, telegram_id: int, username: Optional[str] = None) -> Tuple[bool, Optional[str]]: + """ + Проверяет, находится ли пользователь в черном списке + + Args: + telegram_id: Telegram ID пользователя + username: Username пользователя (опционально) + + Returns: + Кортеж (в черном списке, причина) + """ + if not self.is_blacklist_check_enabled(): + return False, None + + # Проверяем, является ли пользователь администратором и нужно ли его игнорировать + if self.should_ignore_admins() and self.is_admin(telegram_id): + logger.info(f"Пользователь {telegram_id} является администратором, игнорируем проверку черного списка") + return False, None + + # Если черный список пуст или устарел, обновляем его + interval_hours = self.get_blacklist_update_interval_hours() + required_interval = timedelta(hours=interval_hours) + if not self.blacklist_data or (self.last_update and + datetime.utcnow() - self.last_update > required_interval): + await self.update_blacklist() + + # Проверяем по Telegram ID + for bl_id, bl_username, bl_reason in self.blacklist_data: + if bl_id == telegram_id: + logger.info(f"Пользователь {telegram_id} найден в черном списке по ID: {bl_reason}") + return True, bl_reason + + # Проверяем по username, если он передан + if username: + for bl_id, bl_username, bl_reason in self.blacklist_data: + if bl_username and (bl_username == username or bl_username == f"@{username}"): + logger.info(f"Пользователь {username} ({telegram_id}) найден в черном списке по username: {bl_reason}") + return True, bl_reason + + return False, None + + async def get_all_blacklisted_users(self) -> List[Tuple[int, str, str]]: + """ + Возвращает весь черный список + """ + interval_hours = self.get_blacklist_update_interval_hours() + required_interval = timedelta(hours=interval_hours) + if not self.blacklist_data or (self.last_update and + datetime.utcnow() - self.last_update > required_interval): + await self.update_blacklist() + + return self.blacklist_data.copy() + + async def get_user_by_telegram_id(self, telegram_id: int) -> Optional[Tuple[int, str, str]]: + """ + Возвращает информацию о пользователе из черного списка по Telegram ID + + Args: + telegram_id: Telegram ID пользователя + + Returns: + Кортеж (telegram_id, username, reason) или None, если не найден + """ + for bl_id, bl_username, bl_reason in self.blacklist_data: + if bl_id == telegram_id: + return (bl_id, bl_username, bl_reason) + return None + + async def get_user_by_username(self, username: str) -> Optional[Tuple[int, str, str]]: + """ + Возвращает информацию о пользователе из черного списка по username + + Args: + username: Username пользователя + + Returns: + Кортеж (telegram_id, username, reason) или None, если не найден + """ + # Проверяем как с @, так и без + username_with_at = f"@{username}" if not username.startswith('@') else username + username_without_at = username.lstrip('@') + + for bl_id, bl_username, bl_reason in self.blacklist_data: + if bl_username == username_with_at or bl_username.lstrip('@') == username_without_at: + return (bl_id, bl_username, bl_reason) + return None + + async def force_update_blacklist(self) -> Tuple[bool, str]: + """ + Принудительно обновляет черный список + + Returns: + Кортеж (успешно, сообщение) + """ + success = await self.update_blacklist() + if success: + return True, f"Черный список обновлен успешно. Записей: {len(self.blacklist_data)}" + else: + return False, "Ошибка обновления черного списка" + + +# Глобальный экземпляр сервиса +blacklist_service = BlacklistService() \ No newline at end of file diff --git a/app/services/bulk_ban_service.py b/app/services/bulk_ban_service.py new file mode 100644 index 00000000..e07f0386 --- /dev/null +++ b/app/services/bulk_ban_service.py @@ -0,0 +1,177 @@ +""" +Модуль для массовой блокировки пользователей по списку Telegram ID +""" + +import logging +from typing import List, Tuple +from sqlalchemy.ext.asyncio import AsyncSession +from aiogram import Bot + +from app.database.crud.user import get_user_by_telegram_id +from app.services.user_service import UserService +from app.services.admin_notification_service import AdminNotificationService +from app.config import settings +from app.database.models import UserStatus + + +logger = logging.getLogger(__name__) + + +class BulkBanService: + """ + Сервис для массовой блокировки пользователей по списку Telegram ID + """ + + def __init__(self): + self.user_service = UserService() + + async def ban_users_by_telegram_ids( + self, + db: AsyncSession, + admin_user_id: int, + telegram_ids: List[int], + reason: str = "Заблокирован администратором по списку", + bot: Bot = None, + notify_admin: bool = True, + admin_name: str = "Администратор" + ) -> Tuple[int, int, List[int]]: + """ + Массовая блокировка пользователей по Telegram ID + + Args: + db: Асинхронная сессия базы данных + admin_user_id: ID администратора, который осуществляет блокировку + telegram_ids: Список Telegram ID для блокировки + reason: Причина блокировки + bot: Бот для отправки уведомлений + notify_admin: Отправлять ли уведомления администратору + admin_name: Имя администратора для логирования + + Returns: + Кортеж из (успешно заблокированных, не найденных, список ID с ошибками) + """ + successfully_banned = 0 + not_found_users = [] + error_ids = [] + + for telegram_id in telegram_ids: + try: + # Получаем пользователя по Telegram ID + user = await get_user_by_telegram_id(db, telegram_id) + + if not user: + logger.warning(f"Пользователь с Telegram ID {telegram_id} не найден") + not_found_users.append(telegram_id) + continue + + # Проверяем, что пользователь не заблокирован уже + if user.status == UserStatus.BLOCKED.value: + logger.info(f"Пользователь {telegram_id} уже заблокирован") + continue + + # Блокируем пользователя + ban_success = await self.user_service.block_user( + db, user.id, admin_user_id, reason + ) + + if ban_success: + successfully_banned += 1 + logger.info(f"Пользователь {telegram_id} успешно заблокирован") + + # Отправляем уведомление пользователю, если возможно + if bot: + try: + await bot.send_message( + chat_id=telegram_id, + text=( + f"🚫 Ваш аккаунт заблокирован\n\n" + f"Причина: {reason}\n\n" + f"Если вы считаете, что блокировка произошла ошибочно, " + f"обратитесь в поддержку." + ), + parse_mode="HTML" + ) + except Exception as e: + logger.warning(f"Не удалось отправить уведомление пользователю {telegram_id}: {e}") + else: + logger.error(f"Не удалось заблокировать пользователя {telegram_id}") + error_ids.append(telegram_id) + + except Exception as e: + logger.error(f"Ошибка при блокировке пользователя {telegram_id}: {e}") + error_ids.append(telegram_id) + + # Отправляем уведомление администратору + if notify_admin and bot: + try: + admin_notification_service = AdminNotificationService(bot) + await admin_notification_service.send_bulk_ban_notification( + admin_user_id, + successfully_banned, + len(not_found_users), + len(error_ids), + admin_name + ) + except Exception as e: + logger.error(f"Ошибка при отправке уведомления администратору: {e}") + + logger.info( + f"Массовая блокировка завершена: успешно={successfully_banned}, " + f"не найдено={len(not_found_users)}, ошибки={len(error_ids)}" + ) + + return successfully_banned, len(not_found_users), error_ids + + async def parse_telegram_ids_from_text(self, text: str) -> List[int]: + """ + Парсит Telegram ID из текста. Поддерживает различные форматы: + - по одному ID на строку + - через запятую + - через пробелы + - с @username (если username соответствует формату ID) + """ + if not text: + return [] + + # Удаляем лишние пробелы и разбиваем по переносам строк + lines = text.strip().split('\n') + ids = [] + + for line in lines: + # Убираем комментарии и лишние пробелы + line = line.strip() + if not line or line.startswith('#'): + continue + + # Разбиваем строку по запятым или пробелам + tokens = line.replace(',', ' ').split() + + for token in tokens: + token = token.strip() + + # Убираем символ @ если присутствует + if token.startswith('@'): + token = token[1:] + + # Проверяем, является ли токен числом (Telegram ID) + try: + telegram_id = int(token) + if telegram_id > 0: # Telegram ID должны быть положительными + ids.append(telegram_id) + except ValueError: + # Пропускаем, если не является числом + continue + + # Убираем дубликаты, сохранив порядок + unique_ids = [] + seen = set() + for tid in ids: + if tid not in seen: + unique_ids.append(tid) + seen.add(tid) + + return unique_ids + + +# Создаем глобальный экземпляр сервиса +bulk_ban_service = BulkBanService() \ No newline at end of file diff --git a/app/services/traffic_monitoring_service.py b/app/services/traffic_monitoring_service.py new file mode 100644 index 00000000..09e27262 --- /dev/null +++ b/app/services/traffic_monitoring_service.py @@ -0,0 +1,332 @@ +""" +Сервис для мониторинга трафика пользователей +Проверяет, не превышает ли пользователь заданный порог трафика за сутки +""" +import logging +import asyncio +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple +from decimal import Decimal + +import aiohttp + +from app.config import settings +from app.services.admin_notification_service import AdminNotificationService +from app.services.remnawave_service import RemnaWaveService +from app.database.crud.user import get_user_by_remnawave_uuid +from app.database.models import User +from sqlalchemy.ext.asyncio import AsyncSession + + +logger = logging.getLogger(__name__) + + +class TrafficMonitoringService: + """ + Сервис для мониторинга трафика пользователей + """ + + def __init__(self): + self.remnawave_service = RemnaWaveService() + self.lock = asyncio.Lock() # Блокировка для предотвращения одновременных проверок + + def is_traffic_monitoring_enabled(self) -> bool: + """Проверяет, включен ли мониторинг трафика""" + return getattr(settings, 'TRAFFIC_MONITORING_ENABLED', False) + + def get_traffic_threshold_gb(self) -> float: + """Получает порог трафика в ГБ за сутки""" + return getattr(settings, 'TRAFFIC_THRESHOLD_GB_PER_DAY', 10.0) + + def get_monitoring_interval_hours(self) -> int: + """Получает интервал мониторинга в часах""" + return getattr(settings, 'TRAFFIC_MONITORING_INTERVAL_HOURS', 24) + + def get_suspicious_notifications_topic_id(self) -> Optional[int]: + """Получает ID топика для уведомлений о подозрительной активности""" + return getattr(settings, 'SUSPICIOUS_NOTIFICATIONS_TOPIC_ID', None) + + async def get_user_daily_traffic(self, user_uuid: str) -> Dict: + """ + Получает статистику трафика пользователя за последние 24 часа + + Args: + user_uuid: UUID пользователя в Remnawave + + Returns: + Словарь с информацией о трафике + """ + try: + # Получаем время начала и конца суток (сегодня) + now = datetime.utcnow() + start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) + end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999) + + # Форматируем даты в ISO формат + start_date = start_of_day.strftime("%Y-%m-%dT%H:%M:%S.000Z") + end_date = end_of_day.strftime("%Y-%m-%dT%H:%M:%S.999Z") + + # Получаем API клиент и вызываем метод получения статистики + async with self.remnawave_service.get_api_client() as api: + traffic_data = await api.get_user_stats_usage(user_uuid, start_date, end_date) + + # Обрабатываем ответ API + if traffic_data and 'response' in traffic_data: + response = traffic_data['response'] + + # Вычисляем общий трафик + total_gb = 0 + nodes_info = [] + + if isinstance(response, list): + for item in response: + node_name = item.get('nodeName', 'Unknown') + total_bytes = item.get('total', 0) + total_gb_item = round(total_bytes / (1024**3), 2) # Конвертируем в ГБ + total_gb += total_gb_item + + nodes_info.append({ + 'node': node_name, + 'gb': total_gb_item + }) + else: + # Если response - это уже результат обработки (как в примере) + total_gb = response.get('total_gb', 0) + nodes_info = response.get('nodes', []) + + return { + 'total_gb': total_gb, + 'nodes': nodes_info, + 'date_range': { + 'start': start_date, + 'end': end_date + } + } + else: + logger.warning(f"Нет данных о трафике для пользователя {user_uuid}") + return { + 'total_gb': 0, + 'nodes': [], + 'date_range': { + 'start': start_date, + 'end': end_date + } + } + + except Exception as e: + logger.error(f"Ошибка при получении статистики трафика для {user_uuid}: {e}") + return { + 'total_gb': 0, + 'nodes': [], + 'date_range': { + 'start': None, + 'end': None + } + } + + async def check_user_traffic_threshold( + self, + db: AsyncSession, + user_uuid: str, + user_telegram_id: int = None + ) -> Tuple[bool, Dict]: + """ + Проверяет, превышает ли трафик пользователя заданный порог + + Args: + db: Сессия базы данных + user_uuid: UUID пользователя в Remnawave + user_telegram_id: Telegram ID пользователя (для логирования) + + Returns: + Кортеж (превышен ли порог, информация о трафике) + """ + if not self.is_traffic_monitoring_enabled(): + return False, {} + + # Получаем статистику трафика + traffic_info = await self.get_user_daily_traffic(user_uuid) + total_gb = traffic_info.get('total_gb', 0) + + # Получаем порог для сравнения + threshold_gb = self.get_traffic_threshold_gb() + + # Проверяем, превышает ли трафик порог + is_exceeded = total_gb > threshold_gb + + # Логируем проверку + user_id_info = f"telegram_id={user_telegram_id}" if user_telegram_id else f"uuid={user_uuid}" + status = "ПРЕВЫШЕНИЕ" if is_exceeded else "норма" + logger.info( + f"📊 Проверка трафика для {user_id_info}: {total_gb} ГБ, " + f"порог: {threshold_gb} ГБ, статус: {status}" + ) + + return is_exceeded, traffic_info + + async def process_suspicious_traffic( + self, + db: AsyncSession, + user_uuid: str, + traffic_info: Dict, + bot + ): + """ + Обрабатывает подозрительный трафик - отправляет уведомление администраторам + """ + try: + # Получаем информацию о пользователе из базы данных + user = await get_user_by_remnawave_uuid(db, user_uuid) + if not user: + logger.warning(f"Пользователь с UUID {user_uuid} не найден в базе данных") + return + + # Формируем сообщение для администраторов + total_gb = traffic_info.get('total_gb', 0) + threshold_gb = self.get_traffic_threshold_gb() + + message = ( + f"⚠️ Подозрительная активность трафика\n\n" + f"👤 Пользователь: {user.full_name} (ID: {user.telegram_id})\n" + f"🔑 UUID: {user_uuid}\n" + f"📊 Трафик за сутки: {total_gb} ГБ\n" + f"📈 Порог: {threshold_gb} ГБ\n" + f"🚨 Превышение: {total_gb - threshold_gb:.2f} ГБ\n\n" + ) + + # Добавляем информацию по нодам, если есть + nodes = traffic_info.get('nodes', []) + if nodes: + message += "Разбивка по нодам:\n" + for node_info in nodes[:5]: # Показываем первые 5 нод + message += f" • {node_info.get('node', 'Unknown')}: {node_info.get('gb', 0)} ГБ\n" + if len(nodes) > 5: + message += f" • и ещё {len(nodes) - 5} нод(ы)\n" + + message += f"\n⏰ Время проверки: {datetime.utcnow().strftime('%d.%m.%Y %H:%M:%S UTC')}" + + # Создаем AdminNotificationService с ботом + admin_notification_service = AdminNotificationService(bot) + + # Отправляем уведомление администраторам + topic_id = self.get_suspicious_notifications_topic_id() + + await admin_notification_service.send_suspicious_traffic_notification( + message, + bot, + topic_id + ) + + logger.info( + f"✅ Уведомление о подозрительном трафике отправлено для пользователя {user.telegram_id}" + ) + + except Exception as e: + logger.error(f"❌ Ошибка при обработке подозрительного трафика для {user_uuid}: {e}") + + async def check_all_users_traffic(self, db: AsyncSession, bot): + """ + Проверяет трафик всех пользователей с активной подпиской + """ + if not self.is_traffic_monitoring_enabled(): + logger.info("Мониторинг трафика отключен, пропускаем проверку всех пользователей") + return + + try: + from app.database.crud.user import get_users_with_active_subscriptions + + # Получаем всех пользователей с активной подпиской + users = await get_users_with_active_subscriptions(db) + + logger.info(f"Начинаем проверку трафика для {len(users)} пользователей") + + # Проверяем трафик для каждого пользователя + for user in users: + if user.remnawave_uuid: # Проверяем только пользователей с UUID + is_exceeded, traffic_info = await self.check_user_traffic_threshold( + db, + user.remnawave_uuid, + user.telegram_id + ) + + if is_exceeded: + await self.process_suspicious_traffic( + db, + user.remnawave_uuid, + traffic_info, + bot + ) + + logger.info("Завершена проверка трафика всех пользователей") + + except Exception as e: + logger.error(f"❌ Ошибка при проверке трафика всех пользователей: {e}") + + +class TrafficMonitoringScheduler: + """ + Класс для планирования периодических проверок трафика + """ + def __init__(self, traffic_service: TrafficMonitoringService): + self.traffic_service = traffic_service + self.check_task = None + self.is_running = False + + async def start_monitoring(self, db: AsyncSession, bot): + """ + Запускает периодическую проверку трафика + """ + if self.is_running: + logger.warning("Мониторинг трафика уже запущен") + return + + if not self.traffic_service.is_traffic_monitoring_enabled(): + logger.info("Мониторинг трафика отключен в настройках") + return + + self.is_running = True + interval_hours = self.traffic_service.get_monitoring_interval_hours() + interval_seconds = interval_hours * 3600 + + logger.info(f"Запуск мониторинга трафика с интервалом {interval_hours} часов") + + # Запускаем задачу с интервалом + self.check_task = asyncio.create_task(self._periodic_check(db, bot, interval_seconds)) + + async def stop_monitoring(self): + """ + Останавливает периодическую проверку трафика + """ + if self.check_task: + self.check_task.cancel() + try: + await self.check_task + except asyncio.CancelledError: + pass + self.is_running = False + logger.info("Мониторинг трафика остановлен") + + async def _periodic_check(self, db: AsyncSession, bot, interval_seconds: int): + """ + Выполняет периодическую проверку трафика + """ + while self.is_running: + try: + logger.info("Запуск периодической проверки трафика") + await self.traffic_service.check_all_users_traffic(db, bot) + + # Ждем указанный интервал перед следующей проверкой + await asyncio.sleep(interval_seconds) + + except asyncio.CancelledError: + logger.info("Задача периодической проверки трафика отменена") + break + except Exception as e: + logger.error(f"Ошибка в периодической проверке трафика: {e}") + # Даже при ошибке продолжаем цикл, ждем интервал и пробуем снова + await asyncio.sleep(interval_seconds) + + +# Глобальные экземпляры сервисов +traffic_monitoring_service = TrafficMonitoringService() +traffic_monitoring_scheduler = TrafficMonitoringScheduler(traffic_monitoring_service) \ No newline at end of file