diff --git a/app/bot.py b/app/bot.py index a9d04550..9ca39ed4 100644 --- a/app/bot.py +++ b/app/bot.py @@ -10,7 +10,8 @@ from app.middlewares.auth import AuthMiddleware from app.middlewares.logging import LoggingMiddleware from app.middlewares.throttling import ThrottlingMiddleware from app.middlewares.subscription_checker import SubscriptionStatusMiddleware -from app.middlewares.maintenance import MaintenanceMiddleware +from app.middlewares.maintenance import MaintenanceMiddleware +from app.middlewares.display_name_restriction import DisplayNameRestrictionMiddleware from app.services.maintenance_service import maintenance_service from app.utils.cache import cache @@ -102,6 +103,10 @@ async def setup_bot() -> tuple[Bot, Dispatcher]: dp.callback_query.middleware(LoggingMiddleware()) dp.message.middleware(MaintenanceMiddleware()) dp.callback_query.middleware(MaintenanceMiddleware()) + display_name_middleware = DisplayNameRestrictionMiddleware() + dp.message.middleware(display_name_middleware) + dp.callback_query.middleware(display_name_middleware) + dp.pre_checkout_query.middleware(display_name_middleware) dp.message.middleware(ThrottlingMiddleware()) dp.callback_query.middleware(ThrottlingMiddleware()) diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json index c3aa686a..8b1cdb3a 100644 --- a/app/localization/locales/en.json +++ b/app/localization/locales/en.json @@ -5,6 +5,7 @@ "COUNTRY_MANAGEMENT_NONE": "No countries connected", "PAID_FEATURE_ONLY": "⚠ This feature is available only for paid subscriptions", "PAID_FEATURE_ONLY_SHORT": "⚠ Paid subscriptions only", + "SUSPICIOUS_DISPLAY_NAME_BLOCKED": "🚫 We detected that your display name looks like a link or an official account. Please change your Telegram name and try again.", "COUNTRY_NOT_AVAILABLE_PROMOGROUP": "❌ This server is not available for your promo group", "COUNTRY_CHANGES_NOT_FOUND": "⚠️ No changes detected", "COUNTRY_CHANGES_SUCCESS_HEADER": "✅ Countries updated!\n\n", diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index e305d18b..ca6ba4fd 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -6,6 +6,7 @@ "COUNTRY_MANAGEMENT_NONE": "Нет подключенных стран", "PAID_FEATURE_ONLY": "⚠ Эта функция доступна только для платных подписок", "PAID_FEATURE_ONLY_SHORT": "⚠ Только для платных подписок", + "SUSPICIOUS_DISPLAY_NAME_BLOCKED": "🚫 Мы обнаружили, что ваше отображаемое имя похоже на ссылку или служебный аккаунт. Пожалуйста, измените имя в профиле Telegram и повторите попытку.", "COUNTRY_NOT_AVAILABLE_PROMOGROUP": "❌ Сервер недоступен для вашей промогруппы", "COUNTRY_CHANGES_NOT_FOUND": "⚠️ Изменения не обнаружены", "COUNTRY_CHANGES_SUCCESS_HEADER": "✅ Страны успешно обновлены!\n\n", diff --git a/app/middlewares/display_name_restriction.py b/app/middlewares/display_name_restriction.py new file mode 100644 index 00000000..602aba8e --- /dev/null +++ b/app/middlewares/display_name_restriction.py @@ -0,0 +1,189 @@ +import logging +import re +from typing import Any, Awaitable, Callable, Dict + +from aiogram import BaseMiddleware +from aiogram.types import ( + CallbackQuery, + Message, + PreCheckoutQuery, + TelegramObject, + User as TgUser, +) + +from app.config import settings +from app.localization.texts import get_texts + +logger = logging.getLogger(__name__) + + +ZERO_WIDTH_PATTERN = re.compile(r"[\u200B-\u200D\uFEFF]") + +LINK_PATTERNS = [ + re.compile(pattern, re.IGNORECASE) + for pattern in ( + r"t\.me/\+", + r"joinchat", + r"https?://", + r"www\.", + r"tg://", + r"telegram\.me", + r"t\.me", + ) +] + +DOMAIN_OBFUSCATION_PATTERN = re.compile( + r"(?+=]+") + +SUSPICIOUS_KEYWORDS = [ + "telegram", + "teleqram", + "teiegram", + "teieqram", + "telegrarn", + "service", + "notification", + "system", + "security", + "safety", + "support", + "moderation", + "review", + "compliance", + "abuse", + "spam", + "report", + "телеграм", + "служебн", + "уведомлен", + "поддержк", + "безопасн", + "модерац", + "жалоб", + "абуз", + "служб", + "повiдом", + "пiдтрим", +] + + +class DisplayNameRestrictionMiddleware(BaseMiddleware): + """Blocks users whose display name imitates links or official accounts.""" + + async def __call__( + self, + handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], + event: TelegramObject, + data: Dict[str, Any], + ) -> Any: + user: TgUser | None = None + + if isinstance(event, (Message, CallbackQuery, PreCheckoutQuery)): + user = event.from_user + + if not user or user.is_bot: + return await handler(event, data) + + display_name = self._build_display_name(user) + username = user.username or "" + + display_suspicious = self._is_suspicious(display_name) + username_suspicious = self._is_suspicious(username) + + if display_suspicious or username_suspicious: + suspicious_value = display_name if display_suspicious else username + language = self._resolve_language(user, data) + texts = get_texts(language) + warning = texts.get( + "SUSPICIOUS_DISPLAY_NAME_BLOCKED", + "🚫 Ваше отображаемое имя похоже на ссылку или служебный аккаунт. " + "Пожалуйста, измените имя и попробуйте снова.", + ) + + logger.warning( + "🚫 DisplayNameRestriction: user %s blocked due to suspicious name '%s'", + user.id, + suspicious_value, + ) + + if isinstance(event, Message): + await event.answer(warning) + elif isinstance(event, CallbackQuery): + await event.answer(warning, show_alert=True) + elif isinstance(event, PreCheckoutQuery): + await event.answer(ok=False, error_message=warning) + return None + + return await handler(event, data) + + @staticmethod + def _build_display_name(user: TgUser) -> str: + parts = [user.first_name or "", user.last_name or ""] + return " ".join(part for part in parts if part).strip() + + @staticmethod + def _resolve_language(user: TgUser, data: Dict[str, Any]) -> str: + db_user = data.get("db_user") + if db_user and getattr(db_user, "language", None): + return db_user.language + language_code = getattr(user, "language_code", None) + return language_code or settings.DEFAULT_LANGUAGE + + def _is_suspicious(self, value: str) -> bool: + if not value: + return False + + cleaned = ZERO_WIDTH_PATTERN.sub("", value) + lower_value = cleaned.lower() + + if "@" in cleaned or "@" in cleaned: + return True + + if any(pattern.search(lower_value) for pattern in LINK_PATTERNS): + return True + + if DOMAIN_OBFUSCATION_PATTERN.search(lower_value): + return True + + normalized = self._normalize_text(lower_value) + collapsed = COLLAPSE_PATTERN.sub("", normalized) + + if "tme" in collapsed: + return True + + return any( + keyword in normalized or keyword in collapsed + for keyword in SUSPICIOUS_KEYWORDS + ) + + @staticmethod + def _normalize_text(value: str) -> str: + return value.translate(CHAR_TRANSLATION) +