Files
remnawave-bedolaga-telegram…/app/utils/message_patch.py
2026-01-20 00:27:50 +03:00

227 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
from typing import Any, Dict
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import FSInputFile, InaccessibleMessage, InputMediaPhoto, Message
from app.config import settings
from app.localization.texts import get_texts
LOGO_PATH = Path(settings.LOGO_FILE)
_PRIVACY_RESTRICTED_CODE = "BUTTON_USER_PRIVACY_RESTRICTED"
_TOPIC_REQUIRED_ERRORS = (
"topic must be specified",
"TOPIC_CLOSED",
"TOPIC_DELETED",
"FORUM_CLOSED",
)
def is_qr_message(message: Message) -> bool:
if isinstance(message, InaccessibleMessage):
return False
return bool(message.caption and message.caption.startswith("\U0001F517 Ваша реферальная ссылка"))
_original_answer = Message.answer
_original_edit_text = Message.edit_text
def _get_language(message: Message) -> str | None:
try:
user = message.from_user
if user and getattr(user, "language_code", None):
return user.language_code
except AttributeError:
pass
return None
def _default_privacy_hint(language: str | None) -> str:
if language and language.lower().startswith("en"):
return (
"⚠️ Telegram blocked the contact request button because of your privacy settings. "
"Please allow sharing your contact information or send the required details manually."
)
return (
"⚠️ Telegram запретил кнопку запроса контакта из-за настроек приватности. "
"Разрешите отправку контакта в настройках Telegram или отправьте данные вручную."
)
def append_privacy_hint(text: str | None, language: str | None) -> str:
base_text = text or ""
try:
hint = get_texts(language).t(
"PRIVACY_RESTRICTED_BUTTON_HINT",
default=_default_privacy_hint(language),
)
except Exception:
hint = _default_privacy_hint(language)
hint = hint.strip()
if not hint:
return base_text
if hint in base_text:
return base_text
if base_text:
return f"{base_text}\n\n{hint}"
return hint
def prepare_privacy_safe_kwargs(kwargs: Dict[str, Any] | None = None) -> Dict[str, Any]:
safe_kwargs: Dict[str, Any] = dict(kwargs or {})
safe_kwargs.pop("reply_markup", None)
return safe_kwargs
def is_privacy_restricted_error(error: Exception) -> bool:
if not isinstance(error, TelegramBadRequest):
return False
message = getattr(error, "message", "") or ""
description = str(error)
return _PRIVACY_RESTRICTED_CODE in message or _PRIVACY_RESTRICTED_CODE in description
def is_topic_required_error(error: Exception) -> bool:
"""Проверяет, является ли ошибка связанной с топиками/форумами."""
if not isinstance(error, TelegramBadRequest):
return False
description = str(error).lower()
return any(err.lower() in description for err in _TOPIC_REQUIRED_ERRORS)
async def _answer_with_photo(self: Message, text: str = None, **kwargs):
# Уважаем флаг в рантайме: если логотип выключен — не подменяем ответ
if not settings.ENABLE_LOGO_MODE:
return await _original_answer(self, text, **kwargs)
# Если caption слишком длинный для фото — отправим как текст
try:
if text is not None and len(text) > 900:
return await _original_answer(self, text, **kwargs)
except Exception:
pass
language = _get_language(self)
if LOGO_PATH.exists():
try:
# Отправляем caption как есть; при ошибке парсинга ниже сработает фоллбек
return await self.answer_photo(FSInputFile(LOGO_PATH), caption=text, **kwargs)
except TelegramBadRequest as error:
if is_topic_required_error(error):
# Канал с топиками — просто игнорируем, нельзя ответить без message_thread_id
return None
if is_privacy_restricted_error(error):
fallback_text = append_privacy_hint(text, language)
safe_kwargs = prepare_privacy_safe_kwargs(kwargs)
try:
return await _original_answer(self, fallback_text, **safe_kwargs)
except TelegramBadRequest as inner_error:
if is_topic_required_error(inner_error):
return None
raise
# Фоллбек, если Telegram ругается на caption или другое ограничение: отправим как текст
try:
return await _original_answer(self, text, **kwargs)
except TelegramBadRequest as inner_error:
if is_topic_required_error(inner_error):
return None
raise
except Exception:
try:
return await _original_answer(self, text, **kwargs)
except TelegramBadRequest as inner_error:
if is_topic_required_error(inner_error):
return None
raise
try:
return await _original_answer(self, text, **kwargs)
except TelegramBadRequest as error:
if is_topic_required_error(error):
return None
raise
async def _edit_with_photo(self: Message, text: str, **kwargs):
# Уважаем флаг в рантайме: если логотип выключен — не подменяем редактирование
if not settings.ENABLE_LOGO_MODE:
return await _original_edit_text(self, text, **kwargs)
if self.photo:
language = _get_language(self)
# Если caption потенциально слишком длинный — отправим как текст вместо caption
try:
if text is not None and len(text) > 900:
try:
await self.delete()
except Exception:
pass
return await _original_answer(self, text, **kwargs)
except Exception:
pass
# Всегда используем логотип если включен режим логотипа,
# кроме специальных случаев (QR сообщения)
if settings.ENABLE_LOGO_MODE and LOGO_PATH.exists() and not is_qr_message(self):
media = FSInputFile(LOGO_PATH)
elif is_qr_message(self) and LOGO_PATH.exists():
media = FSInputFile(LOGO_PATH)
else:
media = self.photo[-1].file_id
media_kwargs = {"media": media, "caption": text}
edit_kwargs = dict(kwargs)
if "parse_mode" in edit_kwargs:
_pm = edit_kwargs.pop("parse_mode")
media_kwargs["parse_mode"] = _pm if _pm is not None else "HTML"
else:
media_kwargs["parse_mode"] = "HTML"
try:
return await self.edit_media(InputMediaPhoto(**media_kwargs), **edit_kwargs)
except TelegramBadRequest as error:
if is_topic_required_error(error):
return None
if is_privacy_restricted_error(error):
fallback_text = append_privacy_hint(text, language)
safe_kwargs = prepare_privacy_safe_kwargs(kwargs)
try:
await self.delete()
except Exception:
pass
try:
return await _original_answer(self, fallback_text, **safe_kwargs)
except TelegramBadRequest as inner_error:
if is_topic_required_error(inner_error):
return None
raise
# Фоллбек: удалим и отправим обычный текст без фото
try:
await self.delete()
except Exception:
pass
try:
return await _original_answer(self, text, **kwargs)
except TelegramBadRequest as inner_error:
if is_topic_required_error(inner_error):
return None
raise
# Обработка ошибок MESSAGE_ID_INVALID для сообщений без фото
try:
return await _original_edit_text(self, text, **kwargs)
except TelegramBadRequest as error:
if is_topic_required_error(error):
return None
if "MESSAGE_ID_INVALID" in str(error) or "message to edit not found" in str(error).lower():
# Сообщение удалено или недоступно — просто игнорируем
return None
raise
def patch_message_methods():
if not settings.ENABLE_LOGO_MODE:
return
Message.answer = _answer_with_photo
Message.edit_text = _edit_with_photo