Merge pull request #2083 from BEDOLAGA-DEV/dev5

Dev5
This commit is contained in:
Egor
2025-11-28 03:03:03 +03:00
committed by GitHub
9 changed files with 228 additions and 79 deletions

View File

@@ -6,6 +6,7 @@ from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models import User, UserMessage
from app.utils.validators import sanitize_html, validate_html_tags
logger = logging.getLogger(__name__)
@@ -17,6 +18,10 @@ async def create_user_message(
is_active: bool = True,
sort_order: int = 0
) -> UserMessage:
is_valid, error_message = validate_html_tags(message_text)
if not is_valid:
raise ValueError(error_message)
resolved_creator = created_by
if created_by is not None:
@@ -61,7 +66,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]:
return None
random_message = random.choice(active_messages)
return random_message.message_text
return sanitize_html(random_message.message_text)
async def get_all_user_messages(
@@ -102,8 +107,11 @@ async def update_user_message(
if not message:
return None
if message_text is not None:
is_valid, error_message = validate_html_tags(message_text)
if not is_valid:
raise ValueError(error_message)
message.message_text = message_text
if is_active is not None:

View File

@@ -2285,6 +2285,9 @@ async def show_sync_options(
"• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n"
"• Рекомендуется делать полную синхронизацию ежедневно\n"
"• Баланс пользователей НЕ удаляется\n\n"
"⬆️ <b>Обратная синхронизация:</b>\n"
"• Отправляет активных пользователей из бота в панель\n"
"• Используйте при сбоях панели или для восстановления данных\n\n"
+ "\n".join(status_lines)
)
@@ -2295,6 +2298,12 @@ async def show_sync_options(
callback_data="sync_all_users",
)
],
[
types.InlineKeyboardButton(
text="⬆️ Синхронизация в панель",
callback_data="sync_to_panel",
)
],
[
types.InlineKeyboardButton(
text="⚙️ Настройки автосинхронизации",
@@ -2654,6 +2663,50 @@ async def sync_all_users(
)
await callback.answer()
@admin_required
@error_handler
async def sync_users_to_panel(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
):
await callback.message.edit_text(
"⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n"
"Это может занять несколько минут.",
reply_markup=None,
)
remnawave_service = RemnaWaveService()
stats = await remnawave_service.sync_users_to_panel(db)
if stats["errors"] == 0:
status_emoji = ""
status_text = "успешно завершена"
else:
status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else ""
status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками"
text = (
f"{status_emoji} <b>Синхронизация в панель {status_text}</b>\n\n"
"📊 <b>Результаты:</b>\n"
f"• 🆕 Создано: {stats['created']}\n"
f"• 🔄 Обновлено: {stats['updated']}\n"
f"• ❌ Ошибок: {stats['errors']}"
)
keyboard = [
[types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")],
[types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")],
[types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")],
]
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard),
)
await callback.answer()
@admin_required
@error_handler
async def show_sync_recommendations(
@@ -3126,6 +3179,7 @@ def register_handlers(dp: Dispatcher):
dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel")
dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run")
dp.callback_query.register(sync_all_users, F.data == "sync_all_users")
dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel")
dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration")
dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_"))
dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_"))

View File

@@ -12,6 +12,11 @@ from app.database.crud.user_message import (
)
from app.database.models import User
from app.keyboards.admin import get_admin_main_keyboard
from app.utils.validators import (
get_html_help_text,
sanitize_html,
validate_html_tags,
)
from app.utils.decorators import admin_required, error_handler
from app.localization.texts import get_texts
@@ -122,8 +127,6 @@ async def add_user_message_start(
db_user: User,
db: AsyncSession
):
from app.utils.validators import get_html_help_text
await callback.message.edit_text(
f"📝 <b>Добавление нового сообщения</b>\n\n"
f"Введите текст сообщения, которое будет показываться в главном меню.\n\n"
@@ -161,8 +164,6 @@ async def process_new_message_text(
)
return
from app.utils.validators import validate_html_tags, get_html_help_text
is_valid, error_msg = validate_html_tags(message_text)
if not is_valid:
await message.answer(
@@ -312,20 +313,22 @@ async def view_user_message(
return
message = await get_user_message_by_id(db, message_id)
if not message:
await callback.answer("❌ Сообщение не найдено", show_alert=True)
return
safe_content = sanitize_html(message.message_text)
status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно"
text = (
f"📋 <b>Сообщение ID {message.id}</b>\n\n"
f"<b>Статус:</b> {status_text}\n"
f"<b>Создано:</b> {message.created_at.strftime('%d.%m.%Y %H:%M')}\n"
f"<b>Обновлено:</b> {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n"
f"<b>Содержимое:</b>\n"
f"<blockquote>{message.message_text}</blockquote>"
f"<blockquote>{safe_content}</blockquote>"
)
await callback.message.edit_text(
@@ -455,7 +458,7 @@ async def edit_user_message_start(
await callback.message.edit_text(
f"✏️ <b>Редактирование сообщения ID {message.id}</b>\n\n"
f"<b>Текущий текст:</b>\n"
f"<blockquote>{message.message_text}</blockquote>\n\n"
f"<blockquote>{sanitize_html(message.message_text)}</blockquote>\n\n"
f"Введите новый текст сообщения или отправьте /cancel для отмены:",
parse_mode="HTML"
)
@@ -489,14 +492,23 @@ async def process_edit_message_text(
return
new_text = message.text.strip()
if len(new_text) > 4000:
await message.answer(
"❌ Сообщение слишком длинное. Максимум 4000 символов.\n"
"Попробуйте еще раз или отправьте /cancel для отмены."
)
return
is_valid, error_msg = validate_html_tags(new_text)
if not is_valid:
await message.answer(
f"❌ Ошибка в HTML разметке: {error_msg}\n\n"
f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.",
parse_mode=None
)
return
try:
updated_message = await update_user_message(
db=db,
@@ -511,7 +523,7 @@ async def process_edit_message_text(
f"<b>ID:</b> {updated_message.id}\n"
f"<b>Обновлено:</b> {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n"
f"<b>Новый текст:</b>\n"
f"<blockquote>{new_text}</blockquote>",
f"<blockquote>{sanitize_html(new_text)}</blockquote>",
reply_markup=get_user_messages_keyboard(db_user.language),
parse_mode="HTML"
)

View File

@@ -1118,6 +1118,12 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
callback_data="sync_all_users"
)
],
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"),
callback_data="sync_to_panel"
)
],
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"),

View File

@@ -688,6 +688,7 @@
"ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n",
"ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки",
"ADMIN_SYNC_BACK": "⬅️ К синхронизации",
"ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель",
"ADMIN_SYNC_CLEANUP": "🧹 Очистка",
"ADMIN_SYNC_CONFIRM": "✅ Подтвердить",
"ADMIN_SYNC_FULL": "🔄 Полная синхронизация",

View File

@@ -687,6 +687,7 @@
"ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n",
"ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки",
"ADMIN_SYNC_BACK": "⬅️ До синхронізації",
"ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель",
"ADMIN_SYNC_CLEANUP": "🧹 Очищення",
"ADMIN_SYNC_CONFIRM": "✅ Підтвердити",
"ADMIN_SYNC_FULL": "🔄 Повна синхронізація",

View File

@@ -256,6 +256,29 @@ class RemnaWaveService:
)
return self._now_utc() + timedelta(days=30)
def _safe_expire_at_for_panel(self, expire_at: Optional[datetime]) -> datetime:
"""Гарантирует, что дата окончания не в прошлом для панели."""
now = self._now_utc()
minimum_expire = now + timedelta(minutes=1)
if not expire_at:
return minimum_expire
normalized_expire = expire_at
if normalized_expire.tzinfo is not None:
normalized_expire = normalized_expire.replace(tzinfo=None)
if normalized_expire < minimum_expire:
logger.debug(
"⚙️ Коррекция даты истечения (%s) до минимально допустимой (%s) для панели",
normalized_expire,
minimum_expire,
)
return minimum_expire
return normalized_expire
def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime:
"""Парсит дату окончания подписки пользователя панели для сравнения."""
@@ -1615,39 +1638,28 @@ class RemnaWaveService:
async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]:
try:
stats = {"created": 0, "updated": 0, "errors": 0}
users = await get_users_list(db, offset=0, limit=10000)
batch_size = 100
offset = 0
async with self.get_api_client() as api:
for user in users:
if not user.subscription:
continue
while True:
users = await get_users_list(db, offset=offset, limit=batch_size)
try:
subscription = user.subscription
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
if not users:
break
if user.remnawave_uuid:
update_kwargs = dict(
uuid=user.remnawave_uuid,
status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED,
expire_at=subscription.end_date,
traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0,
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=settings.format_remnawave_user_description(
full_name=user.full_name,
username=user.username,
telegram_id=user.telegram_id
),
active_internal_squads=subscription.connected_squads,
)
for user in users:
if not user.subscription:
continue
if hwid_limit is not None:
update_kwargs['hwid_device_limit'] = hwid_limit
try:
subscription = user.subscription
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
expire_at = self._safe_expire_at_for_panel(subscription.end_date)
status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED
await api.update_user(**update_kwargs)
stats["updated"] += 1
else:
username = settings.format_remnawave_username(
full_name=user.full_name,
username=user.username,
@@ -1656,8 +1668,8 @@ class RemnaWaveService:
create_kwargs = dict(
username=username,
expire_at=subscription.end_date,
status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED,
expire_at=expire_at,
status=status,
traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0,
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
telegram_id=user.telegram_id,
@@ -1672,20 +1684,66 @@ class RemnaWaveService:
if hwid_limit is not None:
create_kwargs['hwid_device_limit'] = hwid_limit
new_user = await api.create_user(**create_kwargs)
await update_user(db, user, remnawave_uuid=new_user.uuid)
subscription.remnawave_short_uuid = new_user.short_uuid
# Убираем немедленный коммит для пакетной обработки
# await db.commit()
stats["created"] += 1
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}")
stats["errors"] += 1
logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}")
if user.remnawave_uuid:
update_kwargs = dict(
uuid=user.remnawave_uuid,
status=status,
expire_at=expire_at,
traffic_limit_bytes=create_kwargs['traffic_limit_bytes'],
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=create_kwargs['description'],
active_internal_squads=subscription.connected_squads,
)
if hwid_limit is not None:
update_kwargs['hwid_device_limit'] = hwid_limit
try:
await api.update_user(**update_kwargs)
stats["updated"] += 1
except RemnaWaveAPIError as api_error:
if api_error.status_code == 404:
logger.warning(
"⚠️ Не найден пользователь %s в панели, создаем заново",
user.remnawave_uuid,
)
new_user = await api.create_user(**create_kwargs)
user.remnawave_uuid = new_user.uuid
subscription.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
else:
raise
else:
new_user = await api.create_user(**create_kwargs)
user.remnawave_uuid = new_user.uuid
subscription.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}")
stats["errors"] += 1
try:
await db.commit()
except Exception as commit_error:
logger.error(
"Ошибка фиксации транзакции при синхронизации в панель: %s",
commit_error,
)
await db.rollback()
stats["errors"] += len(users)
if len(users) < batch_size:
break
offset += batch_size
logger.info(
f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}"
)
return stats
except Exception as e:

View File

@@ -123,23 +123,26 @@ def validate_subscription_period(days: Union[str, int]) -> Optional[int]:
def sanitize_html(text: str) -> str:
if not text:
return text
text = html.escape(text)
for tag in ALLOWED_HTML_TAGS:
allowed_tags = ALLOWED_HTML_TAGS.union(SELF_CLOSING_TAGS)
for tag in allowed_tags:
text = re.sub(
f'&lt;{tag}(&gt;|\\s[^&]*&gt;)',
lambda m: m.group(0).replace('&lt;', '<').replace('&gt;', '>'),
text,
f'&lt;(/?{tag}\\b[^>]*)&gt;',
lambda m: "<"
+ (
m.group(1)
.replace("&quot;", "\"")
.replace("&#x27;", "'")
.replace("&amp;", "&")
)
+ ">",
text,
flags=re.IGNORECASE
)
text = re.sub(
f'&lt;/{tag}&gt;',
f'</{tag}>',
text,
flags=re.IGNORECASE
)
return text

View File

@@ -69,13 +69,16 @@ async def create_user_message_endpoint(
db: AsyncSession = Depends(get_db_session),
) -> UserMessageResponse:
created_by = getattr(token, "id", None)
message = await create_user_message(
db,
message_text=payload.message_text,
created_by=created_by,
is_active=payload.is_active,
sort_order=payload.sort_order,
)
try:
message = await create_user_message(
db,
message_text=payload.message_text,
created_by=created_by,
is_active=payload.is_active,
sort_order=payload.sort_order,
)
except ValueError as error:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error
return _serialize(message)
@@ -88,7 +91,10 @@ async def update_user_message_endpoint(
db: AsyncSession = Depends(get_db_session),
) -> UserMessageResponse:
update_payload = payload.dict(exclude_unset=True)
message = await update_user_message(db, message_id, **update_payload)
try:
message = await update_user_message(db, message_id, **update_payload)
except ValueError as error:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error
if not message:
raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found")