import logging from typing import Callable, Dict, Any, Awaitable, Optional from datetime import datetime from aiogram import BaseMiddleware, Bot, types from aiogram.exceptions import TelegramForbiddenError, TelegramBadRequest from aiogram.fsm.context import FSMContext from aiogram.types import TelegramObject, Update, Message, CallbackQuery from aiogram.enums import ChatMemberStatus from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database.database import AsyncSessionLocal from app.database.crud.campaign import get_campaign_by_start_parameter from app.database.crud.subscription import deactivate_subscription, reactivate_subscription from app.database.crud.user import get_user_by_telegram_id from app.database.models import SubscriptionStatus, User from app.keyboards.inline import get_channel_sub_keyboard from app.localization.loader import DEFAULT_LANGUAGE from app.localization.texts import get_texts from app.utils.check_reg_process import is_registration_process from app.services.subscription_service import SubscriptionService from app.services.admin_notification_service import AdminNotificationService logger = logging.getLogger(__name__) class ChannelCheckerMiddleware(BaseMiddleware): """ Middleware для проверки подписки на канал. ОПТИМИЗИРОВАНО: создаёт максимум одну сессию БД на запрос. """ def __init__(self): self.BAD_MEMBER_STATUS = ( ChatMemberStatus.LEFT, ChatMemberStatus.KICKED, ChatMemberStatus.RESTRICTED ) self.GOOD_MEMBER_STATUS = ( ChatMemberStatus.MEMBER, ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.CREATOR ) logger.info("🔧 ChannelCheckerMiddleware инициализирован") async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: telegram_id = None if isinstance(event, (Message, CallbackQuery)): telegram_id = event.from_user.id elif isinstance(event, Update): if event.message: telegram_id = event.message.from_user.id elif event.callback_query: telegram_id = event.callback_query.from_user.id if telegram_id is None: logger.debug("❌ telegram_id не найден, пропускаем") return await handler(event, data) # Админам разрешаем пропускать проверку подписки if settings.is_admin(telegram_id): logger.debug( "✅ Пользователь %s является администратором — пропускаем проверку подписки", telegram_id, ) return await handler(event, data) state: FSMContext = data.get('state') current_state = None if state: current_state = await state.get_state() is_reg_process = is_registration_process(event, current_state) if is_reg_process: logger.debug("✅ Событие разрешено (процесс регистрации), пропускаем проверку") return await handler(event, data) bot: Bot = data["bot"] channel_id = settings.CHANNEL_SUB_ID if not channel_id: logger.warning("⚠️ CHANNEL_SUB_ID не установлен, пропускаем проверку") return await handler(event, data) is_required = settings.CHANNEL_IS_REQUIRED_SUB if not is_required: logger.debug("⚠️ Обязательная подписка отключена, пропускаем проверку") return await handler(event, data) channel_link = self._normalize_channel_link(settings.CHANNEL_LINK, channel_id) if not channel_link: logger.warning( "⚠️ CHANNEL_LINK не задан или невалиден, кнопка подписки будет скрыта" ) try: member = await bot.get_chat_member(chat_id=channel_id, user_id=telegram_id) if member.status in self.GOOD_MEMBER_STATUS: # Реактивируем подписку если была отключена из-за отписки от канала if telegram_id and (settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE or settings.CHANNEL_REQUIRED_FOR_ALL): await self._reactivate_subscription_on_subscribe(telegram_id, bot) return await handler(event, data) elif member.status in self.BAD_MEMBER_STATUS: logger.info(f"❌ Пользователь {telegram_id} не подписан на канал (статус: {member.status})") if telegram_id and (settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE or settings.CHANNEL_REQUIRED_FOR_ALL): await self._deactivate_subscription_on_unsubscribe(telegram_id, bot, channel_link) await self._capture_start_payload(state, event, bot) if isinstance(event, CallbackQuery) and event.data == "sub_channel_check": await event.answer("❌ Вы еще не подписались на канал! Подпишитесь и попробуйте снова.", show_alert=True) return return await self._deny_message(event, bot, channel_link, channel_id) else: logger.warning(f"⚠️ Неожиданный статус пользователя {telegram_id}: {member.status}") await self._capture_start_payload(state, event, bot) return await self._deny_message(event, bot, channel_link, channel_id) except TelegramForbiddenError as e: logger.error(f"❌ Бот заблокирован в канале {channel_id}: {e}") await self._capture_start_payload(state, event, bot) return await self._deny_message(event, bot, channel_link, channel_id) except TelegramBadRequest as e: if "chat not found" in str(e).lower(): logger.error(f"❌ Канал {channel_id} не найден: {e}") elif "user not found" in str(e).lower(): logger.error(f"❌ Пользователь {telegram_id} не найден: {e}") else: logger.error(f"❌ Ошибка запроса к каналу {channel_id}: {e}") await self._capture_start_payload(state, event, bot) return await self._deny_message(event, bot, channel_link, channel_id) except Exception as e: logger.error(f"❌ Неожиданная ошибка при проверке подписки: {e}") return await handler(event, data) @staticmethod def _normalize_channel_link(channel_link: Optional[str], channel_id: Optional[str]) -> Optional[str]: link = (channel_link or "").strip() if link.startswith("@"): # raw username return f"https://t.me/{link.lstrip('@')}" if link and not link.lower().startswith(("http://", "https://", "tg://")): return f"https://{link}" if link: return link if channel_id and str(channel_id).startswith("@"): return f"https://t.me/{str(channel_id).lstrip('@')}" return None async def _capture_start_payload( self, state: Optional[FSMContext], event: TelegramObject, bot: Optional[Bot] = None, ) -> None: if not state: return message: Optional[Message] = None if isinstance(event, Message): message = event elif isinstance(event, CallbackQuery): message = event.message elif isinstance(event, Update): message = event.message if not message or not message.text: return text = message.text.strip() if not text.startswith("/start"): return parts = text.split(maxsplit=1) if len(parts) < 2 or not parts[1]: return payload = parts[1] state_data = await state.get_data() or {} if state_data.get("pending_start_payload") != payload: state_data["pending_start_payload"] = payload await state.set_data(state_data) logger.debug("💾 Сохранен start payload %s для последующей обработки", payload) if bot and message.from_user: await self._try_send_campaign_visit_notification( bot, message.from_user, state, payload, ) async def _try_send_campaign_visit_notification( self, bot: Bot, telegram_user: types.User, state: FSMContext, payload: str, ) -> None: try: state_data = await state.get_data() or {} except Exception as error: logger.error( "❌ Не удалось получить данные состояния для уведомления по кампании %s: %s", payload, error, ) return if state_data.get("campaign_notification_sent"): return async with AsyncSessionLocal() as db: try: campaign = await get_campaign_by_start_parameter( db, payload, only_active=True, ) if not campaign: return user = await get_user_by_telegram_id(db, telegram_user.id) notification_service = AdminNotificationService(bot) sent = await notification_service.send_campaign_link_visit_notification( db, telegram_user, campaign, user, ) if sent: await state.update_data(campaign_notification_sent=True) await db.commit() except Exception as error: logger.error( "❌ Ошибка отправки уведомления о переходе по кампании %s: %s", payload, error, ) await db.rollback() async def _deactivate_subscription_on_unsubscribe( self, telegram_id: int, bot: Bot, channel_link: Optional[str] ) -> None: """Деактивация подписки при отписке от канала.""" if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL: return async with AsyncSessionLocal() as db: try: user = await get_user_by_telegram_id(db, telegram_id) if not user or not user.subscription: return subscription = user.subscription if subscription.status != SubscriptionStatus.ACTIVE.value: return if settings.CHANNEL_REQUIRED_FOR_ALL: pass elif not subscription.is_trial: return await deactivate_subscription(db, subscription) sub_type = "Триальная" if subscription.is_trial else "Платная" logger.info( "🚫 %s подписка пользователя %s отключена после отписки от канала", sub_type, telegram_id, ) if user.remnawave_uuid: service = SubscriptionService() try: await service.disable_remnawave_user(user.remnawave_uuid) except Exception as api_error: logger.error( "❌ Не удалось отключить пользователя RemnaWave %s: %s", user.remnawave_uuid, api_error, ) # Уведомляем пользователя о деактивации try: texts = get_texts(user.language if user.language else DEFAULT_LANGUAGE) notification_text = texts.t( "SUBSCRIPTION_DEACTIVATED_CHANNEL_UNSUBSCRIBE", "🚫 Ваша подписка приостановлена, так как вы отписались от канала.\n\n" "Подпишитесь на канал снова, чтобы восстановить доступ к VPN." ) channel_kb = get_channel_sub_keyboard(channel_link, language=user.language) await bot.send_message(telegram_id, notification_text, reply_markup=channel_kb) except Exception as notify_error: logger.error( "❌ Не удалось отправить уведомление о деактивации пользователю %s: %s", telegram_id, notify_error, ) await db.commit() except Exception as db_error: logger.error( "❌ Ошибка деактивации подписки пользователя %s после отписки: %s", telegram_id, db_error, ) await db.rollback() async def _reactivate_subscription_on_subscribe(self, telegram_id: int, bot: Bot) -> None: """Реактивация подписки после повторной подписки на канал.""" if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL: return async with AsyncSessionLocal() as db: try: user = await get_user_by_telegram_id(db, telegram_id) if not user or not user.subscription: return subscription = user.subscription # Реактивируем только DISABLED подписки if subscription.status != SubscriptionStatus.DISABLED.value: return # Проверяем что подписка ещё не истекла if subscription.end_date and subscription.end_date <= datetime.utcnow(): return # Реактивируем в БД await reactivate_subscription(db, subscription) sub_type = "Триальная" if subscription.is_trial else "Платная" logger.info( "✅ %s подписка пользователя %s реактивирована после подписки на канал", sub_type, telegram_id, ) # Включаем в RemnaWave if user.remnawave_uuid: service = SubscriptionService() try: await service.enable_remnawave_user(user.remnawave_uuid) except Exception as api_error: logger.error( "❌ Не удалось включить пользователя RemnaWave %s: %s", user.remnawave_uuid, api_error, ) # Уведомляем пользователя о реактивации try: texts = get_texts(user.language if user.language else DEFAULT_LANGUAGE) notification_text = texts.t( "SUBSCRIPTION_REACTIVATED_CHANNEL_SUBSCRIBE", "✅ Ваша подписка восстановлена!\n\n" "Спасибо, что подписались на канал. VPN снова работает." ) await bot.send_message(telegram_id, notification_text) except Exception as notify_error: logger.warning( "Не удалось отправить уведомление о реактивации пользователю %s: %s", telegram_id, notify_error, ) await db.commit() except Exception as db_error: logger.error( "❌ Ошибка реактивации подписки пользователя %s: %s", telegram_id, db_error, ) await db.rollback() @staticmethod async def _deny_message( event: TelegramObject, bot: Bot, channel_link: Optional[str], channel_id: Optional[str], ): logger.debug("🚫 Отправляем сообщение о необходимости подписки") user = None if isinstance(event, (Message, CallbackQuery)): user = getattr(event, "from_user", None) elif isinstance(event, Update): if event.message and event.message.from_user: user = event.message.from_user elif event.callback_query and event.callback_query.from_user: user = event.callback_query.from_user language = DEFAULT_LANGUAGE if user and user.language_code: language = user.language_code.split('-')[0] texts = get_texts(language) channel_sub_kb = get_channel_sub_keyboard(channel_link, language=language) text = texts.t( "CHANNEL_REQUIRED_TEXT", "🔒 Для использования бота подпишитесь на новостной канал, чтобы получать уведомления о новых возможностях и обновлениях бота. Спасибо!", ) if not channel_link and channel_id: channel_hint = None if str(channel_id).startswith("@"): # username-based channel id channel_hint = f"@{str(channel_id).lstrip('@')}" if channel_hint: text = f"{text}\n\n{channel_hint}" try: if isinstance(event, Message): return await event.answer(text, reply_markup=channel_sub_kb) elif isinstance(event, CallbackQuery): try: return await event.message.edit_text(text, reply_markup=channel_sub_kb) except TelegramBadRequest as e: if "message is not modified" in str(e).lower(): logger.debug("ℹ️ Сообщение уже содержит текст проверки подписки, пропускаем редактирование") return await event.answer(text, show_alert=True) raise elif isinstance(event, Update) and event.message: return await bot.send_message(event.message.chat.id, text, reply_markup=channel_sub_kb) except Exception as e: logger.error(f"❌ Ошибка при отправке сообщения о подписке: {e}")