fix: flood control handling in pinned messages and XSS hardening in HTML sanitizer

- Add retry loop with backoff to _unpin_message_for_user (max 3 attempts)
- Add TelegramRetryAfter handling in _send_and_pin_message (unpin + send phases)
- Fix missing failed_count increment when all broadcast retries exhaust (for/else)
- Remove dead code in unpin_active_pinned_message (unreachable TelegramRetryAfter catch)
- Harden sanitize_html: allowlist URI schemes (http/https/tg/mailto/tel), whitelist
  tag attributes, strip all attrs from tags without explicit whitelist, full HTML
  entity decoding via html.unescape
This commit is contained in:
Fringg
2026-02-12 19:13:40 +03:00
parent 2de438426a
commit 454b83138e
2 changed files with 89 additions and 45 deletions

View File

@@ -189,6 +189,9 @@ async def broadcast_pinned_message(
)
failed_count += 1
break
else:
# All retry attempts exhausted (TelegramRetryAfter on every attempt)
failed_count += 1
for i in range(0, len(recipient_telegram_ids), 30):
batch = recipient_telegram_ids[i : i + 30]
@@ -251,23 +254,6 @@ async def unpin_active_pinned_message(
unpinned_count += 1
else:
failed_count += 1
except TelegramRetryAfter as retry_error:
delay = min(retry_error.retry_after + 1, 30)
logger.warning(
'RetryAfter while unpinning for user %s, waiting %s seconds',
telegram_id,
delay,
)
await asyncio.sleep(delay)
# Повторная попытка после ожидания
try:
success = await _unpin_message_for_user(bot, telegram_id)
if success:
unpinned_count += 1
else:
failed_count += 1
except Exception:
failed_count += 1
except Exception as error:
logger.error(
'Ошибка открепления сообщения у пользователя %s: %s',
@@ -311,6 +297,12 @@ async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMe
pass
except TelegramForbiddenError:
return False
except TelegramRetryAfter as e:
await asyncio.sleep(min(e.retry_after + 1, 30))
try:
await bot.unpin_all_chat_messages(chat_id=chat_id)
except (TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter):
pass
try:
if pinned_message.media_type == 'photo' and pinned_message.media_file_id:
@@ -345,6 +337,9 @@ async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMe
return True
except TelegramForbiddenError:
return False
except TelegramRetryAfter as e:
await asyncio.sleep(min(e.retry_after + 1, 30))
raise # Propagate to caller's retry loop
except TelegramBadRequest as error:
logger.warning(
'Некорректный запрос при отправке закрепленного сообщения в чат %s: %s',
@@ -361,18 +356,38 @@ async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMe
return False
async def _unpin_message_for_user(bot: Bot, chat_id: int) -> bool:
try:
await bot.unpin_all_chat_messages(chat_id=chat_id)
return True
except TelegramForbiddenError:
return False
except TelegramBadRequest:
return False
except Exception as error:
logger.error(
'Не удалось открепить сообщение у пользователя %s: %s',
chat_id,
error,
)
return False
async def _unpin_message_for_user(bot: Bot, chat_id: int, max_retries: int = 3) -> bool:
for attempt in range(max_retries):
try:
await bot.unpin_all_chat_messages(chat_id=chat_id)
return True
except TelegramRetryAfter as e:
if attempt < max_retries - 1:
delay = min(e.retry_after + 1, 30)
logger.warning(
'RetryAfter при откреплении для %s, ожидание %s сек (попытка %d/%d)',
chat_id,
delay,
attempt + 1,
max_retries,
)
await asyncio.sleep(delay)
else:
logger.warning(
'Не удалось открепить сообщение у %s после %d попыток (flood control)',
chat_id,
max_retries,
)
return False
except TelegramForbiddenError:
return False
except TelegramBadRequest:
return False
except Exception as error:
logger.error(
'Не удалось открепить сообщение у пользователя %s: %s',
chat_id,
error,
)
return False
return False

View File

@@ -1,3 +1,4 @@
import html as html_module
import re
from datetime import datetime
@@ -23,6 +24,16 @@ ALLOWED_HTML_TAGS = {
SELF_CLOSING_TAGS = {'br', 'hr', 'img'}
# Разрешённые атрибуты для HTML-тегов
ALLOWED_TAG_ATTRIBUTES = {
'a': {'href'},
'tg-emoji': {'emoji-id'},
'span': {'class'},
}
# Разрешённые URI-схемы в href (allowlist вместо blocklist)
SAFE_URI_SCHEMES = re.compile(r'^(https?://|tg://|mailto:|tel:)', re.IGNORECASE)
def validate_email(email: str) -> bool:
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
@@ -140,25 +151,43 @@ def sanitize_html(text: str) -> str:
# Обработка всех разрешенных тегов
for tag in allowed_tags:
# Паттерн: захватываем &lt;tag&gt;, &lt;/tag&gt;, или &lt;tag атрибуты&gt;
# Используем более сложный паттерн, чтобы захватить атрибуты до закрывающего &gt;
# (?s) - позволяет . захватывать новую строку
# [^>]*? - ленивый захват до >
pattern = rf'(&lt;)(/?{tag}\b)([^>]*?)(&gt;)'
def replace_tag(match):
match.group(1) # &lt;
tag_lower = tag.lower()
def replace_tag(match, _tag=tag_lower):
full_tag_content = match.group(2) # /?tagname
attrs_part = match.group(3) # атрибуты (без >)
match.group(4) # &gt;
attrs_part = match.group(3).removeprefix(' ') # атрибуты (без >)
# Убираем начальный пробел, если есть
attrs_part = attrs_part.removeprefix(' ')
if not attrs_part:
return f'<{full_tag_content}>'
# Формируем результат
if attrs_part:
# Безопасно обрабатываем атрибуты, заменяя только безопасные сущности
# Не разворачиваем &lt; и &gt; внутри атрибутов, чтобы избежать XSS
processed_attrs = attrs_part.replace('&quot;', '"').replace('&#x27;', "'")
# Полное декодирование HTML-сущностей для корректной проверки атрибутов
processed_attrs = html_module.unescape(attrs_part)
# Проверяем whitelist атрибутов для данного тега
allowed_attrs = ALLOWED_TAG_ATTRIBUTES.get(_tag)
if allowed_attrs is None:
# Тег без whitelist — удаляем ВСЕ атрибуты
return f'<{full_tag_content}>'
filtered_parts = []
for attr_match in re.finditer(r'([a-zA-Z][\w-]*)\s*=\s*(?:"([^"]*)"|\'([^\']*)\')', processed_attrs):
attr_name = attr_match.group(1).lower()
attr_value = attr_match.group(2) if attr_match.group(2) is not None else attr_match.group(3)
if attr_name not in allowed_attrs:
continue
# href: allowlist безопасных URI-схем
if attr_name == 'href':
# Нормализуем: убираем control chars и пробелы из начала значения
normalized = re.sub(r'[\x00-\x1f\x7f\s]+', '', attr_value)
if not SAFE_URI_SCHEMES.match(normalized):
continue
filtered_parts.append(f'{attr_name}="{attr_value}"')
processed_attrs = ' '.join(filtered_parts)
if processed_attrs:
return f'<{full_tag_content} {processed_attrs}>'
return f'<{full_tag_content}>'