mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-02-28 07:11:37 +00:00
Merge pull request #72 from Fr1ngg/dev
Доработка раздела с правилами + валидация
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from datetime import datetime
|
||||
|
||||
@@ -53,10 +53,37 @@ async def create_or_update_rules(
|
||||
await db.commit()
|
||||
await db.refresh(new_rules)
|
||||
|
||||
logger.info(f"✅ Правила для языка {language} обновлены")
|
||||
logger.info(f"✅ Правила для языка {language} обновлены (ID: {new_rules.id})")
|
||||
return new_rules
|
||||
|
||||
|
||||
async def clear_all_rules(db: AsyncSession, language: str = "ru") -> bool:
|
||||
try:
|
||||
result = await db.execute(
|
||||
update(ServiceRule)
|
||||
.where(
|
||||
ServiceRule.language == language,
|
||||
ServiceRule.is_active == True
|
||||
)
|
||||
.values(
|
||||
is_active=False,
|
||||
updated_at=datetime.utcnow()
|
||||
)
|
||||
)
|
||||
|
||||
await db.commit()
|
||||
|
||||
rows_affected = result.rowcount
|
||||
logger.info(f"✅ Очищены правила для языка {language}. Деактивировано записей: {rows_affected}")
|
||||
|
||||
return rows_affected > 0
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при очистке правил для языка {language}: {e}")
|
||||
await db.rollback()
|
||||
raise
|
||||
|
||||
|
||||
async def get_current_rules_content(db: AsyncSession, language: str = "ru") -> str:
|
||||
rules = await get_rules_by_language(db, language)
|
||||
|
||||
@@ -79,4 +106,113 @@ async def get_current_rules_content(db: AsyncSession, language: str = "ru") -> s
|
||||
6. При возникновении вопросов обращайтесь в техническую поддержку.
|
||||
|
||||
Используя сервис, вы соглашаетесь с данными правилами.
|
||||
"""
|
||||
"""
|
||||
|
||||
|
||||
async def get_all_rules_versions(
|
||||
db: AsyncSession,
|
||||
language: str = "ru",
|
||||
limit: int = 10
|
||||
) -> list[ServiceRule]:
|
||||
result = await db.execute(
|
||||
select(ServiceRule)
|
||||
.where(ServiceRule.language == language)
|
||||
.order_by(ServiceRule.created_at.desc())
|
||||
.limit(limit)
|
||||
)
|
||||
return result.scalars().all()
|
||||
|
||||
|
||||
async def restore_rules_version(
|
||||
db: AsyncSession,
|
||||
rule_id: int,
|
||||
language: str = "ru"
|
||||
) -> Optional[ServiceRule]:
|
||||
try:
|
||||
result = await db.execute(
|
||||
select(ServiceRule).where(
|
||||
ServiceRule.id == rule_id,
|
||||
ServiceRule.language == language
|
||||
)
|
||||
)
|
||||
rule_to_restore = result.scalar_one_or_none()
|
||||
|
||||
if not rule_to_restore:
|
||||
logger.warning(f"Правило с ID {rule_id} не найдено для языка {language}")
|
||||
return None
|
||||
|
||||
await db.execute(
|
||||
update(ServiceRule)
|
||||
.where(
|
||||
ServiceRule.language == language,
|
||||
ServiceRule.is_active == True
|
||||
)
|
||||
.values(
|
||||
is_active=False,
|
||||
updated_at=datetime.utcnow()
|
||||
)
|
||||
)
|
||||
|
||||
restored_rule = ServiceRule(
|
||||
title=rule_to_restore.title,
|
||||
content=rule_to_restore.content,
|
||||
language=language,
|
||||
is_active=True,
|
||||
order=0
|
||||
)
|
||||
|
||||
db.add(restored_rule)
|
||||
await db.commit()
|
||||
await db.refresh(restored_rule)
|
||||
|
||||
logger.info(f"✅ Восстановлена версия правил ID {rule_id} как новое правило ID {restored_rule.id}")
|
||||
return restored_rule
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при восстановлении правил ID {rule_id}: {e}")
|
||||
await db.rollback()
|
||||
raise
|
||||
|
||||
|
||||
async def get_rules_statistics(db: AsyncSession) -> dict:
|
||||
try:
|
||||
active_result = await db.execute(
|
||||
select(ServiceRule).where(ServiceRule.is_active == True)
|
||||
)
|
||||
active_rules = active_result.scalars().all()
|
||||
|
||||
all_result = await db.execute(select(ServiceRule))
|
||||
all_rules = all_result.scalars().all()
|
||||
|
||||
languages_stats = {}
|
||||
for rule in active_rules:
|
||||
lang = rule.language
|
||||
if lang not in languages_stats:
|
||||
languages_stats[lang] = {
|
||||
'active_count': 0,
|
||||
'last_updated': None,
|
||||
'content_length': 0
|
||||
}
|
||||
|
||||
languages_stats[lang]['active_count'] += 1
|
||||
languages_stats[lang]['content_length'] = len(rule.content)
|
||||
|
||||
if not languages_stats[lang]['last_updated'] or rule.updated_at > languages_stats[lang]['last_updated']:
|
||||
languages_stats[lang]['last_updated'] = rule.updated_at
|
||||
|
||||
return {
|
||||
'total_active': len(active_rules),
|
||||
'total_all_time': len(all_rules),
|
||||
'languages': languages_stats,
|
||||
'total_languages': len(languages_stats)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Ошибка при получении статистики правил: {e}")
|
||||
return {
|
||||
'total_active': 0,
|
||||
'total_all_time': 0,
|
||||
'languages': {},
|
||||
'total_languages': 0,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
from aiogram import Dispatcher, types, F
|
||||
from aiogram.filters import Command
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
@@ -14,6 +15,8 @@ from app.keyboards.admin import (
|
||||
)
|
||||
from app.localization.texts import get_texts
|
||||
from app.utils.decorators import admin_required, error_handler
|
||||
from app.database.crud.rules import clear_all_rules, get_rules_statistics
|
||||
from app.localization.texts import clear_rules_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -145,6 +148,121 @@ async def show_system_submenu(
|
||||
await callback.answer()
|
||||
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def clear_rules_command(
|
||||
message: types.Message,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
try:
|
||||
stats = await get_rules_statistics(db)
|
||||
|
||||
if stats['total_active'] == 0:
|
||||
await message.reply(
|
||||
"ℹ️ <b>Правила уже очищены</b>\n\n"
|
||||
"В системе нет активных правил. Используются стандартные правила по умолчанию."
|
||||
)
|
||||
return
|
||||
|
||||
success = await clear_all_rules(db, db_user.language)
|
||||
|
||||
if success:
|
||||
clear_rules_cache()
|
||||
|
||||
await message.reply(
|
||||
f"✅ <b>Правила успешно очищены!</b>\n\n"
|
||||
f"📊 <b>Статистика:</b>\n"
|
||||
f"• Очищено правил: {stats['total_active']}\n"
|
||||
f"• Язык: {db_user.language}\n"
|
||||
f"• Выполнил: {db_user.full_name}\n\n"
|
||||
f"Теперь используются стандартные правила по умолчанию."
|
||||
)
|
||||
|
||||
logger.info(f"Правила очищены командой администратором {db_user.telegram_id} ({db_user.full_name})")
|
||||
else:
|
||||
await message.reply(
|
||||
"⚠️ <b>Нет правил для очистки</b>\n\n"
|
||||
"Активные правила не найдены."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при очистке правил командой: {e}")
|
||||
await message.reply(
|
||||
"❌ <b>Ошибка при очистке правил</b>\n\n"
|
||||
f"Произошла ошибка: {str(e)}\n"
|
||||
"Попробуйте через админ-панель или повторите позже."
|
||||
)
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def rules_stats_command(
|
||||
message: types.Message,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
try:
|
||||
stats = await get_rules_statistics(db)
|
||||
|
||||
if 'error' in stats:
|
||||
await message.reply(f"❌ Ошибка получения статистики: {stats['error']}")
|
||||
return
|
||||
|
||||
text = f"📊 <b>Статистика правил сервиса</b>\n\n"
|
||||
text += f"📋 <b>Общая информация:</b>\n"
|
||||
text += f"• Активных правил: {stats['total_active']}\n"
|
||||
text += f"• Всего в истории: {stats['total_all_time']}\n"
|
||||
text += f"• Поддерживаемых языков: {stats['total_languages']}\n\n"
|
||||
|
||||
if stats['languages']:
|
||||
text += f"🌐 <b>По языкам:</b>\n"
|
||||
for lang, lang_stats in stats['languages'].items():
|
||||
text += f"• <code>{lang}</code>: {lang_stats['active_count']} правил, "
|
||||
text += f"{lang_stats['content_length']} символов\n"
|
||||
if lang_stats['last_updated']:
|
||||
text += f" Обновлено: {lang_stats['last_updated'].strftime('%d.%m.%Y %H:%M')}\n"
|
||||
else:
|
||||
text += "ℹ️ Активных правил нет - используются правила по умолчанию"
|
||||
|
||||
await message.reply(text)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при получении статистики правил: {e}")
|
||||
await message.reply(
|
||||
f"❌ <b>Ошибка получения статистики</b>\n\n"
|
||||
f"Произошла ошибка: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def admin_commands_help(
|
||||
message: types.Message,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
help_text = """
|
||||
🔧 <b>Доступные админские команды:</b>
|
||||
|
||||
<b>📋 Управление правилами:</b>
|
||||
• <code>/clear_rules</code> - очистить все правила
|
||||
• <code>/rules_stats</code> - статистика правил
|
||||
|
||||
<b>ℹ️ Справка:</b>
|
||||
• <code>/admin_help</code> - это сообщение
|
||||
|
||||
<b>📱 Панель управления:</b>
|
||||
Используйте кнопку "Админ панель" в главном меню для полного доступа ко всем функциям.
|
||||
|
||||
<b>⚠️ Важно:</b>
|
||||
Все команды логируются и требуют админских прав.
|
||||
"""
|
||||
|
||||
await message.reply(help_text)
|
||||
|
||||
|
||||
def register_handlers(dp: Dispatcher):
|
||||
dp.callback_query.register(
|
||||
show_admin_panel,
|
||||
@@ -175,3 +293,18 @@ def register_handlers(dp: Dispatcher):
|
||||
show_system_submenu,
|
||||
F.data == "admin_submenu_system"
|
||||
)
|
||||
|
||||
dp.message.register(
|
||||
clear_rules_command,
|
||||
Command("clear_rules")
|
||||
)
|
||||
|
||||
dp.message.register(
|
||||
rules_stats_command,
|
||||
Command("rules_stats")
|
||||
)
|
||||
|
||||
dp.message.register(
|
||||
admin_commands_help,
|
||||
Command("admin_help")
|
||||
)
|
||||
@@ -8,7 +8,8 @@ from app.states import AdminStates
|
||||
from app.database.models import User
|
||||
from app.localization.texts import get_texts
|
||||
from app.utils.decorators import admin_required, error_handler
|
||||
from app.database.crud.rules import get_current_rules_content, create_or_update_rules
|
||||
from app.utils.validators import validate_html_tags, get_html_help_text
|
||||
from app.database.crud.rules import get_current_rules_content, create_or_update_rules, clear_all_rules
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -31,7 +32,9 @@ async def show_rules_management(
|
||||
keyboard = [
|
||||
[types.InlineKeyboardButton(text="📝 Редактировать правила", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="👀 Просмотр правил", callback_data="admin_view_rules")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_panel")]
|
||||
[types.InlineKeyboardButton(text="🗑️ Очистить правила", callback_data="admin_clear_rules")],
|
||||
[types.InlineKeyboardButton(text="ℹ️ Помощь по HTML", callback_data="admin_rules_help")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_submenu_settings")]
|
||||
]
|
||||
|
||||
await callback.message.edit_text(
|
||||
@@ -48,16 +51,33 @@ async def view_current_rules(
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
current_rules = await get_current_rules_content(db, db_user.language)
|
||||
|
||||
await callback.message.edit_text(
|
||||
f"📋 <b>Текущие правила сервиса</b>\n\n{current_rules}",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="✏️ Редактировать", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
try:
|
||||
current_rules = await get_current_rules_content(db, db_user.language)
|
||||
|
||||
is_valid, error_msg = validate_html_tags(current_rules)
|
||||
warning = ""
|
||||
if not is_valid:
|
||||
warning = f"\n\n⚠️ <b>Внимание:</b> В правилах найдена ошибка HTML: {error_msg}"
|
||||
|
||||
await callback.message.edit_text(
|
||||
f"📋 <b>Текущие правила сервиса</b>\n\n{current_rules}{warning}",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="✏️ Редактировать", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="🗑️ Очистить", callback_data="admin_clear_rules")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при показе правил: {e}")
|
||||
await callback.message.edit_text(
|
||||
"❌ Ошибка при загрузке правил. Возможно, в тексте есть некорректные HTML теги.",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="🗑️ Очистить правила", callback_data="admin_clear_rules")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@@ -68,20 +88,33 @@ async def start_edit_rules(
|
||||
state: FSMContext,
|
||||
db: AsyncSession
|
||||
):
|
||||
current_rules = await get_current_rules_content(db, db_user.language)
|
||||
|
||||
await callback.message.edit_text(
|
||||
"✏️ <b>Редактирование правил</b>\n\n"
|
||||
f"<b>Текущие правила:</b>\n{current_rules[:500]}{'...' if len(current_rules) > 500 else ''}\n\n"
|
||||
"Отправьте новый текст правил сервиса.\n\n"
|
||||
"<i>Поддерживается HTML разметка</i>",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
|
||||
await state.set_state(AdminStates.editing_rules_page)
|
||||
await callback.answer()
|
||||
try:
|
||||
current_rules = await get_current_rules_content(db, db_user.language)
|
||||
|
||||
preview = current_rules[:500] + ('...' if len(current_rules) > 500 else '')
|
||||
|
||||
text = (
|
||||
"✏️ <b>Редактирование правил</b>\n\n"
|
||||
f"<b>Текущие правила:</b>\n<code>{preview}</code>\n\n"
|
||||
"Отправьте новый текст правил сервиса.\n\n"
|
||||
"<i>Поддерживается HTML разметка. Все теги будут проверены перед сохранением.</i>\n\n"
|
||||
"💡 <b>Совет:</b> Нажмите /html_help для просмотра поддерживаемых тегов"
|
||||
)
|
||||
|
||||
await callback.message.edit_text(
|
||||
text,
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="ℹ️ HTML помощь", callback_data="admin_rules_help")],
|
||||
[types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
|
||||
await state.set_state(AdminStates.editing_rules_page)
|
||||
await callback.answer()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при начале редактирования правил: {e}")
|
||||
await callback.answer("❌ Ошибка при загрузке правил для редактирования", show_alert=True)
|
||||
|
||||
|
||||
@admin_required
|
||||
@@ -98,19 +131,61 @@ async def process_rules_edit(
|
||||
await message.answer("❌ Текст правил слишком длинный (максимум 4000 символов)")
|
||||
return
|
||||
|
||||
await message.answer(
|
||||
f"📋 <b>Предварительный просмотр новых правил:</b>\n\n{new_rules}\n\n"
|
||||
f"⚠️ <b>Внимание!</b> Новые правила будут показываться всем пользователям.\n\n"
|
||||
f"Сохранить изменения?",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(text="✅ Сохранить", callback_data="admin_save_rules"),
|
||||
types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")
|
||||
]
|
||||
])
|
||||
)
|
||||
is_valid, error_msg = validate_html_tags(new_rules)
|
||||
if not is_valid:
|
||||
await message.answer(
|
||||
f"❌ <b>Ошибка в HTML разметке:</b>\n{error_msg}\n\n"
|
||||
f"Пожалуйста, исправьте ошибки и отправьте текст заново.\n\n"
|
||||
f"💡 Используйте /html_help для просмотра правильного синтаксиса",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="ℹ️ HTML помощь", callback_data="admin_rules_help")],
|
||||
[types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
return
|
||||
|
||||
await state.update_data(new_rules=new_rules)
|
||||
try:
|
||||
preview_text = f"📋 <b>Предварительный просмотр новых правил:</b>\n\n{new_rules}\n\n"
|
||||
preview_text += f"⚠️ <b>Внимание!</b> Новые правила будут показываться всем пользователям.\n\n"
|
||||
preview_text += f"Сохранить изменения?"
|
||||
|
||||
if len(preview_text) > 4000:
|
||||
preview_text = (
|
||||
"📋 <b>Предварительный просмотр новых правил:</b>\n\n"
|
||||
f"{new_rules[:500]}...\n\n"
|
||||
f"⚠️ <b>Внимание!</b> Новые правила будут показываться всем пользователям.\n\n"
|
||||
f"Текст правил: {len(new_rules)} символов\n"
|
||||
f"Сохранить изменения?"
|
||||
)
|
||||
|
||||
await message.answer(
|
||||
preview_text,
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(text="✅ Сохранить", callback_data="admin_save_rules"),
|
||||
types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")
|
||||
]
|
||||
])
|
||||
)
|
||||
|
||||
await state.update_data(new_rules=new_rules)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при показе превью правил: {e}")
|
||||
await message.answer(
|
||||
"⚠️ <b>Подтверждение сохранения правил</b>\n\n"
|
||||
f"Новые правила готовы к сохранению ({len(new_rules)} символов).\n"
|
||||
f"HTML теги проверены и корректны.\n\n"
|
||||
f"Сохранить изменения?",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(text="✅ Сохранить", callback_data="admin_save_rules"),
|
||||
types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")
|
||||
]
|
||||
])
|
||||
)
|
||||
|
||||
await state.update_data(new_rules=new_rules)
|
||||
|
||||
|
||||
@admin_required
|
||||
@@ -128,6 +203,20 @@ async def save_rules(
|
||||
await callback.answer("❌ Ошибка: текст правил не найден", show_alert=True)
|
||||
return
|
||||
|
||||
is_valid, error_msg = validate_html_tags(new_rules)
|
||||
if not is_valid:
|
||||
await callback.message.edit_text(
|
||||
f"❌ <b>Ошибка при сохранении:</b>\n{error_msg}\n\n"
|
||||
f"Правила не были сохранены из-за ошибок в HTML разметке.",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="🔄 Попробовать снова", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="📋 К правилам", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await state.clear()
|
||||
await callback.answer()
|
||||
return
|
||||
|
||||
try:
|
||||
await create_or_update_rules(
|
||||
db=db,
|
||||
@@ -142,10 +231,14 @@ async def save_rules(
|
||||
await refresh_rules_cache(db_user.language)
|
||||
|
||||
await callback.message.edit_text(
|
||||
"✅ <b>Правила сервиса обновлены!</b>\n\n"
|
||||
"Новые правила сохранены в базе данных и будут показываться пользователям.\n\n"
|
||||
"Кеш правил очищен и обновлен.",
|
||||
"✅ <b>Правила сервиса успешно обновлены!</b>\n\n"
|
||||
"✓ Новые правила сохранены в базе данных\n"
|
||||
"✓ HTML теги проверены и корректны\n"
|
||||
"✓ Кеш правил очищен и обновлен\n"
|
||||
"✓ Правила будут показываться пользователям\n\n"
|
||||
f"📊 Размер текста: {len(new_rules)} символов",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="👀 Просмотреть", callback_data="admin_view_rules")],
|
||||
[types.InlineKeyboardButton(text="📋 К правилам", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
@@ -156,7 +249,90 @@ async def save_rules(
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка сохранения правил: {e}")
|
||||
await callback.answer("❌ Ошибка сохранения правил", show_alert=True)
|
||||
await callback.message.edit_text(
|
||||
"❌ <b>Ошибка при сохранении правил</b>\n\n"
|
||||
"Произошла ошибка при записи в базу данных. Попробуйте еще раз.",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="🔄 Попробовать снова", callback_data="admin_save_rules")],
|
||||
[types.InlineKeyboardButton(text="📋 К правилам", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def clear_rules_confirmation(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
await callback.message.edit_text(
|
||||
"🗑️ <b>Очистка правил сервиса</b>\n\n"
|
||||
"⚠️ <b>ВНИМАНИЕ!</b> Вы собираетесь полностью удалить все правила сервиса.\n\n"
|
||||
"После очистки пользователи будут видеть стандартные правила по умолчанию.\n\n"
|
||||
"Это действие нельзя отменить. Продолжить?",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(text="✅ Да, очистить", callback_data="admin_confirm_clear_rules"),
|
||||
types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_rules")
|
||||
]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def confirm_clear_rules(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
try:
|
||||
await clear_all_rules(db, db_user.language)
|
||||
|
||||
from app.localization.texts import clear_rules_cache
|
||||
clear_rules_cache()
|
||||
|
||||
await callback.message.edit_text(
|
||||
"✅ <b>Правила успешно очищены!</b>\n\n"
|
||||
"✓ Все пользовательские правила удалены\n"
|
||||
"✓ Теперь используются стандартные правила\n"
|
||||
"✓ Кеш правил очищен\n\n"
|
||||
"Пользователи будут видеть правила по умолчанию.",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="📝 Создать новые", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="👀 Посмотреть текущие", callback_data="admin_view_rules")],
|
||||
[types.InlineKeyboardButton(text="📋 К правилам", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
|
||||
logger.info(f"Правила очищены администратором {db_user.telegram_id}")
|
||||
await callback.answer()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при очистке правил: {e}")
|
||||
await callback.answer("❌ Ошибка при очистке правил", show_alert=True)
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def show_html_help(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
help_text = get_html_help_text()
|
||||
|
||||
await callback.message.edit_text(
|
||||
f"ℹ️ <b>Справка по HTML форматированию</b>\n\n{help_text}",
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
|
||||
[types.InlineKeyboardButton(text="📝 Редактировать правила", callback_data="admin_edit_rules")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_rules")]
|
||||
])
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
def register_handlers(dp: Dispatcher):
|
||||
@@ -165,4 +341,9 @@ def register_handlers(dp: Dispatcher):
|
||||
dp.callback_query.register(start_edit_rules, F.data == "admin_edit_rules")
|
||||
dp.callback_query.register(save_rules, F.data == "admin_save_rules")
|
||||
|
||||
dp.callback_query.register(clear_rules_confirmation, F.data == "admin_clear_rules")
|
||||
dp.callback_query.register(confirm_clear_rules, F.data == "admin_confirm_clear_rules")
|
||||
|
||||
dp.callback_query.register(show_html_help, F.data == "admin_rules_help")
|
||||
|
||||
dp.message.register(process_rules_edit, AdminStates.editing_rules_page)
|
||||
@@ -1,5 +1,5 @@
|
||||
import re
|
||||
from typing import Optional, Union
|
||||
from typing import Optional, Union, Tuple
|
||||
from datetime import datetime
|
||||
import html
|
||||
|
||||
@@ -14,6 +14,11 @@ ALLOWED_HTML_TAGS = {
|
||||
'blockquote'
|
||||
}
|
||||
|
||||
SELF_CLOSING_TAGS = {
|
||||
'br', 'hr', 'img'
|
||||
}
|
||||
|
||||
|
||||
def validate_email(email: str) -> bool:
|
||||
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
||||
return re.match(pattern, email) is not None
|
||||
@@ -158,7 +163,8 @@ def validate_referral_code(code: str) -> bool:
|
||||
|
||||
return validate_promocode(code)
|
||||
|
||||
def validate_html_tags(text: str) -> tuple[bool, str]:
|
||||
|
||||
def validate_html_tags(text: str) -> Tuple[bool, str]:
|
||||
if not text:
|
||||
return True, ""
|
||||
|
||||
@@ -168,21 +174,34 @@ def validate_html_tags(text: str) -> tuple[bool, str]:
|
||||
for is_closing, tag_name in tags:
|
||||
tag_name_lower = tag_name.lower()
|
||||
|
||||
if tag_name_lower not in ALLOWED_HTML_TAGS:
|
||||
if tag_name_lower not in ALLOWED_HTML_TAGS and tag_name_lower not in SELF_CLOSING_TAGS:
|
||||
return False, f"Неподдерживаемый тег: <{tag_name}>"
|
||||
|
||||
return validate_html_structure(text)
|
||||
|
||||
|
||||
def validate_html_structure(text: str) -> Tuple[bool, str]:
|
||||
tag_pattern = r'<(/?)([a-zA-Z][a-zA-Z0-9-]*)[^>]*?/?>'
|
||||
|
||||
matches = re.finditer(tag_pattern, text)
|
||||
tag_stack = []
|
||||
for is_closing, tag_name in tags:
|
||||
tag_name_lower = tag_name.lower()
|
||||
|
||||
for match in matches:
|
||||
full_tag = match.group(0)
|
||||
is_closing = bool(match.group(1))
|
||||
tag_name = match.group(2).lower()
|
||||
|
||||
if not is_closing:
|
||||
tag_stack.append(tag_name_lower)
|
||||
else:
|
||||
if full_tag.endswith('/>') or tag_name in SELF_CLOSING_TAGS:
|
||||
continue
|
||||
|
||||
if not is_closing:
|
||||
tag_stack.append(tag_name)
|
||||
else:
|
||||
if not tag_stack:
|
||||
return False, f"Закрывающий тег без открывающего: </{tag_name}>"
|
||||
|
||||
last_tag = tag_stack.pop()
|
||||
if last_tag != tag_name_lower:
|
||||
if last_tag != tag_name:
|
||||
return False, f"Неправильная вложенность тегов: ожидался </{last_tag}>, найден </{tag_name}>"
|
||||
|
||||
if tag_stack:
|
||||
@@ -191,16 +210,65 @@ def validate_html_tags(text: str) -> tuple[bool, str]:
|
||||
return True, ""
|
||||
|
||||
|
||||
def fix_html_tags(text: str) -> str:
|
||||
if not text:
|
||||
return text
|
||||
|
||||
fixes = [
|
||||
(r'<a href=([^"\s>]+)>', r'<a href="\1">'),
|
||||
(r'<(br|hr|img[^>]*?)>', r'<\1 />'),
|
||||
(r'<<([^>]+)>>', r'<\1>'),
|
||||
(r'<\s+([^>]+)\s+>', r'<\1>'),
|
||||
]
|
||||
|
||||
result = text
|
||||
for pattern, replacement in fixes:
|
||||
result = re.sub(pattern, replacement, result, flags=re.IGNORECASE)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_html_help_text() -> str:
|
||||
return """<b>Поддерживаемые HTML теги:</b>
|
||||
|
||||
- <code><b>жирный</b></code> или <code><strong>жирный</strong></code>
|
||||
- <code><i>курсив</i></code> или <code><em>курсив</em></code>
|
||||
- <code><u>подчеркнутый</u></code>
|
||||
- <code><s>зачеркнутый</s></code>
|
||||
- <code><code>моноширинный</code></code>
|
||||
- <code><pre>блок кода</pre></code>
|
||||
- <code><a href="url">ссылка</a></code>
|
||||
- <code><blockquote>цитата</blockquote></code>
|
||||
• <code><b>жирный</b></code> или <code><strong>жирный</strong></code>
|
||||
• <code><i>курсив</i></code> или <code><em>курсив</em></code>
|
||||
• <code><u>подчеркнутый</u></code>
|
||||
• <code><s>зачеркнутый</s></code>
|
||||
• <code><code>моноширинный</code></code>
|
||||
• <code><pre>блок кода</pre></code>
|
||||
• <code><a href="url">ссылка</a></code>
|
||||
• <code><blockquote>цитата</blockquote></code>
|
||||
|
||||
<b>Неподдерживаемые теги:</b> <br>, <p>, <div>, <span>, <spoiler> и другие"""
|
||||
<b>⚠️ Важные правила:</b>
|
||||
• Каждый открывающий тег должен быть закрыт
|
||||
• Теги должны быть правильно вложены
|
||||
• Атрибуты ссылок берите в кавычки
|
||||
|
||||
<b>❌ Неправильно:</b>
|
||||
<code><b>жирный <i>курсив</b></i></code>
|
||||
<code><a href=google.com>ссылка</a></code>
|
||||
|
||||
<b>✅ Правильно:</b>
|
||||
<code><b>жирный <i>курсив</i></b></code>
|
||||
<code><a href="https://google.com">ссылка</a></code>"""
|
||||
|
||||
|
||||
def validate_rules_content(text: str) -> Tuple[bool, str, Optional[str]]:
|
||||
if not text or not text.strip():
|
||||
return False, "Текст правил не может быть пустым", None
|
||||
|
||||
if len(text) > 4000:
|
||||
return False, f"Текст слишком длинный: {len(text)} символов (максимум 4000)", None
|
||||
|
||||
is_valid_html, html_error = validate_html_tags(text)
|
||||
if not is_valid_html:
|
||||
fixed_text = fix_html_tags(text)
|
||||
fixed_is_valid, _ = validate_html_tags(fixed_text)
|
||||
|
||||
if fixed_is_valid and fixed_text != text:
|
||||
return False, html_error, fixed_text
|
||||
else:
|
||||
return False, html_error, None
|
||||
|
||||
return True, "", None
|
||||
|
||||
Reference in New Issue
Block a user