Files
2026-01-17 01:14:57 +03:00

447 lines
20 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from typing import Callable, Dict, Any, Awaitable, Optional
from datetime import datetime
from aiogram import BaseMiddleware, Bot, types
from aiogram.exceptions import TelegramForbiddenError, TelegramBadRequest
from aiogram.fsm.context import FSMContext
from aiogram.types import TelegramObject, Update, Message, CallbackQuery
from aiogram.enums import ChatMemberStatus
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.database import AsyncSessionLocal
from app.database.crud.campaign import get_campaign_by_start_parameter
from app.database.crud.subscription import deactivate_subscription, reactivate_subscription
from app.database.crud.user import get_user_by_telegram_id
from app.database.models import SubscriptionStatus, User
from app.keyboards.inline import get_channel_sub_keyboard
from app.localization.loader import DEFAULT_LANGUAGE
from app.localization.texts import get_texts
from app.utils.check_reg_process import is_registration_process
from app.services.subscription_service import SubscriptionService
from app.services.admin_notification_service import AdminNotificationService
logger = logging.getLogger(__name__)
class ChannelCheckerMiddleware(BaseMiddleware):
"""
Middleware для проверки подписки на канал.
ОПТИМИЗИРОВАНО: создаёт максимум одну сессию БД на запрос.
"""
def __init__(self):
self.BAD_MEMBER_STATUS = (
ChatMemberStatus.LEFT,
ChatMemberStatus.KICKED,
ChatMemberStatus.RESTRICTED
)
self.GOOD_MEMBER_STATUS = (
ChatMemberStatus.MEMBER,
ChatMemberStatus.ADMINISTRATOR,
ChatMemberStatus.CREATOR
)
logger.info("🔧 ChannelCheckerMiddleware инициализирован")
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
telegram_id = None
if isinstance(event, (Message, CallbackQuery)):
telegram_id = event.from_user.id
elif isinstance(event, Update):
if event.message:
telegram_id = event.message.from_user.id
elif event.callback_query:
telegram_id = event.callback_query.from_user.id
if telegram_id is None:
logger.debug("❌ telegram_id не найден, пропускаем")
return await handler(event, data)
# Админам разрешаем пропускать проверку подписки
if settings.is_admin(telegram_id):
logger.debug(
"✅ Пользователь %s является администратором — пропускаем проверку подписки",
telegram_id,
)
return await handler(event, data)
state: FSMContext = data.get('state')
current_state = None
if state:
current_state = await state.get_state()
is_reg_process = is_registration_process(event, current_state)
if is_reg_process:
logger.debug("✅ Событие разрешено (процесс регистрации), пропускаем проверку")
return await handler(event, data)
bot: Bot = data["bot"]
channel_id = settings.CHANNEL_SUB_ID
if not channel_id:
logger.warning("⚠️ CHANNEL_SUB_ID не установлен, пропускаем проверку")
return await handler(event, data)
is_required = settings.CHANNEL_IS_REQUIRED_SUB
if not is_required:
logger.debug("⚠️ Обязательная подписка отключена, пропускаем проверку")
return await handler(event, data)
channel_link = self._normalize_channel_link(settings.CHANNEL_LINK, channel_id)
if not channel_link:
logger.warning(
"⚠️ CHANNEL_LINK не задан или невалиден, кнопка подписки будет скрыта"
)
try:
member = await bot.get_chat_member(chat_id=channel_id, user_id=telegram_id)
if member.status in self.GOOD_MEMBER_STATUS:
# Реактивируем подписку если была отключена из-за отписки от канала
if telegram_id and (settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE or settings.CHANNEL_REQUIRED_FOR_ALL):
await self._reactivate_subscription_on_subscribe(telegram_id, bot)
return await handler(event, data)
elif member.status in self.BAD_MEMBER_STATUS:
logger.info(f"❌ Пользователь {telegram_id} не подписан на канал (статус: {member.status})")
if telegram_id and (settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE or settings.CHANNEL_REQUIRED_FOR_ALL):
await self._deactivate_subscription_on_unsubscribe(telegram_id, bot, channel_link)
await self._capture_start_payload(state, event, bot)
if isinstance(event, CallbackQuery) and event.data == "sub_channel_check":
await event.answer("❌ Вы еще не подписались на канал! Подпишитесь и попробуйте снова.", show_alert=True)
return
return await self._deny_message(event, bot, channel_link, channel_id)
else:
logger.warning(f"⚠️ Неожиданный статус пользователя {telegram_id}: {member.status}")
await self._capture_start_payload(state, event, bot)
return await self._deny_message(event, bot, channel_link, channel_id)
except TelegramForbiddenError as e:
logger.error(f"❌ Бот заблокирован в канале {channel_id}: {e}")
await self._capture_start_payload(state, event, bot)
return await self._deny_message(event, bot, channel_link, channel_id)
except TelegramBadRequest as e:
if "chat not found" in str(e).lower():
logger.error(f"❌ Канал {channel_id} не найден: {e}")
elif "user not found" in str(e).lower():
logger.error(f"❌ Пользователь {telegram_id} не найден: {e}")
else:
logger.error(f"❌ Ошибка запроса к каналу {channel_id}: {e}")
await self._capture_start_payload(state, event, bot)
return await self._deny_message(event, bot, channel_link, channel_id)
except Exception as e:
logger.error(f"❌ Неожиданная ошибка при проверке подписки: {e}")
return await handler(event, data)
@staticmethod
def _normalize_channel_link(channel_link: Optional[str], channel_id: Optional[str]) -> Optional[str]:
link = (channel_link or "").strip()
if link.startswith("@"): # raw username
return f"https://t.me/{link.lstrip('@')}"
if link and not link.lower().startswith(("http://", "https://", "tg://")):
return f"https://{link}"
if link:
return link
if channel_id and str(channel_id).startswith("@"):
return f"https://t.me/{str(channel_id).lstrip('@')}"
return None
async def _capture_start_payload(
self,
state: Optional[FSMContext],
event: TelegramObject,
bot: Optional[Bot] = None,
) -> None:
if not state:
return
message: Optional[Message] = None
if isinstance(event, Message):
message = event
elif isinstance(event, CallbackQuery):
message = event.message
elif isinstance(event, Update):
message = event.message
if not message or not message.text:
return
text = message.text.strip()
if not text.startswith("/start"):
return
parts = text.split(maxsplit=1)
if len(parts) < 2 or not parts[1]:
return
payload = parts[1]
state_data = await state.get_data() or {}
if state_data.get("pending_start_payload") != payload:
state_data["pending_start_payload"] = payload
await state.set_data(state_data)
logger.debug("💾 Сохранен start payload %s для последующей обработки", payload)
if bot and message.from_user:
await self._try_send_campaign_visit_notification(
bot,
message.from_user,
state,
payload,
)
async def _try_send_campaign_visit_notification(
self,
bot: Bot,
telegram_user: types.User,
state: FSMContext,
payload: str,
) -> None:
try:
state_data = await state.get_data() or {}
except Exception as error:
logger.error(
"Не удалось получить данные состояния для уведомления по кампании %s: %s",
payload,
error,
)
return
if state_data.get("campaign_notification_sent"):
return
async with AsyncSessionLocal() as db:
try:
campaign = await get_campaign_by_start_parameter(
db,
payload,
only_active=True,
)
if not campaign:
return
user = await get_user_by_telegram_id(db, telegram_user.id)
notification_service = AdminNotificationService(bot)
sent = await notification_service.send_campaign_link_visit_notification(
db,
telegram_user,
campaign,
user,
)
if sent:
await state.update_data(campaign_notification_sent=True)
await db.commit()
except Exception as error:
logger.error(
"❌ Ошибка отправки уведомления о переходе по кампании %s: %s",
payload,
error,
)
await db.rollback()
async def _deactivate_subscription_on_unsubscribe(
self, telegram_id: int, bot: Bot, channel_link: Optional[str]
) -> None:
"""Деактивация подписки при отписке от канала."""
if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL:
return
async with AsyncSessionLocal() as db:
try:
user = await get_user_by_telegram_id(db, telegram_id)
if not user or not user.subscription:
return
subscription = user.subscription
if subscription.status != SubscriptionStatus.ACTIVE.value:
return
if settings.CHANNEL_REQUIRED_FOR_ALL:
pass
elif not subscription.is_trial:
return
await deactivate_subscription(db, subscription)
sub_type = "Триальная" if subscription.is_trial else "Платная"
logger.info(
"🚫 %s подписка пользователя %s отключена после отписки от канала",
sub_type,
telegram_id,
)
if user.remnawave_uuid:
service = SubscriptionService()
try:
await service.disable_remnawave_user(user.remnawave_uuid)
except Exception as api_error:
logger.error(
"Не удалось отключить пользователя RemnaWave %s: %s",
user.remnawave_uuid,
api_error,
)
# Уведомляем пользователя о деактивации
try:
texts = get_texts(user.language if user.language else DEFAULT_LANGUAGE)
notification_text = texts.t(
"SUBSCRIPTION_DEACTIVATED_CHANNEL_UNSUBSCRIBE",
"🚫 Ваша подписка приостановлена, так как вы отписались от канала.\n\n"
"Подпишитесь на канал снова, чтобы восстановить доступ к VPN."
)
channel_kb = get_channel_sub_keyboard(channel_link, language=user.language)
await bot.send_message(telegram_id, notification_text, reply_markup=channel_kb)
except Exception as notify_error:
logger.error(
"Не удалось отправить уведомление о деактивации пользователю %s: %s",
telegram_id,
notify_error,
)
await db.commit()
except Exception as db_error:
logger.error(
"❌ Ошибка деактивации подписки пользователя %s после отписки: %s",
telegram_id,
db_error,
)
await db.rollback()
async def _reactivate_subscription_on_subscribe(self, telegram_id: int, bot: Bot) -> None:
"""Реактивация подписки после повторной подписки на канал."""
if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL:
return
async with AsyncSessionLocal() as db:
try:
user = await get_user_by_telegram_id(db, telegram_id)
if not user or not user.subscription:
return
subscription = user.subscription
# Реактивируем только DISABLED подписки
if subscription.status != SubscriptionStatus.DISABLED.value:
return
# Проверяем что подписка ещё не истекла
if subscription.end_date and subscription.end_date <= datetime.utcnow():
return
# Реактивируем в БД
await reactivate_subscription(db, subscription)
sub_type = "Триальная" if subscription.is_trial else "Платная"
logger.info(
"%s подписка пользователя %s реактивирована после подписки на канал",
sub_type,
telegram_id,
)
# Включаем в RemnaWave
if user.remnawave_uuid:
service = SubscriptionService()
try:
await service.enable_remnawave_user(user.remnawave_uuid)
except Exception as api_error:
logger.error(
"Не удалось включить пользователя RemnaWave %s: %s",
user.remnawave_uuid,
api_error,
)
# Уведомляем пользователя о реактивации
try:
texts = get_texts(user.language if user.language else DEFAULT_LANGUAGE)
notification_text = texts.t(
"SUBSCRIPTION_REACTIVATED_CHANNEL_SUBSCRIBE",
"✅ Ваша подписка восстановлена!\n\n"
"Спасибо, что подписались на канал. VPN снова работает."
)
await bot.send_message(telegram_id, notification_text)
except Exception as notify_error:
logger.warning(
"Не удалось отправить уведомление о реактивации пользователю %s: %s",
telegram_id,
notify_error,
)
await db.commit()
except Exception as db_error:
logger.error(
"❌ Ошибка реактивации подписки пользователя %s: %s",
telegram_id,
db_error,
)
await db.rollback()
@staticmethod
async def _deny_message(
event: TelegramObject,
bot: Bot,
channel_link: Optional[str],
channel_id: Optional[str],
):
logger.debug("🚫 Отправляем сообщение о необходимости подписки")
user = None
if isinstance(event, (Message, CallbackQuery)):
user = getattr(event, "from_user", None)
elif isinstance(event, Update):
if event.message and event.message.from_user:
user = event.message.from_user
elif event.callback_query and event.callback_query.from_user:
user = event.callback_query.from_user
language = DEFAULT_LANGUAGE
if user and user.language_code:
language = user.language_code.split('-')[0]
texts = get_texts(language)
channel_sub_kb = get_channel_sub_keyboard(channel_link, language=language)
text = texts.t(
"CHANNEL_REQUIRED_TEXT",
"🔒 Для использования бота подпишитесь на новостной канал, чтобы получать уведомления о новых возможностях и обновлениях бота. Спасибо!",
)
if not channel_link and channel_id:
channel_hint = None
if str(channel_id).startswith("@"): # username-based channel id
channel_hint = f"@{str(channel_id).lstrip('@')}"
if channel_hint:
text = f"{text}\n\n{channel_hint}"
try:
if isinstance(event, Message):
return await event.answer(text, reply_markup=channel_sub_kb)
elif isinstance(event, CallbackQuery):
try:
return await event.message.edit_text(text, reply_markup=channel_sub_kb)
except TelegramBadRequest as e:
if "message is not modified" in str(e).lower():
logger.debug(" Сообщение уже содержит текст проверки подписки, пропускаем редактирование")
return await event.answer(text, show_alert=True)
raise
elif isinstance(event, Update) and event.message:
return await bot.send_message(event.message.chat.id, text, reply_markup=channel_sub_kb)
except Exception as e:
logger.error(f"❌ Ошибка при отправке сообщения о подписке: {e}")