From 5d9f6064d65e91b9cc0b9a3dbc7be5b5d373617a Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 01:00:15 +0300 Subject: [PATCH 01/12] Validate user message HTML --- app/database/crud/user_message.py | 12 ++++++++-- app/handlers/admin/user_messages.py | 36 +++++++++++++++++++---------- app/webapi/routes/user_messages.py | 22 +++++++++++------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/app/database/crud/user_message.py b/app/database/crud/user_message.py index 03960eff..29159bf0 100644 --- a/app/database/crud/user_message.py +++ b/app/database/crud/user_message.py @@ -6,6 +6,7 @@ from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.models import User, UserMessage +from app.utils.validators import sanitize_html, validate_html_tags logger = logging.getLogger(__name__) @@ -17,6 +18,10 @@ async def create_user_message( is_active: bool = True, sort_order: int = 0 ) -> UserMessage: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) + resolved_creator = created_by if created_by is not None: @@ -61,7 +66,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]: return None random_message = random.choice(active_messages) - return random_message.message_text + return sanitize_html(random_message.message_text) async def get_all_user_messages( @@ -102,8 +107,11 @@ async def update_user_message( if not message: return None - + if message_text is not None: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) message.message_text = message_text if is_active is not None: diff --git a/app/handlers/admin/user_messages.py b/app/handlers/admin/user_messages.py index 61533c4d..ceacd20a 100644 --- a/app/handlers/admin/user_messages.py +++ b/app/handlers/admin/user_messages.py @@ -12,6 +12,11 @@ from app.database.crud.user_message import ( ) from app.database.models import User from app.keyboards.admin import get_admin_main_keyboard +from app.utils.validators import ( + get_html_help_text, + sanitize_html, + validate_html_tags, +) from app.utils.decorators import admin_required, error_handler from app.localization.texts import get_texts @@ -122,8 +127,6 @@ async def add_user_message_start( db_user: User, db: AsyncSession ): - from app.utils.validators import get_html_help_text - await callback.message.edit_text( f"📝 Добавление нового сообщения\n\n" f"Введите текст сообщения, которое будет показываться в главном меню.\n\n" @@ -161,8 +164,6 @@ async def process_new_message_text( ) return - from app.utils.validators import validate_html_tags, get_html_help_text - is_valid, error_msg = validate_html_tags(message_text) if not is_valid: await message.answer( @@ -312,20 +313,22 @@ async def view_user_message( return message = await get_user_message_by_id(db, message_id) - + if not message: await callback.answer("❌ Сообщение не найдено", show_alert=True) return - + + safe_content = sanitize_html(message.message_text) + status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно" - + text = ( f"📋 Сообщение ID {message.id}\n\n" f"Статус: {status_text}\n" f"Создано: {message.created_at.strftime('%d.%m.%Y %H:%M')}\n" f"Обновлено: {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Содержимое:\n" - f"
{message.message_text}
" + f"
{safe_content}
" ) await callback.message.edit_text( @@ -455,7 +458,7 @@ async def edit_user_message_start( await callback.message.edit_text( f"✏️ Редактирование сообщения ID {message.id}\n\n" f"Текущий текст:\n" - f"
{message.message_text}
\n\n" + f"
{sanitize_html(message.message_text)}
\n\n" f"Введите новый текст сообщения или отправьте /cancel для отмены:", parse_mode="HTML" ) @@ -489,14 +492,23 @@ async def process_edit_message_text( return new_text = message.text.strip() - + if len(new_text) > 4000: await message.answer( "❌ Сообщение слишком длинное. Максимум 4000 символов.\n" "Попробуйте еще раз или отправьте /cancel для отмены." ) return - + + is_valid, error_msg = validate_html_tags(new_text) + if not is_valid: + await message.answer( + f"❌ Ошибка в HTML разметке: {error_msg}\n\n" + f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.", + parse_mode=None + ) + return + try: updated_message = await update_user_message( db=db, @@ -511,7 +523,7 @@ async def process_edit_message_text( f"ID: {updated_message.id}\n" f"Обновлено: {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Новый текст:\n" - f"
{new_text}
", + f"
{sanitize_html(new_text)}
", reply_markup=get_user_messages_keyboard(db_user.language), parse_mode="HTML" ) diff --git a/app/webapi/routes/user_messages.py b/app/webapi/routes/user_messages.py index d6fba008..847cd1c0 100644 --- a/app/webapi/routes/user_messages.py +++ b/app/webapi/routes/user_messages.py @@ -69,13 +69,16 @@ async def create_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: created_by = getattr(token, "id", None) - message = await create_user_message( - db, - message_text=payload.message_text, - created_by=created_by, - is_active=payload.is_active, - sort_order=payload.sort_order, - ) + try: + message = await create_user_message( + db, + message_text=payload.message_text, + created_by=created_by, + is_active=payload.is_active, + sort_order=payload.sort_order, + ) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error return _serialize(message) @@ -88,7 +91,10 @@ async def update_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: update_payload = payload.dict(exclude_unset=True) - message = await update_user_message(db, message_id, **update_payload) + try: + message = await update_user_message(db, message_id, **update_payload) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error if not message: raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found") From 94cc04703bf5bf621288a09fc190c25810f1e597 Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 01:31:39 +0300 Subject: [PATCH 02/12] Revert "Validate HTML in user messages" --- app/database/crud/user_message.py | 12 ++-------- app/handlers/admin/user_messages.py | 36 ++++++++++------------------- app/webapi/routes/user_messages.py | 22 +++++++----------- 3 files changed, 22 insertions(+), 48 deletions(-) diff --git a/app/database/crud/user_message.py b/app/database/crud/user_message.py index 29159bf0..03960eff 100644 --- a/app/database/crud/user_message.py +++ b/app/database/crud/user_message.py @@ -6,7 +6,6 @@ from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.models import User, UserMessage -from app.utils.validators import sanitize_html, validate_html_tags logger = logging.getLogger(__name__) @@ -18,10 +17,6 @@ async def create_user_message( is_active: bool = True, sort_order: int = 0 ) -> UserMessage: - is_valid, error_message = validate_html_tags(message_text) - if not is_valid: - raise ValueError(error_message) - resolved_creator = created_by if created_by is not None: @@ -66,7 +61,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]: return None random_message = random.choice(active_messages) - return sanitize_html(random_message.message_text) + return random_message.message_text async def get_all_user_messages( @@ -107,11 +102,8 @@ async def update_user_message( if not message: return None - + if message_text is not None: - is_valid, error_message = validate_html_tags(message_text) - if not is_valid: - raise ValueError(error_message) message.message_text = message_text if is_active is not None: diff --git a/app/handlers/admin/user_messages.py b/app/handlers/admin/user_messages.py index ceacd20a..61533c4d 100644 --- a/app/handlers/admin/user_messages.py +++ b/app/handlers/admin/user_messages.py @@ -12,11 +12,6 @@ from app.database.crud.user_message import ( ) from app.database.models import User from app.keyboards.admin import get_admin_main_keyboard -from app.utils.validators import ( - get_html_help_text, - sanitize_html, - validate_html_tags, -) from app.utils.decorators import admin_required, error_handler from app.localization.texts import get_texts @@ -127,6 +122,8 @@ async def add_user_message_start( db_user: User, db: AsyncSession ): + from app.utils.validators import get_html_help_text + await callback.message.edit_text( f"📝 Добавление нового сообщения\n\n" f"Введите текст сообщения, которое будет показываться в главном меню.\n\n" @@ -164,6 +161,8 @@ async def process_new_message_text( ) return + from app.utils.validators import validate_html_tags, get_html_help_text + is_valid, error_msg = validate_html_tags(message_text) if not is_valid: await message.answer( @@ -313,22 +312,20 @@ async def view_user_message( return message = await get_user_message_by_id(db, message_id) - + if not message: await callback.answer("❌ Сообщение не найдено", show_alert=True) return - - safe_content = sanitize_html(message.message_text) - + status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно" - + text = ( f"📋 Сообщение ID {message.id}\n\n" f"Статус: {status_text}\n" f"Создано: {message.created_at.strftime('%d.%m.%Y %H:%M')}\n" f"Обновлено: {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Содержимое:\n" - f"
{safe_content}
" + f"
{message.message_text}
" ) await callback.message.edit_text( @@ -458,7 +455,7 @@ async def edit_user_message_start( await callback.message.edit_text( f"✏️ Редактирование сообщения ID {message.id}\n\n" f"Текущий текст:\n" - f"
{sanitize_html(message.message_text)}
\n\n" + f"
{message.message_text}
\n\n" f"Введите новый текст сообщения или отправьте /cancel для отмены:", parse_mode="HTML" ) @@ -492,23 +489,14 @@ async def process_edit_message_text( return new_text = message.text.strip() - + if len(new_text) > 4000: await message.answer( "❌ Сообщение слишком длинное. Максимум 4000 символов.\n" "Попробуйте еще раз или отправьте /cancel для отмены." ) return - - is_valid, error_msg = validate_html_tags(new_text) - if not is_valid: - await message.answer( - f"❌ Ошибка в HTML разметке: {error_msg}\n\n" - f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.", - parse_mode=None - ) - return - + try: updated_message = await update_user_message( db=db, @@ -523,7 +511,7 @@ async def process_edit_message_text( f"ID: {updated_message.id}\n" f"Обновлено: {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Новый текст:\n" - f"
{sanitize_html(new_text)}
", + f"
{new_text}
", reply_markup=get_user_messages_keyboard(db_user.language), parse_mode="HTML" ) diff --git a/app/webapi/routes/user_messages.py b/app/webapi/routes/user_messages.py index 847cd1c0..d6fba008 100644 --- a/app/webapi/routes/user_messages.py +++ b/app/webapi/routes/user_messages.py @@ -69,16 +69,13 @@ async def create_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: created_by = getattr(token, "id", None) - try: - message = await create_user_message( - db, - message_text=payload.message_text, - created_by=created_by, - is_active=payload.is_active, - sort_order=payload.sort_order, - ) - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + message = await create_user_message( + db, + message_text=payload.message_text, + created_by=created_by, + is_active=payload.is_active, + sort_order=payload.sort_order, + ) return _serialize(message) @@ -91,10 +88,7 @@ async def update_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: update_payload = payload.dict(exclude_unset=True) - try: - message = await update_user_message(db, message_id, **update_payload) - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + message = await update_user_message(db, message_id, **update_payload) if not message: raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found") From 91f557e3570c213965691ca1de1821543085a30f Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 01:31:53 +0300 Subject: [PATCH 03/12] Improve HTML sanitization for menu messages --- app/database/crud/user_message.py | 12 ++++++++-- app/handlers/admin/user_messages.py | 36 +++++++++++++++++++---------- app/utils/validators.py | 22 ++++++++---------- app/webapi/routes/user_messages.py | 22 +++++++++++------- 4 files changed, 57 insertions(+), 35 deletions(-) diff --git a/app/database/crud/user_message.py b/app/database/crud/user_message.py index 03960eff..29159bf0 100644 --- a/app/database/crud/user_message.py +++ b/app/database/crud/user_message.py @@ -6,6 +6,7 @@ from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.models import User, UserMessage +from app.utils.validators import sanitize_html, validate_html_tags logger = logging.getLogger(__name__) @@ -17,6 +18,10 @@ async def create_user_message( is_active: bool = True, sort_order: int = 0 ) -> UserMessage: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) + resolved_creator = created_by if created_by is not None: @@ -61,7 +66,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]: return None random_message = random.choice(active_messages) - return random_message.message_text + return sanitize_html(random_message.message_text) async def get_all_user_messages( @@ -102,8 +107,11 @@ async def update_user_message( if not message: return None - + if message_text is not None: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) message.message_text = message_text if is_active is not None: diff --git a/app/handlers/admin/user_messages.py b/app/handlers/admin/user_messages.py index 61533c4d..ceacd20a 100644 --- a/app/handlers/admin/user_messages.py +++ b/app/handlers/admin/user_messages.py @@ -12,6 +12,11 @@ from app.database.crud.user_message import ( ) from app.database.models import User from app.keyboards.admin import get_admin_main_keyboard +from app.utils.validators import ( + get_html_help_text, + sanitize_html, + validate_html_tags, +) from app.utils.decorators import admin_required, error_handler from app.localization.texts import get_texts @@ -122,8 +127,6 @@ async def add_user_message_start( db_user: User, db: AsyncSession ): - from app.utils.validators import get_html_help_text - await callback.message.edit_text( f"📝 Добавление нового сообщения\n\n" f"Введите текст сообщения, которое будет показываться в главном меню.\n\n" @@ -161,8 +164,6 @@ async def process_new_message_text( ) return - from app.utils.validators import validate_html_tags, get_html_help_text - is_valid, error_msg = validate_html_tags(message_text) if not is_valid: await message.answer( @@ -312,20 +313,22 @@ async def view_user_message( return message = await get_user_message_by_id(db, message_id) - + if not message: await callback.answer("❌ Сообщение не найдено", show_alert=True) return - + + safe_content = sanitize_html(message.message_text) + status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно" - + text = ( f"📋 Сообщение ID {message.id}\n\n" f"Статус: {status_text}\n" f"Создано: {message.created_at.strftime('%d.%m.%Y %H:%M')}\n" f"Обновлено: {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Содержимое:\n" - f"
{message.message_text}
" + f"
{safe_content}
" ) await callback.message.edit_text( @@ -455,7 +458,7 @@ async def edit_user_message_start( await callback.message.edit_text( f"✏️ Редактирование сообщения ID {message.id}\n\n" f"Текущий текст:\n" - f"
{message.message_text}
\n\n" + f"
{sanitize_html(message.message_text)}
\n\n" f"Введите новый текст сообщения или отправьте /cancel для отмены:", parse_mode="HTML" ) @@ -489,14 +492,23 @@ async def process_edit_message_text( return new_text = message.text.strip() - + if len(new_text) > 4000: await message.answer( "❌ Сообщение слишком длинное. Максимум 4000 символов.\n" "Попробуйте еще раз или отправьте /cancel для отмены." ) return - + + is_valid, error_msg = validate_html_tags(new_text) + if not is_valid: + await message.answer( + f"❌ Ошибка в HTML разметке: {error_msg}\n\n" + f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.", + parse_mode=None + ) + return + try: updated_message = await update_user_message( db=db, @@ -511,7 +523,7 @@ async def process_edit_message_text( f"ID: {updated_message.id}\n" f"Обновлено: {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Новый текст:\n" - f"
{new_text}
", + f"
{sanitize_html(new_text)}
", reply_markup=get_user_messages_keyboard(db_user.language), parse_mode="HTML" ) diff --git a/app/utils/validators.py b/app/utils/validators.py index c63b5f71..78245c36 100644 --- a/app/utils/validators.py +++ b/app/utils/validators.py @@ -123,23 +123,19 @@ def validate_subscription_period(days: Union[str, int]) -> Optional[int]: def sanitize_html(text: str) -> str: if not text: return text - + text = html.escape(text) - - for tag in ALLOWED_HTML_TAGS: + + allowed_tags = ALLOWED_HTML_TAGS.union(SELF_CLOSING_TAGS) + + for tag in allowed_tags: text = re.sub( - f'<{tag}(>|\\s[^&]*>)', - lambda m: m.group(0).replace('<', '<').replace('>', '>'), - text, + f'<(/?{tag}\\b[^>]*)>', + lambda m: html.unescape(f"<{m.group(1)}>"), + text, flags=re.IGNORECASE ) - text = re.sub( - f'</{tag}>', - f'', - text, - flags=re.IGNORECASE - ) - + return text diff --git a/app/webapi/routes/user_messages.py b/app/webapi/routes/user_messages.py index d6fba008..847cd1c0 100644 --- a/app/webapi/routes/user_messages.py +++ b/app/webapi/routes/user_messages.py @@ -69,13 +69,16 @@ async def create_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: created_by = getattr(token, "id", None) - message = await create_user_message( - db, - message_text=payload.message_text, - created_by=created_by, - is_active=payload.is_active, - sort_order=payload.sort_order, - ) + try: + message = await create_user_message( + db, + message_text=payload.message_text, + created_by=created_by, + is_active=payload.is_active, + sort_order=payload.sort_order, + ) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error return _serialize(message) @@ -88,7 +91,10 @@ async def update_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: update_payload = payload.dict(exclude_unset=True) - message = await update_user_message(db, message_id, **update_payload) + try: + message = await update_user_message(db, message_id, **update_payload) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error if not message: raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found") From bf5e0ecd4f15907f6e3e404ce9da5de1135b694e Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:12:32 +0300 Subject: [PATCH 04/12] Revert "Improve HTML sanitization for menu messages" --- app/database/crud/user_message.py | 12 ++-------- app/handlers/admin/user_messages.py | 36 ++++++++++------------------- app/utils/validators.py | 22 ++++++++++-------- app/webapi/routes/user_messages.py | 22 +++++++----------- 4 files changed, 35 insertions(+), 57 deletions(-) diff --git a/app/database/crud/user_message.py b/app/database/crud/user_message.py index 29159bf0..03960eff 100644 --- a/app/database/crud/user_message.py +++ b/app/database/crud/user_message.py @@ -6,7 +6,6 @@ from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.models import User, UserMessage -from app.utils.validators import sanitize_html, validate_html_tags logger = logging.getLogger(__name__) @@ -18,10 +17,6 @@ async def create_user_message( is_active: bool = True, sort_order: int = 0 ) -> UserMessage: - is_valid, error_message = validate_html_tags(message_text) - if not is_valid: - raise ValueError(error_message) - resolved_creator = created_by if created_by is not None: @@ -66,7 +61,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]: return None random_message = random.choice(active_messages) - return sanitize_html(random_message.message_text) + return random_message.message_text async def get_all_user_messages( @@ -107,11 +102,8 @@ async def update_user_message( if not message: return None - + if message_text is not None: - is_valid, error_message = validate_html_tags(message_text) - if not is_valid: - raise ValueError(error_message) message.message_text = message_text if is_active is not None: diff --git a/app/handlers/admin/user_messages.py b/app/handlers/admin/user_messages.py index ceacd20a..61533c4d 100644 --- a/app/handlers/admin/user_messages.py +++ b/app/handlers/admin/user_messages.py @@ -12,11 +12,6 @@ from app.database.crud.user_message import ( ) from app.database.models import User from app.keyboards.admin import get_admin_main_keyboard -from app.utils.validators import ( - get_html_help_text, - sanitize_html, - validate_html_tags, -) from app.utils.decorators import admin_required, error_handler from app.localization.texts import get_texts @@ -127,6 +122,8 @@ async def add_user_message_start( db_user: User, db: AsyncSession ): + from app.utils.validators import get_html_help_text + await callback.message.edit_text( f"📝 Добавление нового сообщения\n\n" f"Введите текст сообщения, которое будет показываться в главном меню.\n\n" @@ -164,6 +161,8 @@ async def process_new_message_text( ) return + from app.utils.validators import validate_html_tags, get_html_help_text + is_valid, error_msg = validate_html_tags(message_text) if not is_valid: await message.answer( @@ -313,22 +312,20 @@ async def view_user_message( return message = await get_user_message_by_id(db, message_id) - + if not message: await callback.answer("❌ Сообщение не найдено", show_alert=True) return - - safe_content = sanitize_html(message.message_text) - + status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно" - + text = ( f"📋 Сообщение ID {message.id}\n\n" f"Статус: {status_text}\n" f"Создано: {message.created_at.strftime('%d.%m.%Y %H:%M')}\n" f"Обновлено: {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Содержимое:\n" - f"
{safe_content}
" + f"
{message.message_text}
" ) await callback.message.edit_text( @@ -458,7 +455,7 @@ async def edit_user_message_start( await callback.message.edit_text( f"✏️ Редактирование сообщения ID {message.id}\n\n" f"Текущий текст:\n" - f"
{sanitize_html(message.message_text)}
\n\n" + f"
{message.message_text}
\n\n" f"Введите новый текст сообщения или отправьте /cancel для отмены:", parse_mode="HTML" ) @@ -492,23 +489,14 @@ async def process_edit_message_text( return new_text = message.text.strip() - + if len(new_text) > 4000: await message.answer( "❌ Сообщение слишком длинное. Максимум 4000 символов.\n" "Попробуйте еще раз или отправьте /cancel для отмены." ) return - - is_valid, error_msg = validate_html_tags(new_text) - if not is_valid: - await message.answer( - f"❌ Ошибка в HTML разметке: {error_msg}\n\n" - f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.", - parse_mode=None - ) - return - + try: updated_message = await update_user_message( db=db, @@ -523,7 +511,7 @@ async def process_edit_message_text( f"ID: {updated_message.id}\n" f"Обновлено: {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Новый текст:\n" - f"
{sanitize_html(new_text)}
", + f"
{new_text}
", reply_markup=get_user_messages_keyboard(db_user.language), parse_mode="HTML" ) diff --git a/app/utils/validators.py b/app/utils/validators.py index 78245c36..c63b5f71 100644 --- a/app/utils/validators.py +++ b/app/utils/validators.py @@ -123,19 +123,23 @@ def validate_subscription_period(days: Union[str, int]) -> Optional[int]: def sanitize_html(text: str) -> str: if not text: return text - + text = html.escape(text) - - allowed_tags = ALLOWED_HTML_TAGS.union(SELF_CLOSING_TAGS) - - for tag in allowed_tags: + + for tag in ALLOWED_HTML_TAGS: text = re.sub( - f'<(/?{tag}\\b[^>]*)>', - lambda m: html.unescape(f"<{m.group(1)}>"), - text, + f'<{tag}(>|\\s[^&]*>)', + lambda m: m.group(0).replace('<', '<').replace('>', '>'), + text, flags=re.IGNORECASE ) - + text = re.sub( + f'</{tag}>', + f'', + text, + flags=re.IGNORECASE + ) + return text diff --git a/app/webapi/routes/user_messages.py b/app/webapi/routes/user_messages.py index 847cd1c0..d6fba008 100644 --- a/app/webapi/routes/user_messages.py +++ b/app/webapi/routes/user_messages.py @@ -69,16 +69,13 @@ async def create_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: created_by = getattr(token, "id", None) - try: - message = await create_user_message( - db, - message_text=payload.message_text, - created_by=created_by, - is_active=payload.is_active, - sort_order=payload.sort_order, - ) - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + message = await create_user_message( + db, + message_text=payload.message_text, + created_by=created_by, + is_active=payload.is_active, + sort_order=payload.sort_order, + ) return _serialize(message) @@ -91,10 +88,7 @@ async def update_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: update_payload = payload.dict(exclude_unset=True) - try: - message = await update_user_message(db, message_id, **update_payload) - except ValueError as error: - raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error + message = await update_user_message(db, message_id, **update_payload) if not message: raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found") From 9a5b0553c6cefd698f530db95460ecf95d408160 Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:22:39 +0300 Subject: [PATCH 05/12] Avoid unescaping attribute entities in sanitizer --- app/database/crud/user_message.py | 12 ++++++++-- app/handlers/admin/user_messages.py | 36 +++++++++++++++++++---------- app/utils/validators.py | 29 ++++++++++++----------- app/webapi/routes/user_messages.py | 22 +++++++++++------- 4 files changed, 64 insertions(+), 35 deletions(-) diff --git a/app/database/crud/user_message.py b/app/database/crud/user_message.py index 03960eff..29159bf0 100644 --- a/app/database/crud/user_message.py +++ b/app/database/crud/user_message.py @@ -6,6 +6,7 @@ from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.models import User, UserMessage +from app.utils.validators import sanitize_html, validate_html_tags logger = logging.getLogger(__name__) @@ -17,6 +18,10 @@ async def create_user_message( is_active: bool = True, sort_order: int = 0 ) -> UserMessage: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) + resolved_creator = created_by if created_by is not None: @@ -61,7 +66,7 @@ async def get_random_active_message(db: AsyncSession) -> Optional[str]: return None random_message = random.choice(active_messages) - return random_message.message_text + return sanitize_html(random_message.message_text) async def get_all_user_messages( @@ -102,8 +107,11 @@ async def update_user_message( if not message: return None - + if message_text is not None: + is_valid, error_message = validate_html_tags(message_text) + if not is_valid: + raise ValueError(error_message) message.message_text = message_text if is_active is not None: diff --git a/app/handlers/admin/user_messages.py b/app/handlers/admin/user_messages.py index 61533c4d..ceacd20a 100644 --- a/app/handlers/admin/user_messages.py +++ b/app/handlers/admin/user_messages.py @@ -12,6 +12,11 @@ from app.database.crud.user_message import ( ) from app.database.models import User from app.keyboards.admin import get_admin_main_keyboard +from app.utils.validators import ( + get_html_help_text, + sanitize_html, + validate_html_tags, +) from app.utils.decorators import admin_required, error_handler from app.localization.texts import get_texts @@ -122,8 +127,6 @@ async def add_user_message_start( db_user: User, db: AsyncSession ): - from app.utils.validators import get_html_help_text - await callback.message.edit_text( f"📝 Добавление нового сообщения\n\n" f"Введите текст сообщения, которое будет показываться в главном меню.\n\n" @@ -161,8 +164,6 @@ async def process_new_message_text( ) return - from app.utils.validators import validate_html_tags, get_html_help_text - is_valid, error_msg = validate_html_tags(message_text) if not is_valid: await message.answer( @@ -312,20 +313,22 @@ async def view_user_message( return message = await get_user_message_by_id(db, message_id) - + if not message: await callback.answer("❌ Сообщение не найдено", show_alert=True) return - + + safe_content = sanitize_html(message.message_text) + status_text = "🟢 Активно" if message.is_active else "🔴 Неактивно" - + text = ( f"📋 Сообщение ID {message.id}\n\n" f"Статус: {status_text}\n" f"Создано: {message.created_at.strftime('%d.%m.%Y %H:%M')}\n" f"Обновлено: {message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Содержимое:\n" - f"
{message.message_text}
" + f"
{safe_content}
" ) await callback.message.edit_text( @@ -455,7 +458,7 @@ async def edit_user_message_start( await callback.message.edit_text( f"✏️ Редактирование сообщения ID {message.id}\n\n" f"Текущий текст:\n" - f"
{message.message_text}
\n\n" + f"
{sanitize_html(message.message_text)}
\n\n" f"Введите новый текст сообщения или отправьте /cancel для отмены:", parse_mode="HTML" ) @@ -489,14 +492,23 @@ async def process_edit_message_text( return new_text = message.text.strip() - + if len(new_text) > 4000: await message.answer( "❌ Сообщение слишком длинное. Максимум 4000 символов.\n" "Попробуйте еще раз или отправьте /cancel для отмены." ) return - + + is_valid, error_msg = validate_html_tags(new_text) + if not is_valid: + await message.answer( + f"❌ Ошибка в HTML разметке: {error_msg}\n\n" + f"Исправьте ошибку и попробуйте еще раз, или отправьте /cancel для отмены.", + parse_mode=None + ) + return + try: updated_message = await update_user_message( db=db, @@ -511,7 +523,7 @@ async def process_edit_message_text( f"ID: {updated_message.id}\n" f"Обновлено: {updated_message.updated_at.strftime('%d.%m.%Y %H:%M')}\n\n" f"Новый текст:\n" - f"
{new_text}
", + f"
{sanitize_html(new_text)}
", reply_markup=get_user_messages_keyboard(db_user.language), parse_mode="HTML" ) diff --git a/app/utils/validators.py b/app/utils/validators.py index c63b5f71..43cecbab 100644 --- a/app/utils/validators.py +++ b/app/utils/validators.py @@ -123,23 +123,26 @@ def validate_subscription_period(days: Union[str, int]) -> Optional[int]: def sanitize_html(text: str) -> str: if not text: return text - + text = html.escape(text) - - for tag in ALLOWED_HTML_TAGS: + + allowed_tags = ALLOWED_HTML_TAGS.union(SELF_CLOSING_TAGS) + + for tag in allowed_tags: text = re.sub( - f'<{tag}(>|\\s[^&]*>)', - lambda m: m.group(0).replace('<', '<').replace('>', '>'), - text, + f'<(/?{tag}\\b[^>]*)>', + lambda m: "<" + + ( + m.group(1) + .replace(""", "\"") + .replace("'", "'") + .replace("&", "&") + ) + + ">", + text, flags=re.IGNORECASE ) - text = re.sub( - f'</{tag}>', - f'', - text, - flags=re.IGNORECASE - ) - + return text diff --git a/app/webapi/routes/user_messages.py b/app/webapi/routes/user_messages.py index d6fba008..847cd1c0 100644 --- a/app/webapi/routes/user_messages.py +++ b/app/webapi/routes/user_messages.py @@ -69,13 +69,16 @@ async def create_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: created_by = getattr(token, "id", None) - message = await create_user_message( - db, - message_text=payload.message_text, - created_by=created_by, - is_active=payload.is_active, - sort_order=payload.sort_order, - ) + try: + message = await create_user_message( + db, + message_text=payload.message_text, + created_by=created_by, + is_active=payload.is_active, + sort_order=payload.sort_order, + ) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error return _serialize(message) @@ -88,7 +91,10 @@ async def update_user_message_endpoint( db: AsyncSession = Depends(get_db_session), ) -> UserMessageResponse: update_payload = payload.dict(exclude_unset=True) - message = await update_user_message(db, message_id, **update_payload) + try: + message = await update_user_message(db, message_id, **update_payload) + except ValueError as error: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(error)) from error if not message: raise HTTPException(status.HTTP_404_NOT_FOUND, "User message not found") From 168cb5ea388fd57e9ea65e6c4864a2c93141cc30 Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:34:50 +0300 Subject: [PATCH 06/12] Add reverse remnawave sync with batched upload --- app/handlers/admin/remnawave.py | 54 +++++++++++ app/keyboards/admin.py | 6 ++ app/localization/locales/ru.json | 1 + app/localization/locales/ua.json | 1 + app/services/remnawave_service.py | 152 +++++++++++++++++------------- 5 files changed, 149 insertions(+), 65 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index e63c1c80..968a56f2 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,6 +2285,9 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" + "⬆️ Обратная синхронизация:\n" + "• Отправляет активных пользователей из бота в панель\n" + "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2295,6 +2298,12 @@ async def show_sync_options( callback_data="sync_all_users", ) ], + [ + types.InlineKeyboardButton( + text="⬆️ Синхронизация в панель", + callback_data="sync_to_panel", + ) + ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2654,6 +2663,50 @@ async def sync_all_users( ) await callback.answer() + +@admin_required +@error_handler +async def sync_users_to_panel( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, +): + await callback.message.edit_text( + "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" + "Это может занять несколько минут.", + reply_markup=None, + ) + + remnawave_service = RemnaWaveService() + stats = await remnawave_service.sync_users_to_panel(db) + + if stats["errors"] == 0: + status_emoji = "✅" + status_text = "успешно завершена" + else: + status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" + status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" + + text = ( + f"{status_emoji} Синхронизация в панель {status_text}\n\n" + "📊 Результаты:\n" + f"• 🆕 Создано: {stats['created']}\n" + f"• 🔄 Обновлено: {stats['updated']}\n" + f"• ❌ Ошибок: {stats['errors']}" + ) + + keyboard = [ + [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], + [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], + [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], + ] + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), + ) + await callback.answer() + @admin_required @error_handler async def show_sync_recommendations( @@ -3126,6 +3179,7 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") + dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 6272a531..964984fb 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,6 +1118,12 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), + callback_data="sync_to_panel" + ) + ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3519200a..3d3aef2c 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,6 +688,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index f0b24625..d291f56a 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,6 +687,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index e2cb7db5..c3851c6a 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -1615,77 +1615,99 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - users = await get_users_list(db, offset=0, limit=10000) - + + batch_size = 100 + offset = 0 + async with self.get_api_client() as api: - for user in users: - if not user.subscription: - continue + while True: + users = await get_users_list(db, offset=offset, limit=batch_size) + + if not users: + break + + for user in users: + if not user.subscription: + continue + + try: + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit + + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + user.remnawave_uuid = new_user.uuid + subscription.remnawave_short_uuid = new_user.short_uuid + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + await db.commit() + except Exception as commit_error: + logger.error( + "Ошибка фиксации транзакции при синхронизации в панель: %s", + commit_error, + ) + await db.rollback() + stats["errors"] += len(users) - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) + if len(users) < batch_size: + break - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit + offset += batch_size - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - await update_user(db, user, remnawave_uuid=new_user.uuid) - subscription.remnawave_short_uuid = new_user.short_uuid - # Убираем немедленный коммит для пакетной обработки - # await db.commit() - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 - - logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") + logger.info( + f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" + ) return stats except Exception as e: From 92efc52f7e91542f0b7eed706ed5224a68c1033c Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:48:28 +0300 Subject: [PATCH 07/12] Revert "Add reverse remnawave sync with batched upload" --- app/handlers/admin/remnawave.py | 54 ----------- app/keyboards/admin.py | 6 -- app/localization/locales/ru.json | 1 - app/localization/locales/ua.json | 1 - app/services/remnawave_service.py | 152 +++++++++++++----------------- 5 files changed, 65 insertions(+), 149 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index 968a56f2..e63c1c80 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,9 +2285,6 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" - "⬆️ Обратная синхронизация:\n" - "• Отправляет активных пользователей из бота в панель\n" - "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2298,12 +2295,6 @@ async def show_sync_options( callback_data="sync_all_users", ) ], - [ - types.InlineKeyboardButton( - text="⬆️ Синхронизация в панель", - callback_data="sync_to_panel", - ) - ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2663,50 +2654,6 @@ async def sync_all_users( ) await callback.answer() - -@admin_required -@error_handler -async def sync_users_to_panel( - callback: types.CallbackQuery, - db_user: User, - db: AsyncSession, -): - await callback.message.edit_text( - "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" - "Это может занять несколько минут.", - reply_markup=None, - ) - - remnawave_service = RemnaWaveService() - stats = await remnawave_service.sync_users_to_panel(db) - - if stats["errors"] == 0: - status_emoji = "✅" - status_text = "успешно завершена" - else: - status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" - status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" - - text = ( - f"{status_emoji} Синхронизация в панель {status_text}\n\n" - "📊 Результаты:\n" - f"• 🆕 Создано: {stats['created']}\n" - f"• 🔄 Обновлено: {stats['updated']}\n" - f"• ❌ Ошибок: {stats['errors']}" - ) - - keyboard = [ - [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], - [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], - [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], - ] - - await callback.message.edit_text( - text, - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() - @admin_required @error_handler async def show_sync_recommendations( @@ -3179,7 +3126,6 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") - dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 964984fb..6272a531 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,12 +1118,6 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], - [ - InlineKeyboardButton( - text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), - callback_data="sync_to_panel" - ) - ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3d3aef2c..3519200a 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,7 +688,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index d291f56a..f0b24625 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,7 +687,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index c3851c6a..e2cb7db5 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -1615,99 +1615,77 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - batch_size = 100 - offset = 0 - + + users = await get_users_list(db, offset=0, limit=10000) + async with self.get_api_client() as api: - while True: - users = await get_users_list(db, offset=offset, limit=batch_size) - - if not users: - break - - for user in users: - if not user.subscription: - continue - - try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit - - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - user.remnawave_uuid = new_user.uuid - subscription.remnawave_short_uuid = new_user.short_uuid - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 + for user in users: + if not user.subscription: + continue try: - await db.commit() - except Exception as commit_error: - logger.error( - "Ошибка фиксации транзакции при синхронизации в панель: %s", - commit_error, - ) - await db.rollback() - stats["errors"] += len(users) + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - if len(users) < batch_size: - break + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) - offset += batch_size + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit - logger.info( - f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" - ) + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + await update_user(db, user, remnawave_uuid=new_user.uuid) + subscription.remnawave_short_uuid = new_user.short_uuid + # Убираем немедленный коммит для пакетной обработки + # await db.commit() + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 + + logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") return stats except Exception as e: From 1f75413abd360530f2a97dd2bfa578cdf842a06f Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:48:49 +0300 Subject: [PATCH 08/12] Handle RemnaWave API status and expire constraints --- app/handlers/admin/remnawave.py | 54 ++++++++ app/handlers/admin/users.py | 26 +++- app/keyboards/admin.py | 6 + app/localization/locales/ru.json | 1 + app/localization/locales/ua.json | 1 + app/services/monitoring_service.py | 22 +++- app/services/remnawave_service.py | 183 +++++++++++++++++---------- app/services/subscription_service.py | 68 ++++++++-- 8 files changed, 274 insertions(+), 87 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index e63c1c80..968a56f2 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,6 +2285,9 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" + "⬆️ Обратная синхронизация:\n" + "• Отправляет активных пользователей из бота в панель\n" + "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2295,6 +2298,12 @@ async def show_sync_options( callback_data="sync_all_users", ) ], + [ + types.InlineKeyboardButton( + text="⬆️ Синхронизация в панель", + callback_data="sync_to_panel", + ) + ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2654,6 +2663,50 @@ async def sync_all_users( ) await callback.answer() + +@admin_required +@error_handler +async def sync_users_to_panel( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, +): + await callback.message.edit_text( + "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" + "Это может занять несколько минут.", + reply_markup=None, + ) + + remnawave_service = RemnaWaveService() + stats = await remnawave_service.sync_users_to_panel(db) + + if stats["errors"] == 0: + status_emoji = "✅" + status_text = "успешно завершена" + else: + status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" + status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" + + text = ( + f"{status_emoji} Синхронизация в панель {status_text}\n\n" + "📊 Результаты:\n" + f"• 🆕 Создано: {stats['created']}\n" + f"• 🔄 Обновлено: {stats['updated']}\n" + f"• ❌ Ошибок: {stats['errors']}" + ) + + keyboard = [ + [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], + [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], + [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], + ] + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), + ) + await callback.answer() + @admin_required @error_handler async def show_sync_recommendations( @@ -3126,6 +3179,7 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") + dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/handlers/admin/users.py b/app/handlers/admin/users.py index d25499e6..9c9c9cb9 100644 --- a/app/handlers/admin/users.py +++ b/app/handlers/admin/users.py @@ -4595,10 +4595,19 @@ async def admin_buy_subscription_execute( if target_user.remnawave_uuid: async with remnawave_service.get_api_client() as api: + expire_at = remnawave_service.ensure_future_expire_at( + subscription.end_date + ) + status = ( + UserStatus.ACTIVE + if subscription.is_active + else UserStatus.DISABLED + ) + update_kwargs = dict( uuid=target_user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, + status=status, + expire_at=expire_at, traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, description=settings.format_remnawave_user_description( @@ -4620,10 +4629,19 @@ async def admin_buy_subscription_execute( telegram_id=target_user.telegram_id, ) async with remnawave_service.get_api_client() as api: + expire_at = remnawave_service.ensure_future_expire_at( + subscription.end_date + ) + status = ( + UserStatus.ACTIVE + if subscription.is_active + else UserStatus.DISABLED + ) + create_kwargs = dict( username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=expire_at, + status=status, traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, telegram_id=target_user.telegram_id, diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 6272a531..964984fb 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,6 +1118,12 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), + callback_data="sync_to_panel" + ) + ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3519200a..3d3aef2c 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,6 +688,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index f0b24625..d291f56a 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,6 +687,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/monitoring_service.py b/app/services/monitoring_service.py index 9a0f6f95..1a0c300c 100644 --- a/app/services/monitoring_service.py +++ b/app/services/monitoring_service.py @@ -281,11 +281,15 @@ class MonitoringService: return None current_time = datetime.utcnow() - is_active = (subscription.status == SubscriptionStatus.ACTIVE.value and - subscription.end_date > current_time) + is_active = ( + subscription.status == SubscriptionStatus.ACTIVE.value + and subscription.end_date > current_time + ) - if (subscription.status == SubscriptionStatus.ACTIVE.value and - subscription.end_date <= current_time): + if ( + subscription.status == SubscriptionStatus.ACTIVE.value + and subscription.end_date <= current_time + ): subscription.status = SubscriptionStatus.EXPIRED.value await db.commit() is_active = False @@ -301,10 +305,16 @@ class MonitoringService: async with self.subscription_service.get_api_client() as api: hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + expire_at = self.subscription_service.ensure_future_expire_at( + subscription.end_date + ) + + status = UserStatus.ACTIVE if is_active else UserStatus.DISABLED + update_kwargs = dict( uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, + status=status, + expire_at=expire_at, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=TrafficLimitStrategy.MONTH, description=settings.format_remnawave_user_description( diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index e2cb7db5..22f9e183 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -220,6 +220,27 @@ class RemnaWaveService: """Возвращает текущее время в UTC без привязки к часовому поясу.""" return datetime.now(self._utc_timezone).replace(tzinfo=None) + def ensure_future_expire_at(self, expire_at: Optional[datetime]) -> datetime: + """Приводит дату окончания подписки к будущему значению для API панели.""" + + safe_now = self._now_utc() + + if expire_at is None: + adjusted = safe_now + timedelta(days=30) + logger.debug("⚙️ Используем дефолтную дату окончания подписки: %s", adjusted) + return adjusted + + if expire_at <= safe_now: + adjusted = safe_now + timedelta(minutes=1) + logger.debug( + "⚙️ Корректируем просроченную дату подписки %s → %s для RemnaWave API", + expire_at, + adjusted, + ) + return adjusted + + return expire_at + def _parse_remnawave_date(self, date_str: str) -> datetime: if not date_str: return self._now_utc() + timedelta(days=30) @@ -1615,77 +1636,109 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - users = await get_users_list(db, offset=0, limit=10000) - + + batch_size = 100 + offset = 0 + async with self.get_api_client() as api: - for user in users: - if not user.subscription: - continue + while True: + users = await get_users_list(db, offset=offset, limit=batch_size) + + if not users: + break + + for user in users: + if not user.subscription: + continue + + try: + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + + expire_at = self.ensure_future_expire_at( + subscription.end_date + ) + + status = ( + UserStatus.ACTIVE + if subscription.is_active + else UserStatus.DISABLED + ) + + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=status, + expire_at=expire_at, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit + + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=expire_at, + status=status, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + user.remnawave_uuid = new_user.uuid + subscription.remnawave_short_uuid = new_user.short_uuid + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + await db.commit() + except Exception as commit_error: + logger.error( + "Ошибка фиксации транзакции при синхронизации в панель: %s", + commit_error, + ) + await db.rollback() + stats["errors"] += len(users) - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) + if len(users) < batch_size: + break - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit + offset += batch_size - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - await update_user(db, user, remnawave_uuid=new_user.uuid) - subscription.remnawave_short_uuid = new_user.short_uuid - # Убираем немедленный коммит для пакетной обработки - # await db.commit() - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 - - logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") + logger.info( + f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" + ) return stats except Exception as e: diff --git a/app/services/subscription_service.py b/app/services/subscription_service.py index 623aeaf9..d04284b5 100644 --- a/app/services/subscription_service.py +++ b/app/services/subscription_service.py @@ -153,6 +153,27 @@ class SubscriptionService: assert self.api is not None async with self.api as api: yield api + + def ensure_future_expire_at(self, expire_at: Optional[datetime]) -> datetime: + """Гарантирует, что дата окончания передается в будущее значение для RemnaWave API.""" + + safe_now = datetime.utcnow() + + if expire_at is None: + adjusted = safe_now + timedelta(days=30) + logger.debug("⚙️ Используем дефолтную дату окончания подписки: %s", adjusted) + return adjusted + + if expire_at <= safe_now: + adjusted = safe_now + timedelta(minutes=1) + logger.debug( + "⚙️ Корректируем просроченную дату подписки %s → %s для RemnaWave API", + expire_at, + adjusted, + ) + return adjusted + + return expire_at async def create_remnawave_user( self, @@ -187,10 +208,18 @@ class SubscriptionService: except Exception as hwid_error: logger.warning(f"⚠️ Не удалось сбросить HWID: {hwid_error}") + expire_at = self.ensure_future_expire_at(subscription.end_date) + + status = ( + UserStatus.ACTIVE + if subscription.is_active + else UserStatus.DISABLED + ) + update_kwargs = dict( uuid=remnawave_user.uuid, - status=UserStatus.ACTIVE, - expire_at=subscription.end_date, + status=status, + expire_at=expire_at, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), description=settings.format_remnawave_user_description( @@ -221,10 +250,18 @@ class SubscriptionService: username=user.username, telegram_id=user.telegram_id, ) + expire_at = self.ensure_future_expire_at(subscription.end_date) + + status = ( + UserStatus.ACTIVE + if subscription.is_active + else UserStatus.DISABLED + ) + create_kwargs = dict( username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE, + expire_at=expire_at, + status=status, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), telegram_id=user.telegram_id, @@ -285,25 +322,32 @@ class SubscriptionService: return None current_time = datetime.utcnow() - is_actually_active = (subscription.status == SubscriptionStatus.ACTIVE.value and - subscription.end_date > current_time) + is_actually_active = ( + subscription.status == SubscriptionStatus.ACTIVE.value + and subscription.end_date > current_time + ) - if (subscription.status == SubscriptionStatus.ACTIVE.value and - subscription.end_date <= current_time): - + if ( + subscription.status == SubscriptionStatus.ACTIVE.value + and subscription.end_date <= current_time + ): subscription.status = SubscriptionStatus.EXPIRED.value subscription.updated_at = current_time await db.commit() is_actually_active = False logger.info(f"🔔 Статус подписки {subscription.id} автоматически изменен на 'expired'") - + async with self.get_api_client() as api: hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + expire_at = self.ensure_future_expire_at(subscription.end_date) + + status = UserStatus.ACTIVE if is_actually_active else UserStatus.DISABLED + update_kwargs = dict( uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if is_actually_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, + status=status, + expire_at=expire_at, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), description=settings.format_remnawave_user_description( From 7cd6c3acb793acd5c08821cfd1f4665b41e4d1e6 Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:52:47 +0300 Subject: [PATCH 09/12] Revert "Align RemnaWave sync with new status and expiration rules" --- app/handlers/admin/remnawave.py | 54 -------- app/handlers/admin/users.py | 26 +--- app/keyboards/admin.py | 6 - app/localization/locales/ru.json | 1 - app/localization/locales/ua.json | 1 - app/services/monitoring_service.py | 22 +--- app/services/remnawave_service.py | 183 ++++++++++----------------- app/services/subscription_service.py | 68 ++-------- 8 files changed, 87 insertions(+), 274 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index 968a56f2..e63c1c80 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,9 +2285,6 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" - "⬆️ Обратная синхронизация:\n" - "• Отправляет активных пользователей из бота в панель\n" - "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2298,12 +2295,6 @@ async def show_sync_options( callback_data="sync_all_users", ) ], - [ - types.InlineKeyboardButton( - text="⬆️ Синхронизация в панель", - callback_data="sync_to_panel", - ) - ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2663,50 +2654,6 @@ async def sync_all_users( ) await callback.answer() - -@admin_required -@error_handler -async def sync_users_to_panel( - callback: types.CallbackQuery, - db_user: User, - db: AsyncSession, -): - await callback.message.edit_text( - "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" - "Это может занять несколько минут.", - reply_markup=None, - ) - - remnawave_service = RemnaWaveService() - stats = await remnawave_service.sync_users_to_panel(db) - - if stats["errors"] == 0: - status_emoji = "✅" - status_text = "успешно завершена" - else: - status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" - status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" - - text = ( - f"{status_emoji} Синхронизация в панель {status_text}\n\n" - "📊 Результаты:\n" - f"• 🆕 Создано: {stats['created']}\n" - f"• 🔄 Обновлено: {stats['updated']}\n" - f"• ❌ Ошибок: {stats['errors']}" - ) - - keyboard = [ - [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], - [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], - [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], - ] - - await callback.message.edit_text( - text, - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() - @admin_required @error_handler async def show_sync_recommendations( @@ -3179,7 +3126,6 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") - dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/handlers/admin/users.py b/app/handlers/admin/users.py index 9c9c9cb9..d25499e6 100644 --- a/app/handlers/admin/users.py +++ b/app/handlers/admin/users.py @@ -4595,19 +4595,10 @@ async def admin_buy_subscription_execute( if target_user.remnawave_uuid: async with remnawave_service.get_api_client() as api: - expire_at = remnawave_service.ensure_future_expire_at( - subscription.end_date - ) - status = ( - UserStatus.ACTIVE - if subscription.is_active - else UserStatus.DISABLED - ) - update_kwargs = dict( uuid=target_user.remnawave_uuid, - status=status, - expire_at=expire_at, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, description=settings.format_remnawave_user_description( @@ -4629,19 +4620,10 @@ async def admin_buy_subscription_execute( telegram_id=target_user.telegram_id, ) async with remnawave_service.get_api_client() as api: - expire_at = remnawave_service.ensure_future_expire_at( - subscription.end_date - ) - status = ( - UserStatus.ACTIVE - if subscription.is_active - else UserStatus.DISABLED - ) - create_kwargs = dict( username=username, - expire_at=expire_at, - status=status, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, telegram_id=target_user.telegram_id, diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 964984fb..6272a531 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,12 +1118,6 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], - [ - InlineKeyboardButton( - text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), - callback_data="sync_to_panel" - ) - ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3d3aef2c..3519200a 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,7 +688,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index d291f56a..f0b24625 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,7 +687,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/monitoring_service.py b/app/services/monitoring_service.py index 1a0c300c..9a0f6f95 100644 --- a/app/services/monitoring_service.py +++ b/app/services/monitoring_service.py @@ -281,15 +281,11 @@ class MonitoringService: return None current_time = datetime.utcnow() - is_active = ( - subscription.status == SubscriptionStatus.ACTIVE.value - and subscription.end_date > current_time - ) + is_active = (subscription.status == SubscriptionStatus.ACTIVE.value and + subscription.end_date > current_time) - if ( - subscription.status == SubscriptionStatus.ACTIVE.value - and subscription.end_date <= current_time - ): + if (subscription.status == SubscriptionStatus.ACTIVE.value and + subscription.end_date <= current_time): subscription.status = SubscriptionStatus.EXPIRED.value await db.commit() is_active = False @@ -305,16 +301,10 @@ class MonitoringService: async with self.subscription_service.get_api_client() as api: hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - expire_at = self.subscription_service.ensure_future_expire_at( - subscription.end_date - ) - - status = UserStatus.ACTIVE if is_active else UserStatus.DISABLED - update_kwargs = dict( uuid=user.remnawave_uuid, - status=status, - expire_at=expire_at, + status=UserStatus.ACTIVE if is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=TrafficLimitStrategy.MONTH, description=settings.format_remnawave_user_description( diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 22f9e183..e2cb7db5 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -220,27 +220,6 @@ class RemnaWaveService: """Возвращает текущее время в UTC без привязки к часовому поясу.""" return datetime.now(self._utc_timezone).replace(tzinfo=None) - def ensure_future_expire_at(self, expire_at: Optional[datetime]) -> datetime: - """Приводит дату окончания подписки к будущему значению для API панели.""" - - safe_now = self._now_utc() - - if expire_at is None: - adjusted = safe_now + timedelta(days=30) - logger.debug("⚙️ Используем дефолтную дату окончания подписки: %s", adjusted) - return adjusted - - if expire_at <= safe_now: - adjusted = safe_now + timedelta(minutes=1) - logger.debug( - "⚙️ Корректируем просроченную дату подписки %s → %s для RemnaWave API", - expire_at, - adjusted, - ) - return adjusted - - return expire_at - def _parse_remnawave_date(self, date_str: str) -> datetime: if not date_str: return self._now_utc() + timedelta(days=30) @@ -1636,109 +1615,77 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - batch_size = 100 - offset = 0 - + + users = await get_users_list(db, offset=0, limit=10000) + async with self.get_api_client() as api: - while True: - users = await get_users_list(db, offset=offset, limit=batch_size) - - if not users: - break - - for user in users: - if not user.subscription: - continue - - try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - - expire_at = self.ensure_future_expire_at( - subscription.end_date - ) - - status = ( - UserStatus.ACTIVE - if subscription.is_active - else UserStatus.DISABLED - ) - - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=status, - expire_at=expire_at, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit - - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=expire_at, - status=status, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - user.remnawave_uuid = new_user.uuid - subscription.remnawave_short_uuid = new_user.short_uuid - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 + for user in users: + if not user.subscription: + continue try: - await db.commit() - except Exception as commit_error: - logger.error( - "Ошибка фиксации транзакции при синхронизации в панель: %s", - commit_error, - ) - await db.rollback() - stats["errors"] += len(users) + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - if len(users) < batch_size: - break + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) - offset += batch_size + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit - logger.info( - f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" - ) + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + await update_user(db, user, remnawave_uuid=new_user.uuid) + subscription.remnawave_short_uuid = new_user.short_uuid + # Убираем немедленный коммит для пакетной обработки + # await db.commit() + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 + + logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") return stats except Exception as e: diff --git a/app/services/subscription_service.py b/app/services/subscription_service.py index d04284b5..623aeaf9 100644 --- a/app/services/subscription_service.py +++ b/app/services/subscription_service.py @@ -153,27 +153,6 @@ class SubscriptionService: assert self.api is not None async with self.api as api: yield api - - def ensure_future_expire_at(self, expire_at: Optional[datetime]) -> datetime: - """Гарантирует, что дата окончания передается в будущее значение для RemnaWave API.""" - - safe_now = datetime.utcnow() - - if expire_at is None: - adjusted = safe_now + timedelta(days=30) - logger.debug("⚙️ Используем дефолтную дату окончания подписки: %s", adjusted) - return adjusted - - if expire_at <= safe_now: - adjusted = safe_now + timedelta(minutes=1) - logger.debug( - "⚙️ Корректируем просроченную дату подписки %s → %s для RemnaWave API", - expire_at, - adjusted, - ) - return adjusted - - return expire_at async def create_remnawave_user( self, @@ -208,18 +187,10 @@ class SubscriptionService: except Exception as hwid_error: logger.warning(f"⚠️ Не удалось сбросить HWID: {hwid_error}") - expire_at = self.ensure_future_expire_at(subscription.end_date) - - status = ( - UserStatus.ACTIVE - if subscription.is_active - else UserStatus.DISABLED - ) - update_kwargs = dict( uuid=remnawave_user.uuid, - status=status, - expire_at=expire_at, + status=UserStatus.ACTIVE, + expire_at=subscription.end_date, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), description=settings.format_remnawave_user_description( @@ -250,18 +221,10 @@ class SubscriptionService: username=user.username, telegram_id=user.telegram_id, ) - expire_at = self.ensure_future_expire_at(subscription.end_date) - - status = ( - UserStatus.ACTIVE - if subscription.is_active - else UserStatus.DISABLED - ) - create_kwargs = dict( username=username, - expire_at=expire_at, - status=status, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), telegram_id=user.telegram_id, @@ -322,32 +285,25 @@ class SubscriptionService: return None current_time = datetime.utcnow() - is_actually_active = ( - subscription.status == SubscriptionStatus.ACTIVE.value - and subscription.end_date > current_time - ) + is_actually_active = (subscription.status == SubscriptionStatus.ACTIVE.value and + subscription.end_date > current_time) - if ( - subscription.status == SubscriptionStatus.ACTIVE.value - and subscription.end_date <= current_time - ): + if (subscription.status == SubscriptionStatus.ACTIVE.value and + subscription.end_date <= current_time): + subscription.status = SubscriptionStatus.EXPIRED.value subscription.updated_at = current_time await db.commit() is_actually_active = False logger.info(f"🔔 Статус подписки {subscription.id} автоматически изменен на 'expired'") - + async with self.get_api_client() as api: hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - expire_at = self.ensure_future_expire_at(subscription.end_date) - - status = UserStatus.ACTIVE if is_actually_active else UserStatus.DISABLED - update_kwargs = dict( uuid=user.remnawave_uuid, - status=status, - expire_at=expire_at, + status=UserStatus.ACTIVE if is_actually_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb), traffic_limit_strategy=get_traffic_reset_strategy(), description=settings.format_remnawave_user_description( From 5b9002896efe3bdc767269d2224b702df5dc407a Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:53:13 +0300 Subject: [PATCH 10/12] Handle RemnaWave status validation changes --- app/handlers/admin/remnawave.py | 54 +++++++++ app/keyboards/admin.py | 6 + app/localization/locales/ru.json | 1 + app/localization/locales/ua.json | 1 + app/services/remnawave_service.py | 178 +++++++++++++++++++----------- 5 files changed, 175 insertions(+), 65 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index e63c1c80..968a56f2 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,6 +2285,9 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" + "⬆️ Обратная синхронизация:\n" + "• Отправляет активных пользователей из бота в панель\n" + "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2295,6 +2298,12 @@ async def show_sync_options( callback_data="sync_all_users", ) ], + [ + types.InlineKeyboardButton( + text="⬆️ Синхронизация в панель", + callback_data="sync_to_panel", + ) + ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2654,6 +2663,50 @@ async def sync_all_users( ) await callback.answer() + +@admin_required +@error_handler +async def sync_users_to_panel( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, +): + await callback.message.edit_text( + "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" + "Это может занять несколько минут.", + reply_markup=None, + ) + + remnawave_service = RemnaWaveService() + stats = await remnawave_service.sync_users_to_panel(db) + + if stats["errors"] == 0: + status_emoji = "✅" + status_text = "успешно завершена" + else: + status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" + status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" + + text = ( + f"{status_emoji} Синхронизация в панель {status_text}\n\n" + "📊 Результаты:\n" + f"• 🆕 Создано: {stats['created']}\n" + f"• 🔄 Обновлено: {stats['updated']}\n" + f"• ❌ Ошибок: {stats['errors']}" + ) + + keyboard = [ + [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], + [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], + [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], + ] + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), + ) + await callback.answer() + @admin_required @error_handler async def show_sync_recommendations( @@ -3126,6 +3179,7 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") + dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 6272a531..964984fb 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,6 +1118,12 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), + callback_data="sync_to_panel" + ) + ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3519200a..3d3aef2c 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,6 +688,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index f0b24625..d291f56a 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,6 +687,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index e2cb7db5..365f8fcb 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -256,6 +256,29 @@ class RemnaWaveService: ) return self._now_utc() + timedelta(days=30) + def _safe_expire_at_for_panel(self, expire_at: Optional[datetime]) -> datetime: + """Гарантирует, что дата окончания не в прошлом для панели.""" + + now = self._now_utc() + minimum_expire = now + timedelta(minutes=1) + + if not expire_at: + return minimum_expire + + normalized_expire = expire_at + if normalized_expire.tzinfo is not None: + normalized_expire = normalized_expire.replace(tzinfo=None) + + if normalized_expire < minimum_expire: + logger.debug( + "⚙️ Коррекция даты истечения (%s) до минимально допустимой (%s) для панели", + normalized_expire, + minimum_expire, + ) + return minimum_expire + + return normalized_expire + def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: """Парсит дату окончания подписки пользователя панели для сравнения.""" @@ -1615,77 +1638,102 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - users = await get_users_list(db, offset=0, limit=10000) - + + batch_size = 100 + offset = 0 + async with self.get_api_client() as api: - for user in users: - if not user.subscription: - continue + while True: + users = await get_users_list(db, offset=offset, limit=batch_size) + + if not users: + break + + for user in users: + if not user.subscription: + continue + + try: + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + + expire_at = self._safe_expire_at_for_panel(subscription.end_date) + status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED + + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=status, + expire_at=expire_at, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit + + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=expire_at, + status=status, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + user.remnawave_uuid = new_user.uuid + subscription.remnawave_short_uuid = new_user.short_uuid + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + await db.commit() + except Exception as commit_error: + logger.error( + "Ошибка фиксации транзакции при синхронизации в панель: %s", + commit_error, + ) + await db.rollback() + stats["errors"] += len(users) - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) + if len(users) < batch_size: + break - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit + offset += batch_size - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - await update_user(db, user, remnawave_uuid=new_user.uuid) - subscription.remnawave_short_uuid = new_user.short_uuid - # Убираем немедленный коммит для пакетной обработки - # await db.commit() - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 - - logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") + logger.info( + f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" + ) return stats except Exception as e: From b4370e9a34168a0e3951a92e19c3c3e70b1b79b7 Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 02:59:41 +0300 Subject: [PATCH 11/12] Revert "Handle RemnaWave status validation changes" --- app/handlers/admin/remnawave.py | 54 --------- app/keyboards/admin.py | 6 - app/localization/locales/ru.json | 1 - app/localization/locales/ua.json | 1 - app/services/remnawave_service.py | 178 +++++++++++------------------- 5 files changed, 65 insertions(+), 175 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index 968a56f2..e63c1c80 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,9 +2285,6 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" - "⬆️ Обратная синхронизация:\n" - "• Отправляет активных пользователей из бота в панель\n" - "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2298,12 +2295,6 @@ async def show_sync_options( callback_data="sync_all_users", ) ], - [ - types.InlineKeyboardButton( - text="⬆️ Синхронизация в панель", - callback_data="sync_to_panel", - ) - ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2663,50 +2654,6 @@ async def sync_all_users( ) await callback.answer() - -@admin_required -@error_handler -async def sync_users_to_panel( - callback: types.CallbackQuery, - db_user: User, - db: AsyncSession, -): - await callback.message.edit_text( - "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" - "Это может занять несколько минут.", - reply_markup=None, - ) - - remnawave_service = RemnaWaveService() - stats = await remnawave_service.sync_users_to_panel(db) - - if stats["errors"] == 0: - status_emoji = "✅" - status_text = "успешно завершена" - else: - status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" - status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" - - text = ( - f"{status_emoji} Синхронизация в панель {status_text}\n\n" - "📊 Результаты:\n" - f"• 🆕 Создано: {stats['created']}\n" - f"• 🔄 Обновлено: {stats['updated']}\n" - f"• ❌ Ошибок: {stats['errors']}" - ) - - keyboard = [ - [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], - [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], - [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], - ] - - await callback.message.edit_text( - text, - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() - @admin_required @error_handler async def show_sync_recommendations( @@ -3179,7 +3126,6 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") - dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 964984fb..6272a531 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,12 +1118,6 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], - [ - InlineKeyboardButton( - text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), - callback_data="sync_to_panel" - ) - ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3d3aef2c..3519200a 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,7 +688,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index d291f56a..f0b24625 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,7 +687,6 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", - "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 365f8fcb..e2cb7db5 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -256,29 +256,6 @@ class RemnaWaveService: ) return self._now_utc() + timedelta(days=30) - def _safe_expire_at_for_panel(self, expire_at: Optional[datetime]) -> datetime: - """Гарантирует, что дата окончания не в прошлом для панели.""" - - now = self._now_utc() - minimum_expire = now + timedelta(minutes=1) - - if not expire_at: - return minimum_expire - - normalized_expire = expire_at - if normalized_expire.tzinfo is not None: - normalized_expire = normalized_expire.replace(tzinfo=None) - - if normalized_expire < minimum_expire: - logger.debug( - "⚙️ Коррекция даты истечения (%s) до минимально допустимой (%s) для панели", - normalized_expire, - minimum_expire, - ) - return minimum_expire - - return normalized_expire - def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: """Парсит дату окончания подписки пользователя панели для сравнения.""" @@ -1638,102 +1615,77 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - batch_size = 100 - offset = 0 - + + users = await get_users_list(db, offset=0, limit=10000) + async with self.get_api_client() as api: - while True: - users = await get_users_list(db, offset=offset, limit=batch_size) - - if not users: - break - - for user in users: - if not user.subscription: - continue - - try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - - expire_at = self._safe_expire_at_for_panel(subscription.end_date) - status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED - - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=status, - expire_at=expire_at, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit - - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) - - create_kwargs = dict( - username=username, - expire_at=expire_at, - status=status, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) - - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - new_user = await api.create_user(**create_kwargs) - - user.remnawave_uuid = new_user.uuid - subscription.remnawave_short_uuid = new_user.short_uuid - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 + for user in users: + if not user.subscription: + continue try: - await db.commit() - except Exception as commit_error: - logger.error( - "Ошибка фиксации транзакции при синхронизации в панель: %s", - commit_error, - ) - await db.rollback() - stats["errors"] += len(users) + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) - if len(users) < batch_size: - break + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=subscription.end_date, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) - offset += batch_size + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit - logger.info( - f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" - ) + await api.update_user(**update_kwargs) + stats["updated"] += 1 + else: + username = settings.format_remnawave_username( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id, + ) + + create_kwargs = dict( + username=username, + expire_at=subscription.end_date, + status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + create_kwargs['hwid_device_limit'] = hwid_limit + + new_user = await api.create_user(**create_kwargs) + + await update_user(db, user, remnawave_uuid=new_user.uuid) + subscription.remnawave_short_uuid = new_user.short_uuid + # Убираем немедленный коммит для пакетной обработки + # await db.commit() + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 + + logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") return stats except Exception as e: From aae85683a0bb6e310be2c60cd6ea8fc91802e9ac Mon Sep 17 00:00:00 2001 From: Egor Date: Fri, 28 Nov 2025 03:00:09 +0300 Subject: [PATCH 12/12] Recover missing RemnaWave users during panel sync --- app/handlers/admin/remnawave.py | 54 +++++++++++ app/keyboards/admin.py | 6 ++ app/localization/locales/ru.json | 1 + app/localization/locales/ua.json | 1 + app/services/remnawave_service.py | 146 +++++++++++++++++++++--------- 5 files changed, 164 insertions(+), 44 deletions(-) diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index e63c1c80..968a56f2 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -2285,6 +2285,9 @@ async def show_sync_options( "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" "• Рекомендуется делать полную синхронизацию ежедневно\n" "• Баланс пользователей НЕ удаляется\n\n" + "⬆️ Обратная синхронизация:\n" + "• Отправляет активных пользователей из бота в панель\n" + "• Используйте при сбоях панели или для восстановления данных\n\n" + "\n".join(status_lines) ) @@ -2295,6 +2298,12 @@ async def show_sync_options( callback_data="sync_all_users", ) ], + [ + types.InlineKeyboardButton( + text="⬆️ Синхронизация в панель", + callback_data="sync_to_panel", + ) + ], [ types.InlineKeyboardButton( text="⚙️ Настройки автосинхронизации", @@ -2654,6 +2663,50 @@ async def sync_all_users( ) await callback.answer() + +@admin_required +@error_handler +async def sync_users_to_panel( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, +): + await callback.message.edit_text( + "⬆️ Выполняется синхронизация данных бота в панель Remnawave...\n\n" + "Это может занять несколько минут.", + reply_markup=None, + ) + + remnawave_service = RemnaWaveService() + stats = await remnawave_service.sync_users_to_panel(db) + + if stats["errors"] == 0: + status_emoji = "✅" + status_text = "успешно завершена" + else: + status_emoji = "⚠️" if (stats["created"] + stats["updated"]) > 0 else "❌" + status_text = "завершена с предупреждениями" if status_emoji == "⚠️" else "завершена с ошибками" + + text = ( + f"{status_emoji} Синхронизация в панель {status_text}\n\n" + "📊 Результаты:\n" + f"• 🆕 Создано: {stats['created']}\n" + f"• 🔄 Обновлено: {stats['updated']}\n" + f"• ❌ Ошибок: {stats['errors']}" + ) + + keyboard = [ + [types.InlineKeyboardButton(text="🔄 Повторить", callback_data="sync_to_panel")], + [types.InlineKeyboardButton(text="🔄 Полная синхронизация", callback_data="sync_all_users")], + [types.InlineKeyboardButton(text="⬅️ К синхронизации", callback_data="admin_rw_sync")], + ] + + await callback.message.edit_text( + text, + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), + ) + await callback.answer() + @admin_required @error_handler async def show_sync_recommendations( @@ -3126,6 +3179,7 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") + dp.callback_query.register(sync_users_to_panel, F.data == "sync_to_panel") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) dp.callback_query.register(handle_migration_source_selection, F.data.startswith("admin_migration_source_")) diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 6272a531..964984fb 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -1118,6 +1118,12 @@ def get_sync_options_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="sync_all_users" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_SYNC_TO_PANEL", "⬆️ Синхронизация в панель"), + callback_data="sync_to_panel" + ) + ], [ InlineKeyboardButton( text=_t(texts, "ADMIN_SYNC_ONLY_NEW", "🆕 Только новые"), diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 3519200a..3d3aef2c 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -688,6 +688,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Поддержка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тикеты поддержки", "ADMIN_SYNC_BACK": "⬅️ К синхронизации", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронизация в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очистка", "ADMIN_SYNC_CONFIRM": "✅ Подтвердить", "ADMIN_SYNC_FULL": "🔄 Полная синхронизация", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index f0b24625..d291f56a 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -687,6 +687,7 @@ "ADMIN_SUPPORT_SUBMENU_TITLE": "🛟 **Підтримка**\n\n", "ADMIN_SUPPORT_TICKETS": "🎫 Тікети підтримки", "ADMIN_SYNC_BACK": "⬅️ До синхронізації", + "ADMIN_SYNC_TO_PANEL": "⬆️ Синхронізація в панель", "ADMIN_SYNC_CLEANUP": "🧹 Очищення", "ADMIN_SYNC_CONFIRM": "✅ Підтвердити", "ADMIN_SYNC_FULL": "🔄 Повна синхронізація", diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index e2cb7db5..40d90797 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -256,6 +256,29 @@ class RemnaWaveService: ) return self._now_utc() + timedelta(days=30) + def _safe_expire_at_for_panel(self, expire_at: Optional[datetime]) -> datetime: + """Гарантирует, что дата окончания не в прошлом для панели.""" + + now = self._now_utc() + minimum_expire = now + timedelta(minutes=1) + + if not expire_at: + return minimum_expire + + normalized_expire = expire_at + if normalized_expire.tzinfo is not None: + normalized_expire = normalized_expire.replace(tzinfo=None) + + if normalized_expire < minimum_expire: + logger.debug( + "⚙️ Коррекция даты истечения (%s) до минимально допустимой (%s) для панели", + normalized_expire, + minimum_expire, + ) + return minimum_expire + + return normalized_expire + def _safe_panel_expire_date(self, panel_user: Dict[str, Any]) -> datetime: """Парсит дату окончания подписки пользователя панели для сравнения.""" @@ -1615,39 +1638,28 @@ class RemnaWaveService: async def sync_users_to_panel(self, db: AsyncSession) -> Dict[str, int]: try: stats = {"created": 0, "updated": 0, "errors": 0} - - users = await get_users_list(db, offset=0, limit=10000) - + + batch_size = 100 + offset = 0 + async with self.get_api_client() as api: - for user in users: - if not user.subscription: - continue + while True: + users = await get_users_list(db, offset=offset, limit=batch_size) - try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + if not users: + break - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, - expire_at=subscription.end_date, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=settings.format_remnawave_user_description( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) + for user in users: + if not user.subscription: + continue - if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit + try: + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + + expire_at = self._safe_expire_at_for_panel(subscription.end_date) + status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED - await api.update_user(**update_kwargs) - stats["updated"] += 1 - else: username = settings.format_remnawave_username( full_name=user.full_name, username=user.username, @@ -1656,8 +1668,8 @@ class RemnaWaveService: create_kwargs = dict( username=username, - expire_at=subscription.end_date, - status=UserStatus.ACTIVE if subscription.is_active else UserStatus.EXPIRED, + expire_at=expire_at, + status=status, traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, telegram_id=user.telegram_id, @@ -1672,20 +1684,66 @@ class RemnaWaveService: if hwid_limit is not None: create_kwargs['hwid_device_limit'] = hwid_limit - new_user = await api.create_user(**create_kwargs) - - await update_user(db, user, remnawave_uuid=new_user.uuid) - subscription.remnawave_short_uuid = new_user.short_uuid - # Убираем немедленный коммит для пакетной обработки - # await db.commit() - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") - stats["errors"] += 1 - - logger.info(f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}") + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=status, + expire_at=expire_at, + traffic_limit_bytes=create_kwargs['traffic_limit_bytes'], + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=create_kwargs['description'], + active_internal_squads=subscription.connected_squads, + ) + + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit + + try: + await api.update_user(**update_kwargs) + stats["updated"] += 1 + except RemnaWaveAPIError as api_error: + if api_error.status_code == 404: + logger.warning( + "⚠️ Не найден пользователь %s в панели, создаем заново", + user.remnawave_uuid, + ) + + new_user = await api.create_user(**create_kwargs) + user.remnawave_uuid = new_user.uuid + subscription.remnawave_short_uuid = new_user.short_uuid + stats["created"] += 1 + else: + raise + else: + new_user = await api.create_user(**create_kwargs) + + user.remnawave_uuid = new_user.uuid + subscription.remnawave_short_uuid = new_user.short_uuid + + stats["created"] += 1 + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + stats["errors"] += 1 + + try: + await db.commit() + except Exception as commit_error: + logger.error( + "Ошибка фиксации транзакции при синхронизации в панель: %s", + commit_error, + ) + await db.rollback() + stats["errors"] += len(users) + + if len(users) < batch_size: + break + + offset += batch_size + + logger.info( + f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" + ) return stats except Exception as e: