Improve HTML sanitization for menu messages

This commit is contained in:
Egor
2025-11-28 01:31:53 +03:00
parent d864c80457
commit 91f557e357
4 changed files with 57 additions and 35 deletions

View File

@@ -6,6 +6,7 @@ from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models import User, UserMessage
from app.utils.validators import sanitize_html, validate_html_tags
logger = logging.getLogger(__name__)
@@ -17,6 +18,10 @@ async def create_user_message(
is_active: bool = True,
sort_order: int = 0
) -> UserMessage:
is_valid, error_message = validate_html_tags(message_text)
if not is_valid:
raise ValueError(error_message)
resolved_creator = created_by
if created_by is not None:
@@ -61,7 +66,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]:
return None
random_message = random.choice(active_messages)
return random_message.message_text
return sanitize_html(random_message.message_text)
async def get_all_user_messages(
@@ -102,8 +107,11 @@ async def update_user_message(
if not message:
return None
if message_text is not None:
is_valid, error_message = validate_html_tags(message_text)
if not is_valid:
raise ValueError(error_message)
message.message_text = message_text
if is_active is not None:

View File

@@ -12,6 +12,11 @@ from app.database.crud.user_message import (
)
from app.database.models import User
from app.keyboards.admin import get_admin_main_keyboard
from app.utils.validators import (
get_html_help_text,
sanitize_html,
validate_html_tags,
)
from app.utils.decorators import admin_required, error_handler
from app.localization.texts import get_texts
@@ -122,8 +127,6 @@ async def add_user_message_start(
db_user: User,
db: AsyncSession
):
from app.utils.validators import get_html_help_text
await callback.message.edit_text(
f"📝 <b>Добавление нового сообщения</b>\n\n"
f"Введите текст сообщения, которое будет показываться в главном меню.\n\n"
@@ -161,8 +164,6 @@ async def process_new_message_text(
)
return
from app.utils.validators import validate_html_tags, get_html_help_text
is_valid, error_msg = validate_html_tags(message_text)
if not is_valid:
await message.answer(
@@ -312,20 +313,22 @@ async def view_user_message(
return
message = await get_user_message_by_id(db, message_id)
if not message:
await callback.answer("❌ Сообщение не найдено", show_alert=True)
return
safe_content = sanitize_html(message.message_text)
status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно"
text = (
f"📋 <b>Сообщение ID {message.id}</b>\n\n"
f"<b>Статус:</b> {status_text}\n"
f"<b>Создано:</b> {message.created_at.strftime('%d.%m.%Y %H:%M')}\n"
f"<b>Обновлено:</b> {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n"
f"<b>Содержимое:</b>\n"
f"<blockquote>{message.message_text}</blockquote>"
f"<blockquote>{safe_content}</blockquote>"
)
await callback.message.edit_text(
@@ -455,7 +458,7 @@ async def edit_user_message_start(
await callback.message.edit_text(
f"✏️ <b>Редактирование сообщения ID {message.id}</b>\n\n"
f"<b>Текущий текст:</b>\n"
f"<blockquote>{message.message_text}</blockquote>\n\n"
f"<blockquote>{sanitize_html(message.message_text)}</blockquote>\n\n"
f"Введите новый текст сообщения или отправьте /cancel для отмены:",
parse_mode="HTML"
)
@@ -489,14 +492,23 @@ async def process_edit_message_text(
return
new_text = message.text.strip()
if len(new_text) > 4000:
await message.answer(
"❌ Сообщение слишком длинное. Максимум 4000 символов.\n"
"Попробуйте еще раз или отправьте /cancel для отмены."
)
return
is_valid, error_msg = validate_html_tags(new_text)
if not is_valid:
await message.answer(
f"❌ Ошибка в HTML разметке: {error_msg}\n\n"
f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.",
parse_mode=None
)
return
try:
updated_message = await update_user_message(
db=db,
@@ -511,7 +523,7 @@ async def process_edit_message_text(
f"<b>ID:</b> {updated_message.id}\n"
f"<b>Обновлено:</b> {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n"
f"<b>Новый текст:</b>\n"
f"<blockquote>{new_text}</blockquote>",
f"<blockquote>{sanitize_html(new_text)}</blockquote>",
reply_markup=get_user_messages_keyboard(db_user.language),
parse_mode="HTML"
)

View File

@@ -123,23 +123,19 @@ def validate_subscription_period(days: Union[str, int]) -> Optional[int]:
def sanitize_html(text: str) -> str:
if not text:
return text
text = html.escape(text)
for tag in ALLOWED_HTML_TAGS:
allowed_tags = ALLOWED_HTML_TAGS.union(SELF_CLOSING_TAGS)
for tag in allowed_tags:
text = re.sub(
f'&lt;{tag}(&gt;|\\s[^&]*&gt;)',
lambda m: m.group(0).replace('&lt;', '<').replace('&gt;', '>'),
text,
f'&lt;(/?{tag}\\b[^>]*)&gt;',
lambda m: html.unescape(f"<{m.group(1)}>"),
text,
flags=re.IGNORECASE
)
text = re.sub(
f'&lt;/{tag}&gt;',
f'</{tag}>',
text,
flags=re.IGNORECASE
)
return text

View File

@@ -69,13 +69,16 @@ async def create_user_message_endpoint(
db: AsyncSession = Depends(get_db_session),
) -> UserMessageResponse:
created_by = getattr(token, "id", None)
message = await create_user_message(
db,
message_text=payload.message_text,
created_by=created_by,
is_active=payload.is_active,
sort_order=payload.sort_order,
)
try:
message = await create_user_message(
db,
message_text=payload.message_text,
created_by=created_by,
is_active=payload.is_active,
sort_order=payload.sort_order,
)
except ValueError as error:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error
return _serialize(message)
@@ -88,7 +91,10 @@ async def update_user_message_endpoint(
db: AsyncSession = Depends(get_db_session),
) -> UserMessageResponse:
update_payload = payload.dict(exclude_unset=True)
message = await update_user_message(db, message_id, **update_payload)
try:
message = await update_user_message(db, message_id, **update_payload)
except ValueError as error:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error
if not message:
raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found")