diff --git a/app/config.py b/app/config.py index 0a5c790e..036467d8 100644 --- a/app/config.py +++ b/app/config.py @@ -71,6 +71,8 @@ class Settings(BaseSettings): REMNAWAVE_AUTH_TYPE: str = "api_key" REMNAWAVE_USER_DESCRIPTION_TEMPLATE: str = "Bot user: {full_name} {username}" REMNAWAVE_USER_DELETE_MODE: str = "delete" # "delete" или "disable" + REMNAWAVE_AUTO_SYNC_ENABLED: bool = False + REMNAWAVE_AUTO_SYNC_TIMES: str = "03:00" TRIAL_DURATION_DAYS: int = 3 TRIAL_TRAFFIC_LIMIT_GB: int = 10 @@ -513,6 +515,42 @@ class Settings(BaseSettings): description = re.sub(r'\s+', ' ', description).strip() return description + @staticmethod + def parse_daily_time_list(raw_value: Optional[str]) -> List[time]: + if not raw_value: + return [] + + segments = re.split(r"[\s,;]+", raw_value.strip()) + seen: set[tuple[int, int]] = set() + parsed: List[time] = [] + + for segment in segments: + if not segment: + continue + + try: + hours_str, minutes_str = segment.split(":", 1) + hours = int(hours_str) + minutes = int(minutes_str) + except (ValueError, AttributeError): + continue + + if not (0 <= hours < 24 and 0 <= minutes < 60): + continue + + key = (hours, minutes) + if key in seen: + continue + + seen.add(key) + parsed.append(time(hour=hours, minute=minutes)) + + parsed.sort() + return parsed + + def get_remnawave_auto_sync_times(self) -> List[time]: + return self.parse_daily_time_list(self.REMNAWAVE_AUTO_SYNC_TIMES) + def get_display_name_banned_keywords(self) -> List[str]: raw_value = self.DISPLAY_NAME_BANNED_KEYWORDS if raw_value is None: diff --git a/app/handlers/admin/remnawave.py b/app/handlers/admin/remnawave.py index d6167a27..05f669c1 100644 --- a/app/handlers/admin/remnawave.py +++ b/app/handlers/admin/remnawave.py @@ -1,8 +1,16 @@ import logging import math +from datetime import datetime +from typing import Any, Dict, Optional + from aiogram import Dispatcher, types, F from aiogram.fsm.context import FSMContext -from app.states import SquadRenameStates, SquadCreateStates, SquadMigrationStates +from app.states import ( + RemnaWaveSyncStates, + SquadRenameStates, + SquadCreateStates, + SquadMigrationStates, +) from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings @@ -19,6 +27,11 @@ from app.keyboards.admin import ( ) from app.localization.texts import get_texts from app.services.remnawave_service import RemnaWaveService, RemnaWaveConfigurationError +from app.services.remnawave_sync_service import ( + RemnaWaveAutoSyncStatus, + remnawave_sync_service, +) +from app.services.system_settings_service import bot_configuration_service from app.utils.decorators import admin_required, error_handler from app.utils.formatters import format_bytes, format_datetime @@ -30,6 +43,149 @@ squad_create_data = {} MIGRATION_PAGE_SIZE = 8 +def _format_duration(seconds: float) -> str: + if seconds < 1: + return "<1с" + + minutes, sec = divmod(int(seconds), 60) + if minutes: + if sec: + return f"{minutes} мин {sec} с" + return f"{minutes} мин" + return f"{sec} с" + + +def _format_user_stats(stats: Optional[Dict[str, Any]]) -> str: + if not stats: + return "—" + + created = stats.get("created", 0) + updated = stats.get("updated", 0) + deleted = stats.get("deleted", stats.get("deactivated", 0)) + errors = stats.get("errors", 0) + + return ( + f"• Создано: {created}\n" + f"• Обновлено: {updated}\n" + f"• Деактивировано: {deleted}\n" + f"• Ошибок: {errors}" + ) + + +def _format_server_stats(stats: Optional[Dict[str, Any]]) -> str: + if not stats: + return "—" + + created = stats.get("created", 0) + updated = stats.get("updated", 0) + removed = stats.get("removed", 0) + total = stats.get("total", 0) + + return ( + f"• Создано: {created}\n" + f"• Обновлено: {updated}\n" + f"• Удалено: {removed}\n" + f"• Всего в панели: {total}" + ) + + +def _build_auto_sync_view(status: RemnaWaveAutoSyncStatus) -> tuple[str, types.InlineKeyboardMarkup]: + times_text = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—" + next_run_text = format_datetime(status.next_run) if status.next_run else "—" + + if status.last_run_finished_at: + finished_text = format_datetime(status.last_run_finished_at) + started_text = ( + format_datetime(status.last_run_started_at) + if status.last_run_started_at + else "—" + ) + duration = ( + status.last_run_finished_at - status.last_run_started_at + if status.last_run_started_at + else None + ) + duration_text = f" ({_format_duration(duration.total_seconds())})" if duration else "" + reason_map = { + "manual": "вручную", + "auto": "по расписанию", + "immediate": "при включении", + } + reason_text = reason_map.get(status.last_run_reason or "", "—") + result_icon = "✅" if status.last_run_success else "❌" + result_label = "успешно" if status.last_run_success else "с ошибками" + error_block = ( + f"\n⚠️ Ошибка: {status.last_run_error}" + if status.last_run_error + else "" + ) + last_run_text = ( + f"{result_icon} {result_label}\n" + f"• Старт: {started_text}\n" + f"• Завершено: {finished_text}{duration_text}\n" + f"• Причина запуска: {reason_text}{error_block}" + ) + elif status.last_run_started_at: + last_run_text = ( + "⏳ Синхронизация началась, но еще не завершилась" + if status.is_running + else f"ℹ️ Последний запуск: {format_datetime(status.last_run_started_at)}" + ) + else: + last_run_text = "—" + + running_text = "⏳ Выполняется сейчас" if status.is_running else "Ожидание" + toggle_text = "❌ Отключить" if status.enabled else "✅ Включить" + + text = f"""🔄 Автосинхронизация RemnaWave + +⚙️ Статус: {'✅ Включена' if status.enabled else '❌ Отключена'} +🕒 Расписание: {times_text} +📅 Следующий запуск: {next_run_text if status.enabled else '—'} +⏱️ Состояние: {running_text} + +📊 Последний запуск: +{last_run_text} + +👥 Пользователи: +{_format_user_stats(status.last_user_stats)} + +🌐 Серверы: +{_format_server_stats(status.last_server_stats)} +""" + + keyboard = types.InlineKeyboardMarkup( + inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="🔁 Запустить сейчас", + callback_data="remnawave_auto_sync_run", + ) + ], + [ + types.InlineKeyboardButton( + text=toggle_text, + callback_data="remnawave_auto_sync_toggle", + ) + ], + [ + types.InlineKeyboardButton( + text="🕒 Изменить расписание", + callback_data="remnawave_auto_sync_times", + ) + ], + [ + types.InlineKeyboardButton( + text="⬅️ Назад", + callback_data="admin_rw_sync", + ) + ], + ] + ) + + return text, keyboard + + def _format_migration_server_label(texts, server) -> str: status = ( texts.t("ADMIN_SQUAD_MIGRATION_STATUS_AVAILABLE", "✅ Доступен") @@ -2096,34 +2252,308 @@ async def show_sync_options( db_user: User, db: AsyncSession ): - text = """ -🔄 Синхронизация с Remnawave + status = remnawave_sync_service.get_status() + times_text = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—" + next_run_text = format_datetime(status.next_run) if status.next_run else "—" + last_result = "—" -🔄 Полная синхронизация выполняет: -• Создание новых пользователей из панели в боте -• Обновление данных существующих пользователей -• Деактивация подписок пользователей, отсутствующих в панели -• Сохранение балансов пользователей -• ⏱️ Время выполнения: 2-5 минут + if status.last_run_finished_at: + result_icon = "✅" if status.last_run_success else "❌" + result_label = "успешно" if status.last_run_success else "с ошибками" + finished_text = format_datetime(status.last_run_finished_at) + last_result = f"{result_icon} {result_label} ({finished_text})" + elif status.last_run_started_at: + last_result = f"⏳ Запущено {format_datetime(status.last_run_started_at)}" -⚠️ Важно: -• Во время синхронизации не выполняйте другие операции -• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы -• Рекомендуется делать полную синхронизацию ежедневно -• Баланс пользователей НЕ удаляется -""" - - keyboard = [ - [types.InlineKeyboardButton(text="🔄 Запустить полную синхронизацию", callback_data="sync_all_users")], - [types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_remnawave")] + status_lines = [ + f"⚙️ Статус: {'✅ Включена' if status.enabled else '❌ Отключена'}", + f"🕒 Расписание: {times_text}", + f"📅 Следующий запуск: {next_run_text if status.enabled else '—'}", + f"📊 Последний запуск: {last_result}", ] - + + text = ( + "🔄 Синхронизация с Remnawave\n\n" + "🔄 Полная синхронизация выполняет:\n" + "• Создание новых пользователей из панели в боте\n" + "• Обновление данных существующих пользователей\n" + "• Деактивация подписок пользователей, отсутствующих в панели\n" + "• Сохранение балансов пользователей\n" + "• ⏱️ Время выполнения: 2-5 минут\n\n" + "⚠️ Важно:\n" + "• Во время синхронизации не выполняйте другие операции\n" + "• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n" + "• Рекомендуется делать полную синхронизацию ежедневно\n" + "• Баланс пользователей НЕ удаляется\n\n" + + "\n".join(status_lines) + ) + + keyboard = [ + [ + types.InlineKeyboardButton( + text="🔄 Запустить полную синхронизацию", + callback_data="sync_all_users", + ) + ], + [ + types.InlineKeyboardButton( + text="⚙️ Настройки автосинхронизации", + callback_data="admin_rw_auto_sync", + ) + ], + [types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_remnawave")], + ] + await callback.message.edit_text( text, - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard) + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), ) await callback.answer() + +@admin_required +@error_handler +async def show_auto_sync_settings( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + await state.clear() + status = remnawave_sync_service.get_status() + text, keyboard = _build_auto_sync_view(status) + + await callback.message.edit_text( + text, + reply_markup=keyboard, + parse_mode="HTML", + ) + await callback.answer() + + +@admin_required +@error_handler +async def toggle_auto_sync_setting( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + await state.clear() + new_value = not bool(settings.REMNAWAVE_AUTO_SYNC_ENABLED) + await bot_configuration_service.set_value( + db, + "REMNAWAVE_AUTO_SYNC_ENABLED", + new_value, + ) + + status = remnawave_sync_service.get_status() + text, keyboard = _build_auto_sync_view(status) + + await callback.message.edit_text( + text, + reply_markup=keyboard, + parse_mode="HTML", + ) + await callback.answer( + f"Автосинхронизация {'включена' if new_value else 'отключена'}" + ) + + +@admin_required +@error_handler +async def prompt_auto_sync_schedule( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + status = remnawave_sync_service.get_status() + current_schedule = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—" + + instructions = ( + "🕒 Настройка расписания автосинхронизации\n\n" + "Укажите время запуска через запятую или с новой строки в формате HH:MM.\n" + f"Текущее расписание: {current_schedule}\n\n" + "Примеры: 03:00, 15:30 или 00:15\n06:00\n18:45\n\n" + "Отправьте отмена, чтобы вернуться без изменений." + ) + + await state.set_state(RemnaWaveSyncStates.waiting_for_schedule) + await state.update_data( + auto_sync_message_id=callback.message.message_id, + auto_sync_message_chat_id=callback.message.chat.id, + ) + + await callback.message.edit_text( + instructions, + parse_mode="HTML", + reply_markup=types.InlineKeyboardMarkup( + inline_keyboard=[ + [ + types.InlineKeyboardButton( + text="❌ Отмена", + callback_data="remnawave_auto_sync_cancel", + ) + ] + ] + ), + ) + await callback.answer() + + +@admin_required +@error_handler +async def cancel_auto_sync_schedule( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + await state.clear() + status = remnawave_sync_service.get_status() + text, keyboard = _build_auto_sync_view(status) + + await callback.message.edit_text( + text, + reply_markup=keyboard, + parse_mode="HTML", + ) + await callback.answer("Изменение расписания отменено") + + +@admin_required +@error_handler +async def run_auto_sync_now( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + if remnawave_sync_service.get_status().is_running: + await callback.answer("Синхронизация уже выполняется", show_alert=True) + return + + await state.clear() + await callback.message.edit_text( + "🔄 Запуск автосинхронизации...\n\nПодождите, это может занять несколько минут.", + parse_mode="HTML", + ) + await callback.answer("Автосинхронизация запущена") + + result = await remnawave_sync_service.run_sync_now(reason="manual") + status = remnawave_sync_service.get_status() + base_text, keyboard = _build_auto_sync_view(status) + + if not result.get("started"): + await callback.message.edit_text( + "⚠️ Синхронизация уже выполняется\n\n" + base_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + return + + if result.get("success"): + user_stats = result.get("user_stats") or {} + server_stats = result.get("server_stats") or {} + summary = ( + "✅ Синхронизация завершена\n" + f"👥 Пользователи: создано {user_stats.get('created', 0)}, обновлено {user_stats.get('updated', 0)}, " + f"деактивировано {user_stats.get('deleted', user_stats.get('deactivated', 0))}, ошибок {user_stats.get('errors', 0)}\n" + f"🌐 Серверы: создано {server_stats.get('created', 0)}, обновлено {server_stats.get('updated', 0)}, удалено {server_stats.get('removed', 0)}\n\n" + ) + final_text = summary + base_text + await callback.message.edit_text( + final_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + else: + error_text = result.get("error") or "Неизвестная ошибка" + summary = ( + "❌ Синхронизация завершилась с ошибкой\n" + f"Причина: {error_text}\n\n" + ) + await callback.message.edit_text( + summary + base_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + + +@admin_required +@error_handler +async def save_auto_sync_schedule( + message: types.Message, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + text = (message.text or "").strip() + data = await state.get_data() + + if text.lower() in {"отмена", "cancel"}: + await state.clear() + status = remnawave_sync_service.get_status() + view_text, keyboard = _build_auto_sync_view(status) + message_id = data.get("auto_sync_message_id") + chat_id = data.get("auto_sync_message_chat_id", message.chat.id) + if message_id: + await message.bot.edit_message_text( + view_text, + chat_id=chat_id, + message_id=message_id, + reply_markup=keyboard, + parse_mode="HTML", + ) + else: + await message.answer( + view_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + await message.answer("Настройка расписания отменена") + return + + parsed_times = settings.parse_daily_time_list(text) + + if not parsed_times: + await message.answer( + "❌ Не удалось распознать время. Используйте формат HH:MM, например 03:00 или 18:45.", + ) + return + + normalized_value = ", ".join(t.strftime("%H:%M") for t in parsed_times) + await bot_configuration_service.set_value( + db, + "REMNAWAVE_AUTO_SYNC_TIMES", + normalized_value, + ) + + status = remnawave_sync_service.get_status() + view_text, keyboard = _build_auto_sync_view(status) + message_id = data.get("auto_sync_message_id") + chat_id = data.get("auto_sync_message_chat_id", message.chat.id) + + if message_id: + await message.bot.edit_message_text( + view_text, + chat_id=chat_id, + message_id=message_id, + reply_markup=keyboard, + parse_mode="HTML", + ) + else: + await message.answer( + view_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + + await state.clear() + await message.answer("✅ Расписание автосинхронизации обновлено") + + @admin_required @error_handler async def sync_all_users( @@ -2688,6 +3118,11 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(manage_node, F.data.startswith("node_restart_")) dp.callback_query.register(restart_all_nodes, F.data == "admin_restart_all_nodes") dp.callback_query.register(show_sync_options, F.data == "admin_rw_sync") + dp.callback_query.register(show_auto_sync_settings, F.data == "admin_rw_auto_sync") + dp.callback_query.register(toggle_auto_sync_setting, F.data == "remnawave_auto_sync_toggle") + dp.callback_query.register(prompt_auto_sync_schedule, F.data == "remnawave_auto_sync_times") + dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel") + dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run") dp.callback_query.register(sync_all_users, F.data == "sync_all_users") dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration") dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_")) @@ -2716,13 +3151,19 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(finish_squad_creation, F.data == "create_squad_finish") dp.message.register( - process_squad_new_name, + process_squad_new_name, SquadRenameStates.waiting_for_new_name, F.text ) - + dp.message.register( process_squad_name, SquadCreateStates.waiting_for_name, F.text ) + + dp.message.register( + save_auto_sync_schedule, + RemnaWaveSyncStates.waiting_for_schedule, + F.text, + ) diff --git a/app/services/remnawave_sync_service.py b/app/services/remnawave_sync_service.py new file mode 100644 index 00000000..ba018c5d --- /dev/null +++ b/app/services/remnawave_sync_service.py @@ -0,0 +1,264 @@ +import asyncio +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta, time +from typing import Any, Dict, List, Optional, Tuple + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.database.database import AsyncSessionLocal +from app.database.crud.server_squad import sync_with_remnawave +from app.services.remnawave_service import ( + RemnaWaveConfigurationError, + RemnaWaveService, +) +from app.utils.cache import cache + + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class RemnaWaveAutoSyncStatus: + enabled: bool + times: List[time] + next_run: Optional[datetime] + last_run_started_at: Optional[datetime] + last_run_finished_at: Optional[datetime] + last_run_success: Optional[bool] + last_run_reason: Optional[str] + last_run_error: Optional[str] + last_user_stats: Optional[Dict[str, Any]] + last_server_stats: Optional[Dict[str, Any]] + is_running: bool + + +class RemnaWaveAutoSyncService: + def __init__(self) -> None: + self._scheduler_task: Optional[asyncio.Task] = None + self._scheduler_lock = asyncio.Lock() + self._sync_lock = asyncio.Lock() + self._service = RemnaWaveService() + + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._initialized = False + self._pending_refresh = False + self._pending_run_immediately = False + + self._next_run: Optional[datetime] = None + self._last_run_started_at: Optional[datetime] = None + self._last_run_finished_at: Optional[datetime] = None + self._last_run_success: Optional[bool] = None + self._last_run_reason: Optional[str] = None + self._last_run_error: Optional[str] = None + self._last_user_stats: Optional[Dict[str, Any]] = None + self._last_server_stats: Optional[Dict[str, Any]] = None + + async def initialize(self) -> None: + self._loop = asyncio.get_running_loop() + self._initialized = True + + run_immediately = self._pending_run_immediately + if self._pending_refresh: + self._pending_refresh = False + self._pending_run_immediately = False + await self.refresh_schedule(run_immediately=run_immediately) + else: + await self.refresh_schedule() + + async def refresh_schedule(self, *, run_immediately: bool = False) -> None: + async with self._scheduler_lock: + if self._scheduler_task and not self._scheduler_task.done(): + self._scheduler_task.cancel() + try: + await self._scheduler_task + except asyncio.CancelledError: + pass + finally: + self._scheduler_task = None + + if not settings.REMNAWAVE_AUTO_SYNC_ENABLED: + self._next_run = None + return + + times = settings.get_remnawave_auto_sync_times() + if not times: + logger.warning( + "⚠️ Автосинхронизация включена, но расписание пустое. Укажите время запуска." + ) + self._next_run = None + return + + self._scheduler_task = asyncio.create_task(self._run_scheduler(times)) + + if run_immediately: + asyncio.create_task(self.run_sync_now(reason="immediate")) + + def schedule_refresh(self, *, run_immediately: bool = False) -> None: + if not self._initialized: + self._pending_refresh = True + if run_immediately: + self._pending_run_immediately = True + return + + loop = self._loop or asyncio.get_running_loop() + loop.create_task(self.refresh_schedule(run_immediately=run_immediately)) + + async def stop(self) -> None: + async with self._scheduler_lock: + if self._scheduler_task and not self._scheduler_task.done(): + self._scheduler_task.cancel() + try: + await self._scheduler_task + except asyncio.CancelledError: + pass + self._scheduler_task = None + self._next_run = None + + async def run_sync_now(self, *, reason: str = "manual") -> Dict[str, Any]: + if self._sync_lock.locked(): + return {"started": False, "reason": "already_running"} + + async with self._sync_lock: + self._last_run_started_at = datetime.utcnow() + self._last_run_finished_at = None + self._last_run_reason = reason + self._last_run_error = None + self._last_run_success = None + + try: + user_stats, server_stats = await self._perform_sync() + except RemnaWaveConfigurationError as error: + message = str(error) + self._last_run_error = message + self._last_run_success = False + self._last_user_stats = None + self._last_server_stats = None + self._last_run_finished_at = datetime.utcnow() + logger.error("❌ Автосинхронизация RemnaWave: %s", message) + return { + "started": True, + "success": False, + "error": message, + "user_stats": None, + "server_stats": None, + } + except Exception as error: + message = str(error) + self._last_run_error = message + self._last_run_success = False + self._last_user_stats = None + self._last_server_stats = None + self._last_run_finished_at = datetime.utcnow() + logger.exception("❌ Ошибка автосинхронизации RemnaWave: %s", error) + return { + "started": True, + "success": False, + "error": message, + "user_stats": None, + "server_stats": None, + } + + self._last_run_success = True + self._last_run_error = None + self._last_user_stats = user_stats + self._last_server_stats = server_stats + self._last_run_finished_at = datetime.utcnow() + + return { + "started": True, + "success": True, + "error": None, + "user_stats": user_stats, + "server_stats": server_stats, + } + + def get_status(self) -> RemnaWaveAutoSyncStatus: + times = settings.get_remnawave_auto_sync_times() + enabled = settings.REMNAWAVE_AUTO_SYNC_ENABLED and bool(times) + + return RemnaWaveAutoSyncStatus( + enabled=enabled, + times=times, + next_run=self._next_run, + last_run_started_at=self._last_run_started_at, + last_run_finished_at=self._last_run_finished_at, + last_run_success=self._last_run_success, + last_run_reason=self._last_run_reason, + last_run_error=self._last_run_error, + last_user_stats=self._last_user_stats, + last_server_stats=self._last_server_stats, + is_running=self._sync_lock.locked(), + ) + + async def _run_scheduler(self, times: List[time]) -> None: + try: + while True: + next_run = self._calculate_next_run(times) + self._next_run = next_run + + delay = (next_run - datetime.utcnow()).total_seconds() + if delay > 0: + await asyncio.sleep(delay) + + await self.run_sync_now(reason="auto") + except asyncio.CancelledError: + raise + finally: + self._next_run = None + + async def _perform_sync(self) -> Tuple[Dict[str, Any], Dict[str, Any]]: + if not self._service.is_configured: + raise RemnaWaveConfigurationError( + self._service.configuration_error or "RemnaWave API не настроен" + ) + + async with AsyncSessionLocal() as session: + user_stats = await self._service.sync_users_from_panel(session, "all") + server_stats = await self._sync_servers(session) + + return user_stats, server_stats + + async def _sync_servers(self, session: AsyncSession) -> Dict[str, Any]: + squads = await self._service.get_all_squads() + + if not squads: + logger.warning("⚠️ Не удалось получить сквады из RemnaWave для автосинхронизации") + return {"created": 0, "updated": 0, "removed": 0, "total": 0} + + created, updated, removed = await sync_with_remnawave(session, squads) + + try: + await cache.delete_pattern("available_countries*") + except Exception as error: + logger.warning("⚠️ Не удалось очистить кеш стран после автосинхронизации: %s", error) + + return { + "created": created, + "updated": updated, + "removed": removed, + "total": len(squads), + } + + @staticmethod + def _calculate_next_run(times: List[time]) -> datetime: + now = datetime.utcnow() + today = now.date() + + for scheduled in sorted(times): + candidate = datetime.combine(today, scheduled) + if candidate > now: + return candidate + + first_time = sorted(times)[0] + next_day = today + timedelta(days=1) + return datetime.combine(next_day, first_time) + + +def _create_service() -> RemnaWaveAutoSyncService: + service = RemnaWaveAutoSyncService() + return service + + +remnawave_sync_service = _create_service() diff --git a/app/services/system_settings_service.py b/app/services/system_settings_service.py index 8e43883f..cfd309ca 100644 --- a/app/services/system_settings_service.py +++ b/app/services/system_settings_service.py @@ -248,6 +248,8 @@ class BotConfigurationService: "VERSION_CHECK_INTERVAL_HOURS": "VERSION", "TELEGRAM_STARS_RATE_RUB": "TELEGRAM", "REMNAWAVE_USER_DESCRIPTION_TEMPLATE": "REMNAWAVE", + "REMNAWAVE_AUTO_SYNC_ENABLED": "REMNAWAVE", + "REMNAWAVE_AUTO_SYNC_TIMES": "REMNAWAVE", } CATEGORY_PREFIX_OVERRIDES: Dict[str, str] = { @@ -425,6 +427,20 @@ class BotConfigurationService: "warning": "Недоступный адрес приведет к ошибкам при управлении VPN-учетками.", "dependencies": "REMNAWAVE_API_KEY или REMNAWAVE_USERNAME/REMNAWAVE_PASSWORD", }, + "REMNAWAVE_AUTO_SYNC_ENABLED": { + "description": "Автоматически запускает синхронизацию пользователей и серверов с панелью RemnaWave.", + "format": "Булево значение.", + "example": "Включено при корректно настроенных API-ключах.", + "warning": "При включении без расписания синхронизация не будет выполнена.", + "dependencies": "REMNAWAVE_AUTO_SYNC_TIMES", + }, + "REMNAWAVE_AUTO_SYNC_TIMES": { + "description": "Список времени в формате HH:MM, когда запускается автосинхронизация в течение суток.", + "format": "Перечислите время через запятую или с новой строки (например, 03:00, 15:00).", + "example": "03:00, 15:00", + "warning": "Минимальный интервал между запусками не ограничен, но слишком частые синхронизации нагружают панель.", + "dependencies": "REMNAWAVE_AUTO_SYNC_ENABLED", + }, "EXTERNAL_ADMIN_TOKEN": { "description": "Приватный токен, который использует внешняя админка для проверки запросов.", "format": "Значение генерируется автоматически из username бота и его токена и доступно только для чтения.", @@ -964,6 +980,18 @@ class BotConfigurationService: refresh_period_prices() elif key.startswith("PRICE_TRAFFIC_") or key == "TRAFFIC_PACKAGES_CONFIG": refresh_traffic_prices() + elif key in {"REMNAWAVE_AUTO_SYNC_ENABLED", "REMNAWAVE_AUTO_SYNC_TIMES"}: + try: + from app.services.remnawave_sync_service import remnawave_sync_service + + remnawave_sync_service.schedule_refresh( + run_immediately=(key == "REMNAWAVE_AUTO_SYNC_ENABLED" and bool(value)) + ) + except Exception as error: + logger.error( + "Не удалось обновить сервис автосинхронизации RemnaWave: %s", + error, + ) except Exception as error: logger.error("Не удалось применить значение %s=%s: %s", key, value, error) diff --git a/app/states.py b/app/states.py index 0d10a629..7ab82d76 100644 --- a/app/states.py +++ b/app/states.py @@ -173,6 +173,10 @@ class SquadMigrationStates(StatesGroup): confirming = State() +class RemnaWaveSyncStates(StatesGroup): + waiting_for_schedule = State() + + class AdminSubmenuStates(StatesGroup): in_users_submenu = State() in_promo_submenu = State() diff --git a/main.py b/main.py index 7c0d92c4..337903fc 100644 --- a/main.py +++ b/main.py @@ -20,6 +20,7 @@ from app.external.pal24_webhook import start_pal24_webhook_server, Pal24WebhookS from app.database.universal_migration import run_universal_migration from app.services.backup_service import backup_service from app.services.reporting_service import reporting_service +from app.services.remnawave_sync_service import remnawave_sync_service from app.localization.loader import ensure_locale_templates from app.services.system_settings_service import bot_configuration_service from app.services.external_admin_service import ensure_external_admin_token @@ -185,6 +186,29 @@ async def main(): stage.warning(f"Ошибка запуска сервиса отчетов: {e}") logger.error(f"❌ Ошибка запуска сервиса отчетов: {e}") + async with timeline.stage( + "Автосинхронизация RemnaWave", + "🔄", + success_message="Сервис автосинхронизации готов", + ) as stage: + try: + await remnawave_sync_service.initialize() + status = remnawave_sync_service.get_status() + if status.enabled: + times_text = ", ".join(t.strftime("%H:%M") for t in status.times) or "—" + if status.next_run: + next_run_text = status.next_run.strftime("%d.%m.%Y %H:%M") + stage.log( + f"Активирована: расписание {times_text}, ближайший запуск {next_run_text}" + ) + else: + stage.log(f"Активирована: расписание {times_text}") + else: + stage.log("Автосинхронизация отключена настройками") + except Exception as e: + stage.warning(f"Ошибка запуска автосинхронизации: {e}") + logger.error(f"❌ Ошибка запуска автосинхронизации RemnaWave: {e}") + payment_service = PaymentService(bot) async with timeline.stage( @@ -459,6 +483,12 @@ async def main(): except Exception as e: logger.error(f"Ошибка остановки сервиса отчетов: {e}") + logger.info("ℹ️ Остановка сервиса автосинхронизации RemnaWave...") + try: + await remnawave_sync_service.stop() + except Exception as e: + logger.error(f"Ошибка остановки автосинхронизации RemnaWave: {e}") + logger.info("ℹ️ Остановка сервиса бекапов...") try: await backup_service.stop_auto_backup() diff --git a/tests/utils/test_remnawave_auto_sync.py b/tests/utils/test_remnawave_auto_sync.py new file mode 100644 index 00000000..96401df1 --- /dev/null +++ b/tests/utils/test_remnawave_auto_sync.py @@ -0,0 +1,52 @@ +from datetime import datetime, time as time_cls +from types import SimpleNamespace + +import pytest + +from app.config import settings +from app.services.remnawave_sync_service import RemnaWaveAutoSyncService + + +@pytest.mark.parametrize( + "raw, expected", + [ + ("03:00, 15:30 03:00; 07:05", [time_cls(3, 0), time_cls(7, 5), time_cls(15, 30)]), + ("", []), + (None, []), + ("25:00, 10:70, test, 09:15", [time_cls(9, 15)]), + ], +) +def test_parse_daily_time_list(raw, expected): + assert settings.parse_daily_time_list(raw) == expected + + +def _patch_datetime(monkeypatch, current): + real_datetime = datetime + + monkeypatch.setattr( + "app.services.remnawave_sync_service.datetime", + SimpleNamespace( + utcnow=lambda: current, + combine=lambda date_obj, time_obj: real_datetime.combine(date_obj, time_obj), + ), + ) + + +def test_calculate_next_run_same_day(monkeypatch): + service = RemnaWaveAutoSyncService() + current = datetime(2024, 1, 1, 2, 30) + _patch_datetime(monkeypatch, current) + + next_run = service._calculate_next_run([time_cls(1, 0), time_cls(3, 0)]) + + assert next_run == datetime(2024, 1, 1, 3, 0) + + +def test_calculate_next_run_rollover(monkeypatch): + service = RemnaWaveAutoSyncService() + current = datetime(2024, 1, 1, 23, 45) + _patch_datetime(monkeypatch, current) + + next_run = service._calculate_next_run([time_cls(1, 0), time_cls(10, 0)]) + + assert next_run == datetime(2024, 1, 2, 1, 0)