mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-02-22 04:12:09 +00:00
Fix manual auto-sync callback timing
This commit is contained in:
@@ -71,6 +71,8 @@ class Settings(BaseSettings):
|
||||
REMNAWAVE_AUTH_TYPE: str = "api_key"
|
||||
REMNAWAVE_USER_DESCRIPTION_TEMPLATE: str = "Bot user: {full_name} {username}"
|
||||
REMNAWAVE_USER_DELETE_MODE: str = "delete" # "delete" или "disable"
|
||||
REMNAWAVE_AUTO_SYNC_ENABLED: bool = False
|
||||
REMNAWAVE_AUTO_SYNC_TIMES: str = "03:00"
|
||||
|
||||
TRIAL_DURATION_DAYS: int = 3
|
||||
TRIAL_TRAFFIC_LIMIT_GB: int = 10
|
||||
@@ -513,6 +515,42 @@ class Settings(BaseSettings):
|
||||
description = re.sub(r'\s+', ' ', description).strip()
|
||||
return description
|
||||
|
||||
@staticmethod
|
||||
def parse_daily_time_list(raw_value: Optional[str]) -> List[time]:
|
||||
if not raw_value:
|
||||
return []
|
||||
|
||||
segments = re.split(r"[\s,;]+", raw_value.strip())
|
||||
seen: set[tuple[int, int]] = set()
|
||||
parsed: List[time] = []
|
||||
|
||||
for segment in segments:
|
||||
if not segment:
|
||||
continue
|
||||
|
||||
try:
|
||||
hours_str, minutes_str = segment.split(":", 1)
|
||||
hours = int(hours_str)
|
||||
minutes = int(minutes_str)
|
||||
except (ValueError, AttributeError):
|
||||
continue
|
||||
|
||||
if not (0 <= hours < 24 and 0 <= minutes < 60):
|
||||
continue
|
||||
|
||||
key = (hours, minutes)
|
||||
if key in seen:
|
||||
continue
|
||||
|
||||
seen.add(key)
|
||||
parsed.append(time(hour=hours, minute=minutes))
|
||||
|
||||
parsed.sort()
|
||||
return parsed
|
||||
|
||||
def get_remnawave_auto_sync_times(self) -> List[time]:
|
||||
return self.parse_daily_time_list(self.REMNAWAVE_AUTO_SYNC_TIMES)
|
||||
|
||||
def get_display_name_banned_keywords(self) -> List[str]:
|
||||
raw_value = self.DISPLAY_NAME_BANNED_KEYWORDS
|
||||
if raw_value is None:
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
import logging
|
||||
import math
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from aiogram import Dispatcher, types, F
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from app.states import SquadRenameStates, SquadCreateStates, SquadMigrationStates
|
||||
from app.states import (
|
||||
RemnaWaveSyncStates,
|
||||
SquadRenameStates,
|
||||
SquadCreateStates,
|
||||
SquadMigrationStates,
|
||||
)
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
@@ -19,6 +27,11 @@ from app.keyboards.admin import (
|
||||
)
|
||||
from app.localization.texts import get_texts
|
||||
from app.services.remnawave_service import RemnaWaveService, RemnaWaveConfigurationError
|
||||
from app.services.remnawave_sync_service import (
|
||||
RemnaWaveAutoSyncStatus,
|
||||
remnawave_sync_service,
|
||||
)
|
||||
from app.services.system_settings_service import bot_configuration_service
|
||||
from app.utils.decorators import admin_required, error_handler
|
||||
from app.utils.formatters import format_bytes, format_datetime
|
||||
|
||||
@@ -30,6 +43,149 @@ squad_create_data = {}
|
||||
MIGRATION_PAGE_SIZE = 8
|
||||
|
||||
|
||||
def _format_duration(seconds: float) -> str:
|
||||
if seconds < 1:
|
||||
return "<1с"
|
||||
|
||||
minutes, sec = divmod(int(seconds), 60)
|
||||
if minutes:
|
||||
if sec:
|
||||
return f"{minutes} мин {sec} с"
|
||||
return f"{minutes} мин"
|
||||
return f"{sec} с"
|
||||
|
||||
|
||||
def _format_user_stats(stats: Optional[Dict[str, Any]]) -> str:
|
||||
if not stats:
|
||||
return "—"
|
||||
|
||||
created = stats.get("created", 0)
|
||||
updated = stats.get("updated", 0)
|
||||
deleted = stats.get("deleted", stats.get("deactivated", 0))
|
||||
errors = stats.get("errors", 0)
|
||||
|
||||
return (
|
||||
f"• Создано: {created}\n"
|
||||
f"• Обновлено: {updated}\n"
|
||||
f"• Деактивировано: {deleted}\n"
|
||||
f"• Ошибок: {errors}"
|
||||
)
|
||||
|
||||
|
||||
def _format_server_stats(stats: Optional[Dict[str, Any]]) -> str:
|
||||
if not stats:
|
||||
return "—"
|
||||
|
||||
created = stats.get("created", 0)
|
||||
updated = stats.get("updated", 0)
|
||||
removed = stats.get("removed", 0)
|
||||
total = stats.get("total", 0)
|
||||
|
||||
return (
|
||||
f"• Создано: {created}\n"
|
||||
f"• Обновлено: {updated}\n"
|
||||
f"• Удалено: {removed}\n"
|
||||
f"• Всего в панели: {total}"
|
||||
)
|
||||
|
||||
|
||||
def _build_auto_sync_view(status: RemnaWaveAutoSyncStatus) -> tuple[str, types.InlineKeyboardMarkup]:
|
||||
times_text = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—"
|
||||
next_run_text = format_datetime(status.next_run) if status.next_run else "—"
|
||||
|
||||
if status.last_run_finished_at:
|
||||
finished_text = format_datetime(status.last_run_finished_at)
|
||||
started_text = (
|
||||
format_datetime(status.last_run_started_at)
|
||||
if status.last_run_started_at
|
||||
else "—"
|
||||
)
|
||||
duration = (
|
||||
status.last_run_finished_at - status.last_run_started_at
|
||||
if status.last_run_started_at
|
||||
else None
|
||||
)
|
||||
duration_text = f" ({_format_duration(duration.total_seconds())})" if duration else ""
|
||||
reason_map = {
|
||||
"manual": "вручную",
|
||||
"auto": "по расписанию",
|
||||
"immediate": "при включении",
|
||||
}
|
||||
reason_text = reason_map.get(status.last_run_reason or "", "—")
|
||||
result_icon = "✅" if status.last_run_success else "❌"
|
||||
result_label = "успешно" if status.last_run_success else "с ошибками"
|
||||
error_block = (
|
||||
f"\n⚠️ Ошибка: {status.last_run_error}"
|
||||
if status.last_run_error
|
||||
else ""
|
||||
)
|
||||
last_run_text = (
|
||||
f"{result_icon} {result_label}\n"
|
||||
f"• Старт: {started_text}\n"
|
||||
f"• Завершено: {finished_text}{duration_text}\n"
|
||||
f"• Причина запуска: {reason_text}{error_block}"
|
||||
)
|
||||
elif status.last_run_started_at:
|
||||
last_run_text = (
|
||||
"⏳ Синхронизация началась, но еще не завершилась"
|
||||
if status.is_running
|
||||
else f"ℹ️ Последний запуск: {format_datetime(status.last_run_started_at)}"
|
||||
)
|
||||
else:
|
||||
last_run_text = "—"
|
||||
|
||||
running_text = "⏳ Выполняется сейчас" if status.is_running else "Ожидание"
|
||||
toggle_text = "❌ Отключить" if status.enabled else "✅ Включить"
|
||||
|
||||
text = f"""🔄 <b>Автосинхронизация RemnaWave</b>
|
||||
|
||||
⚙️ <b>Статус:</b> {'✅ Включена' if status.enabled else '❌ Отключена'}
|
||||
🕒 <b>Расписание:</b> {times_text}
|
||||
📅 <b>Следующий запуск:</b> {next_run_text if status.enabled else '—'}
|
||||
⏱️ <b>Состояние:</b> {running_text}
|
||||
|
||||
📊 <b>Последний запуск:</b>
|
||||
{last_run_text}
|
||||
|
||||
👥 <b>Пользователи:</b>
|
||||
{_format_user_stats(status.last_user_stats)}
|
||||
|
||||
🌐 <b>Серверы:</b>
|
||||
{_format_server_stats(status.last_server_stats)}
|
||||
"""
|
||||
|
||||
keyboard = types.InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="🔁 Запустить сейчас",
|
||||
callback_data="remnawave_auto_sync_run",
|
||||
)
|
||||
],
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text=toggle_text,
|
||||
callback_data="remnawave_auto_sync_toggle",
|
||||
)
|
||||
],
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="🕒 Изменить расписание",
|
||||
callback_data="remnawave_auto_sync_times",
|
||||
)
|
||||
],
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="⬅️ Назад",
|
||||
callback_data="admin_rw_sync",
|
||||
)
|
||||
],
|
||||
]
|
||||
)
|
||||
|
||||
return text, keyboard
|
||||
|
||||
|
||||
def _format_migration_server_label(texts, server) -> str:
|
||||
status = (
|
||||
texts.t("ADMIN_SQUAD_MIGRATION_STATUS_AVAILABLE", "✅ Доступен")
|
||||
@@ -2096,34 +2252,308 @@ async def show_sync_options(
|
||||
db_user: User,
|
||||
db: AsyncSession
|
||||
):
|
||||
text = """
|
||||
🔄 <b>Синхронизация с Remnawave</b>
|
||||
status = remnawave_sync_service.get_status()
|
||||
times_text = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—"
|
||||
next_run_text = format_datetime(status.next_run) if status.next_run else "—"
|
||||
last_result = "—"
|
||||
|
||||
🔄 <b>Полная синхронизация выполняет:</b>
|
||||
• Создание новых пользователей из панели в боте
|
||||
• Обновление данных существующих пользователей
|
||||
• Деактивация подписок пользователей, отсутствующих в панели
|
||||
• Сохранение балансов пользователей
|
||||
• ⏱️ Время выполнения: 2-5 минут
|
||||
if status.last_run_finished_at:
|
||||
result_icon = "✅" if status.last_run_success else "❌"
|
||||
result_label = "успешно" if status.last_run_success else "с ошибками"
|
||||
finished_text = format_datetime(status.last_run_finished_at)
|
||||
last_result = f"{result_icon} {result_label} ({finished_text})"
|
||||
elif status.last_run_started_at:
|
||||
last_result = f"⏳ Запущено {format_datetime(status.last_run_started_at)}"
|
||||
|
||||
⚠️ <b>Важно:</b>
|
||||
• Во время синхронизации не выполняйте другие операции
|
||||
• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы
|
||||
• Рекомендуется делать полную синхронизацию ежедневно
|
||||
• Баланс пользователей НЕ удаляется
|
||||
"""
|
||||
|
||||
keyboard = [
|
||||
[types.InlineKeyboardButton(text="🔄 Запустить полную синхронизацию", callback_data="sync_all_users")],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_remnawave")]
|
||||
status_lines = [
|
||||
f"⚙️ Статус: {'✅ Включена' if status.enabled else '❌ Отключена'}",
|
||||
f"🕒 Расписание: {times_text}",
|
||||
f"📅 Следующий запуск: {next_run_text if status.enabled else '—'}",
|
||||
f"📊 Последний запуск: {last_result}",
|
||||
]
|
||||
|
||||
|
||||
text = (
|
||||
"🔄 <b>Синхронизация с Remnawave</b>\n\n"
|
||||
"🔄 <b>Полная синхронизация выполняет:</b>\n"
|
||||
"• Создание новых пользователей из панели в боте\n"
|
||||
"• Обновление данных существующих пользователей\n"
|
||||
"• Деактивация подписок пользователей, отсутствующих в панели\n"
|
||||
"• Сохранение балансов пользователей\n"
|
||||
"• ⏱️ Время выполнения: 2-5 минут\n\n"
|
||||
"⚠️ <b>Важно:</b>\n"
|
||||
"• Во время синхронизации не выполняйте другие операции\n"
|
||||
"• При полной синхронизации подписки пользователей, отсутствующих в панели, будут деактивированы\n"
|
||||
"• Рекомендуется делать полную синхронизацию ежедневно\n"
|
||||
"• Баланс пользователей НЕ удаляется\n\n"
|
||||
+ "\n".join(status_lines)
|
||||
)
|
||||
|
||||
keyboard = [
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="🔄 Запустить полную синхронизацию",
|
||||
callback_data="sync_all_users",
|
||||
)
|
||||
],
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="⚙️ Настройки автосинхронизации",
|
||||
callback_data="admin_rw_auto_sync",
|
||||
)
|
||||
],
|
||||
[types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_remnawave")],
|
||||
]
|
||||
|
||||
await callback.message.edit_text(
|
||||
text,
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
|
||||
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard),
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def show_auto_sync_settings(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
await state.clear()
|
||||
status = remnawave_sync_service.get_status()
|
||||
text, keyboard = _build_auto_sync_view(status)
|
||||
|
||||
await callback.message.edit_text(
|
||||
text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def toggle_auto_sync_setting(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
await state.clear()
|
||||
new_value = not bool(settings.REMNAWAVE_AUTO_SYNC_ENABLED)
|
||||
await bot_configuration_service.set_value(
|
||||
db,
|
||||
"REMNAWAVE_AUTO_SYNC_ENABLED",
|
||||
new_value,
|
||||
)
|
||||
|
||||
status = remnawave_sync_service.get_status()
|
||||
text, keyboard = _build_auto_sync_view(status)
|
||||
|
||||
await callback.message.edit_text(
|
||||
text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
await callback.answer(
|
||||
f"Автосинхронизация {'включена' if new_value else 'отключена'}"
|
||||
)
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def prompt_auto_sync_schedule(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
status = remnawave_sync_service.get_status()
|
||||
current_schedule = ", ".join(t.strftime("%H:%M") for t in status.times) if status.times else "—"
|
||||
|
||||
instructions = (
|
||||
"🕒 <b>Настройка расписания автосинхронизации</b>\n\n"
|
||||
"Укажите время запуска через запятую или с новой строки в формате HH:MM.\n"
|
||||
f"Текущее расписание: <code>{current_schedule}</code>\n\n"
|
||||
"Примеры: <code>03:00, 15:30</code> или <code>00:15\n06:00\n18:45</code>\n\n"
|
||||
"Отправьте <b>отмена</b>, чтобы вернуться без изменений."
|
||||
)
|
||||
|
||||
await state.set_state(RemnaWaveSyncStates.waiting_for_schedule)
|
||||
await state.update_data(
|
||||
auto_sync_message_id=callback.message.message_id,
|
||||
auto_sync_message_chat_id=callback.message.chat.id,
|
||||
)
|
||||
|
||||
await callback.message.edit_text(
|
||||
instructions,
|
||||
parse_mode="HTML",
|
||||
reply_markup=types.InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[
|
||||
types.InlineKeyboardButton(
|
||||
text="❌ Отмена",
|
||||
callback_data="remnawave_auto_sync_cancel",
|
||||
)
|
||||
]
|
||||
]
|
||||
),
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def cancel_auto_sync_schedule(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
await state.clear()
|
||||
status = remnawave_sync_service.get_status()
|
||||
text, keyboard = _build_auto_sync_view(status)
|
||||
|
||||
await callback.message.edit_text(
|
||||
text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
await callback.answer("Изменение расписания отменено")
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def run_auto_sync_now(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
if remnawave_sync_service.get_status().is_running:
|
||||
await callback.answer("Синхронизация уже выполняется", show_alert=True)
|
||||
return
|
||||
|
||||
await state.clear()
|
||||
await callback.message.edit_text(
|
||||
"🔄 Запуск автосинхронизации...\n\nПодождите, это может занять несколько минут.",
|
||||
parse_mode="HTML",
|
||||
)
|
||||
await callback.answer("Автосинхронизация запущена")
|
||||
|
||||
result = await remnawave_sync_service.run_sync_now(reason="manual")
|
||||
status = remnawave_sync_service.get_status()
|
||||
base_text, keyboard = _build_auto_sync_view(status)
|
||||
|
||||
if not result.get("started"):
|
||||
await callback.message.edit_text(
|
||||
"⚠️ <b>Синхронизация уже выполняется</b>\n\n" + base_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
return
|
||||
|
||||
if result.get("success"):
|
||||
user_stats = result.get("user_stats") or {}
|
||||
server_stats = result.get("server_stats") or {}
|
||||
summary = (
|
||||
"✅ <b>Синхронизация завершена</b>\n"
|
||||
f"👥 Пользователи: создано {user_stats.get('created', 0)}, обновлено {user_stats.get('updated', 0)}, "
|
||||
f"деактивировано {user_stats.get('deleted', user_stats.get('deactivated', 0))}, ошибок {user_stats.get('errors', 0)}\n"
|
||||
f"🌐 Серверы: создано {server_stats.get('created', 0)}, обновлено {server_stats.get('updated', 0)}, удалено {server_stats.get('removed', 0)}\n\n"
|
||||
)
|
||||
final_text = summary + base_text
|
||||
await callback.message.edit_text(
|
||||
final_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
else:
|
||||
error_text = result.get("error") or "Неизвестная ошибка"
|
||||
summary = (
|
||||
"❌ <b>Синхронизация завершилась с ошибкой</b>\n"
|
||||
f"Причина: {error_text}\n\n"
|
||||
)
|
||||
await callback.message.edit_text(
|
||||
summary + base_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def save_auto_sync_schedule(
|
||||
message: types.Message,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
text = (message.text or "").strip()
|
||||
data = await state.get_data()
|
||||
|
||||
if text.lower() in {"отмена", "cancel"}:
|
||||
await state.clear()
|
||||
status = remnawave_sync_service.get_status()
|
||||
view_text, keyboard = _build_auto_sync_view(status)
|
||||
message_id = data.get("auto_sync_message_id")
|
||||
chat_id = data.get("auto_sync_message_chat_id", message.chat.id)
|
||||
if message_id:
|
||||
await message.bot.edit_message_text(
|
||||
view_text,
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
else:
|
||||
await message.answer(
|
||||
view_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
await message.answer("Настройка расписания отменена")
|
||||
return
|
||||
|
||||
parsed_times = settings.parse_daily_time_list(text)
|
||||
|
||||
if not parsed_times:
|
||||
await message.answer(
|
||||
"❌ Не удалось распознать время. Используйте формат HH:MM, например 03:00 или 18:45.",
|
||||
)
|
||||
return
|
||||
|
||||
normalized_value = ", ".join(t.strftime("%H:%M") for t in parsed_times)
|
||||
await bot_configuration_service.set_value(
|
||||
db,
|
||||
"REMNAWAVE_AUTO_SYNC_TIMES",
|
||||
normalized_value,
|
||||
)
|
||||
|
||||
status = remnawave_sync_service.get_status()
|
||||
view_text, keyboard = _build_auto_sync_view(status)
|
||||
message_id = data.get("auto_sync_message_id")
|
||||
chat_id = data.get("auto_sync_message_chat_id", message.chat.id)
|
||||
|
||||
if message_id:
|
||||
await message.bot.edit_message_text(
|
||||
view_text,
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
else:
|
||||
await message.answer(
|
||||
view_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
await state.clear()
|
||||
await message.answer("✅ Расписание автосинхронизации обновлено")
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def sync_all_users(
|
||||
@@ -2688,6 +3118,11 @@ def register_handlers(dp: Dispatcher):
|
||||
dp.callback_query.register(manage_node, F.data.startswith("node_restart_"))
|
||||
dp.callback_query.register(restart_all_nodes, F.data == "admin_restart_all_nodes")
|
||||
dp.callback_query.register(show_sync_options, F.data == "admin_rw_sync")
|
||||
dp.callback_query.register(show_auto_sync_settings, F.data == "admin_rw_auto_sync")
|
||||
dp.callback_query.register(toggle_auto_sync_setting, F.data == "remnawave_auto_sync_toggle")
|
||||
dp.callback_query.register(prompt_auto_sync_schedule, F.data == "remnawave_auto_sync_times")
|
||||
dp.callback_query.register(cancel_auto_sync_schedule, F.data == "remnawave_auto_sync_cancel")
|
||||
dp.callback_query.register(run_auto_sync_now, F.data == "remnawave_auto_sync_run")
|
||||
dp.callback_query.register(sync_all_users, F.data == "sync_all_users")
|
||||
dp.callback_query.register(show_squad_migration_menu, F.data == "admin_rw_migration")
|
||||
dp.callback_query.register(paginate_migration_source, F.data.startswith("admin_migration_source_page_"))
|
||||
@@ -2716,13 +3151,19 @@ def register_handlers(dp: Dispatcher):
|
||||
dp.callback_query.register(finish_squad_creation, F.data == "create_squad_finish")
|
||||
|
||||
dp.message.register(
|
||||
process_squad_new_name,
|
||||
process_squad_new_name,
|
||||
SquadRenameStates.waiting_for_new_name,
|
||||
F.text
|
||||
)
|
||||
|
||||
|
||||
dp.message.register(
|
||||
process_squad_name,
|
||||
SquadCreateStates.waiting_for_name,
|
||||
F.text
|
||||
)
|
||||
|
||||
dp.message.register(
|
||||
save_auto_sync_schedule,
|
||||
RemnaWaveSyncStates.waiting_for_schedule,
|
||||
F.text,
|
||||
)
|
||||
|
||||
264
app/services/remnawave_sync_service.py
Normal file
264
app/services/remnawave_sync_service.py
Normal file
@@ -0,0 +1,264 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, time
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
from app.database.database import AsyncSessionLocal
|
||||
from app.database.crud.server_squad import sync_with_remnawave
|
||||
from app.services.remnawave_service import (
|
||||
RemnaWaveConfigurationError,
|
||||
RemnaWaveService,
|
||||
)
|
||||
from app.utils.cache import cache
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RemnaWaveAutoSyncStatus:
|
||||
enabled: bool
|
||||
times: List[time]
|
||||
next_run: Optional[datetime]
|
||||
last_run_started_at: Optional[datetime]
|
||||
last_run_finished_at: Optional[datetime]
|
||||
last_run_success: Optional[bool]
|
||||
last_run_reason: Optional[str]
|
||||
last_run_error: Optional[str]
|
||||
last_user_stats: Optional[Dict[str, Any]]
|
||||
last_server_stats: Optional[Dict[str, Any]]
|
||||
is_running: bool
|
||||
|
||||
|
||||
class RemnaWaveAutoSyncService:
|
||||
def __init__(self) -> None:
|
||||
self._scheduler_task: Optional[asyncio.Task] = None
|
||||
self._scheduler_lock = asyncio.Lock()
|
||||
self._sync_lock = asyncio.Lock()
|
||||
self._service = RemnaWaveService()
|
||||
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._initialized = False
|
||||
self._pending_refresh = False
|
||||
self._pending_run_immediately = False
|
||||
|
||||
self._next_run: Optional[datetime] = None
|
||||
self._last_run_started_at: Optional[datetime] = None
|
||||
self._last_run_finished_at: Optional[datetime] = None
|
||||
self._last_run_success: Optional[bool] = None
|
||||
self._last_run_reason: Optional[str] = None
|
||||
self._last_run_error: Optional[str] = None
|
||||
self._last_user_stats: Optional[Dict[str, Any]] = None
|
||||
self._last_server_stats: Optional[Dict[str, Any]] = None
|
||||
|
||||
async def initialize(self) -> None:
|
||||
self._loop = asyncio.get_running_loop()
|
||||
self._initialized = True
|
||||
|
||||
run_immediately = self._pending_run_immediately
|
||||
if self._pending_refresh:
|
||||
self._pending_refresh = False
|
||||
self._pending_run_immediately = False
|
||||
await self.refresh_schedule(run_immediately=run_immediately)
|
||||
else:
|
||||
await self.refresh_schedule()
|
||||
|
||||
async def refresh_schedule(self, *, run_immediately: bool = False) -> None:
|
||||
async with self._scheduler_lock:
|
||||
if self._scheduler_task and not self._scheduler_task.done():
|
||||
self._scheduler_task.cancel()
|
||||
try:
|
||||
await self._scheduler_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
self._scheduler_task = None
|
||||
|
||||
if not settings.REMNAWAVE_AUTO_SYNC_ENABLED:
|
||||
self._next_run = None
|
||||
return
|
||||
|
||||
times = settings.get_remnawave_auto_sync_times()
|
||||
if not times:
|
||||
logger.warning(
|
||||
"⚠️ Автосинхронизация включена, но расписание пустое. Укажите время запуска."
|
||||
)
|
||||
self._next_run = None
|
||||
return
|
||||
|
||||
self._scheduler_task = asyncio.create_task(self._run_scheduler(times))
|
||||
|
||||
if run_immediately:
|
||||
asyncio.create_task(self.run_sync_now(reason="immediate"))
|
||||
|
||||
def schedule_refresh(self, *, run_immediately: bool = False) -> None:
|
||||
if not self._initialized:
|
||||
self._pending_refresh = True
|
||||
if run_immediately:
|
||||
self._pending_run_immediately = True
|
||||
return
|
||||
|
||||
loop = self._loop or asyncio.get_running_loop()
|
||||
loop.create_task(self.refresh_schedule(run_immediately=run_immediately))
|
||||
|
||||
async def stop(self) -> None:
|
||||
async with self._scheduler_lock:
|
||||
if self._scheduler_task and not self._scheduler_task.done():
|
||||
self._scheduler_task.cancel()
|
||||
try:
|
||||
await self._scheduler_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._scheduler_task = None
|
||||
self._next_run = None
|
||||
|
||||
async def run_sync_now(self, *, reason: str = "manual") -> Dict[str, Any]:
|
||||
if self._sync_lock.locked():
|
||||
return {"started": False, "reason": "already_running"}
|
||||
|
||||
async with self._sync_lock:
|
||||
self._last_run_started_at = datetime.utcnow()
|
||||
self._last_run_finished_at = None
|
||||
self._last_run_reason = reason
|
||||
self._last_run_error = None
|
||||
self._last_run_success = None
|
||||
|
||||
try:
|
||||
user_stats, server_stats = await self._perform_sync()
|
||||
except RemnaWaveConfigurationError as error:
|
||||
message = str(error)
|
||||
self._last_run_error = message
|
||||
self._last_run_success = False
|
||||
self._last_user_stats = None
|
||||
self._last_server_stats = None
|
||||
self._last_run_finished_at = datetime.utcnow()
|
||||
logger.error("❌ Автосинхронизация RemnaWave: %s", message)
|
||||
return {
|
||||
"started": True,
|
||||
"success": False,
|
||||
"error": message,
|
||||
"user_stats": None,
|
||||
"server_stats": None,
|
||||
}
|
||||
except Exception as error:
|
||||
message = str(error)
|
||||
self._last_run_error = message
|
||||
self._last_run_success = False
|
||||
self._last_user_stats = None
|
||||
self._last_server_stats = None
|
||||
self._last_run_finished_at = datetime.utcnow()
|
||||
logger.exception("❌ Ошибка автосинхронизации RemnaWave: %s", error)
|
||||
return {
|
||||
"started": True,
|
||||
"success": False,
|
||||
"error": message,
|
||||
"user_stats": None,
|
||||
"server_stats": None,
|
||||
}
|
||||
|
||||
self._last_run_success = True
|
||||
self._last_run_error = None
|
||||
self._last_user_stats = user_stats
|
||||
self._last_server_stats = server_stats
|
||||
self._last_run_finished_at = datetime.utcnow()
|
||||
|
||||
return {
|
||||
"started": True,
|
||||
"success": True,
|
||||
"error": None,
|
||||
"user_stats": user_stats,
|
||||
"server_stats": server_stats,
|
||||
}
|
||||
|
||||
def get_status(self) -> RemnaWaveAutoSyncStatus:
|
||||
times = settings.get_remnawave_auto_sync_times()
|
||||
enabled = settings.REMNAWAVE_AUTO_SYNC_ENABLED and bool(times)
|
||||
|
||||
return RemnaWaveAutoSyncStatus(
|
||||
enabled=enabled,
|
||||
times=times,
|
||||
next_run=self._next_run,
|
||||
last_run_started_at=self._last_run_started_at,
|
||||
last_run_finished_at=self._last_run_finished_at,
|
||||
last_run_success=self._last_run_success,
|
||||
last_run_reason=self._last_run_reason,
|
||||
last_run_error=self._last_run_error,
|
||||
last_user_stats=self._last_user_stats,
|
||||
last_server_stats=self._last_server_stats,
|
||||
is_running=self._sync_lock.locked(),
|
||||
)
|
||||
|
||||
async def _run_scheduler(self, times: List[time]) -> None:
|
||||
try:
|
||||
while True:
|
||||
next_run = self._calculate_next_run(times)
|
||||
self._next_run = next_run
|
||||
|
||||
delay = (next_run - datetime.utcnow()).total_seconds()
|
||||
if delay > 0:
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
await self.run_sync_now(reason="auto")
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
finally:
|
||||
self._next_run = None
|
||||
|
||||
async def _perform_sync(self) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
||||
if not self._service.is_configured:
|
||||
raise RemnaWaveConfigurationError(
|
||||
self._service.configuration_error or "RemnaWave API не настроен"
|
||||
)
|
||||
|
||||
async with AsyncSessionLocal() as session:
|
||||
user_stats = await self._service.sync_users_from_panel(session, "all")
|
||||
server_stats = await self._sync_servers(session)
|
||||
|
||||
return user_stats, server_stats
|
||||
|
||||
async def _sync_servers(self, session: AsyncSession) -> Dict[str, Any]:
|
||||
squads = await self._service.get_all_squads()
|
||||
|
||||
if not squads:
|
||||
logger.warning("⚠️ Не удалось получить сквады из RemnaWave для автосинхронизации")
|
||||
return {"created": 0, "updated": 0, "removed": 0, "total": 0}
|
||||
|
||||
created, updated, removed = await sync_with_remnawave(session, squads)
|
||||
|
||||
try:
|
||||
await cache.delete_pattern("available_countries*")
|
||||
except Exception as error:
|
||||
logger.warning("⚠️ Не удалось очистить кеш стран после автосинхронизации: %s", error)
|
||||
|
||||
return {
|
||||
"created": created,
|
||||
"updated": updated,
|
||||
"removed": removed,
|
||||
"total": len(squads),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _calculate_next_run(times: List[time]) -> datetime:
|
||||
now = datetime.utcnow()
|
||||
today = now.date()
|
||||
|
||||
for scheduled in sorted(times):
|
||||
candidate = datetime.combine(today, scheduled)
|
||||
if candidate > now:
|
||||
return candidate
|
||||
|
||||
first_time = sorted(times)[0]
|
||||
next_day = today + timedelta(days=1)
|
||||
return datetime.combine(next_day, first_time)
|
||||
|
||||
|
||||
def _create_service() -> RemnaWaveAutoSyncService:
|
||||
service = RemnaWaveAutoSyncService()
|
||||
return service
|
||||
|
||||
|
||||
remnawave_sync_service = _create_service()
|
||||
@@ -248,6 +248,8 @@ class BotConfigurationService:
|
||||
"VERSION_CHECK_INTERVAL_HOURS": "VERSION",
|
||||
"TELEGRAM_STARS_RATE_RUB": "TELEGRAM",
|
||||
"REMNAWAVE_USER_DESCRIPTION_TEMPLATE": "REMNAWAVE",
|
||||
"REMNAWAVE_AUTO_SYNC_ENABLED": "REMNAWAVE",
|
||||
"REMNAWAVE_AUTO_SYNC_TIMES": "REMNAWAVE",
|
||||
}
|
||||
|
||||
CATEGORY_PREFIX_OVERRIDES: Dict[str, str] = {
|
||||
@@ -425,6 +427,20 @@ class BotConfigurationService:
|
||||
"warning": "Недоступный адрес приведет к ошибкам при управлении VPN-учетками.",
|
||||
"dependencies": "REMNAWAVE_API_KEY или REMNAWAVE_USERNAME/REMNAWAVE_PASSWORD",
|
||||
},
|
||||
"REMNAWAVE_AUTO_SYNC_ENABLED": {
|
||||
"description": "Автоматически запускает синхронизацию пользователей и серверов с панелью RemnaWave.",
|
||||
"format": "Булево значение.",
|
||||
"example": "Включено при корректно настроенных API-ключах.",
|
||||
"warning": "При включении без расписания синхронизация не будет выполнена.",
|
||||
"dependencies": "REMNAWAVE_AUTO_SYNC_TIMES",
|
||||
},
|
||||
"REMNAWAVE_AUTO_SYNC_TIMES": {
|
||||
"description": "Список времени в формате HH:MM, когда запускается автосинхронизация в течение суток.",
|
||||
"format": "Перечислите время через запятую или с новой строки (например, 03:00, 15:00).",
|
||||
"example": "03:00, 15:00",
|
||||
"warning": "Минимальный интервал между запусками не ограничен, но слишком частые синхронизации нагружают панель.",
|
||||
"dependencies": "REMNAWAVE_AUTO_SYNC_ENABLED",
|
||||
},
|
||||
"EXTERNAL_ADMIN_TOKEN": {
|
||||
"description": "Приватный токен, который использует внешняя админка для проверки запросов.",
|
||||
"format": "Значение генерируется автоматически из username бота и его токена и доступно только для чтения.",
|
||||
@@ -964,6 +980,18 @@ class BotConfigurationService:
|
||||
refresh_period_prices()
|
||||
elif key.startswith("PRICE_TRAFFIC_") or key == "TRAFFIC_PACKAGES_CONFIG":
|
||||
refresh_traffic_prices()
|
||||
elif key in {"REMNAWAVE_AUTO_SYNC_ENABLED", "REMNAWAVE_AUTO_SYNC_TIMES"}:
|
||||
try:
|
||||
from app.services.remnawave_sync_service import remnawave_sync_service
|
||||
|
||||
remnawave_sync_service.schedule_refresh(
|
||||
run_immediately=(key == "REMNAWAVE_AUTO_SYNC_ENABLED" and bool(value))
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
"Не удалось обновить сервис автосинхронизации RemnaWave: %s",
|
||||
error,
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error("Не удалось применить значение %s=%s: %s", key, value, error)
|
||||
|
||||
|
||||
@@ -173,6 +173,10 @@ class SquadMigrationStates(StatesGroup):
|
||||
confirming = State()
|
||||
|
||||
|
||||
class RemnaWaveSyncStates(StatesGroup):
|
||||
waiting_for_schedule = State()
|
||||
|
||||
|
||||
class AdminSubmenuStates(StatesGroup):
|
||||
in_users_submenu = State()
|
||||
in_promo_submenu = State()
|
||||
|
||||
30
main.py
30
main.py
@@ -20,6 +20,7 @@ from app.external.pal24_webhook import start_pal24_webhook_server, Pal24WebhookS
|
||||
from app.database.universal_migration import run_universal_migration
|
||||
from app.services.backup_service import backup_service
|
||||
from app.services.reporting_service import reporting_service
|
||||
from app.services.remnawave_sync_service import remnawave_sync_service
|
||||
from app.localization.loader import ensure_locale_templates
|
||||
from app.services.system_settings_service import bot_configuration_service
|
||||
from app.services.external_admin_service import ensure_external_admin_token
|
||||
@@ -185,6 +186,29 @@ async def main():
|
||||
stage.warning(f"Ошибка запуска сервиса отчетов: {e}")
|
||||
logger.error(f"❌ Ошибка запуска сервиса отчетов: {e}")
|
||||
|
||||
async with timeline.stage(
|
||||
"Автосинхронизация RemnaWave",
|
||||
"🔄",
|
||||
success_message="Сервис автосинхронизации готов",
|
||||
) as stage:
|
||||
try:
|
||||
await remnawave_sync_service.initialize()
|
||||
status = remnawave_sync_service.get_status()
|
||||
if status.enabled:
|
||||
times_text = ", ".join(t.strftime("%H:%M") for t in status.times) or "—"
|
||||
if status.next_run:
|
||||
next_run_text = status.next_run.strftime("%d.%m.%Y %H:%M")
|
||||
stage.log(
|
||||
f"Активирована: расписание {times_text}, ближайший запуск {next_run_text}"
|
||||
)
|
||||
else:
|
||||
stage.log(f"Активирована: расписание {times_text}")
|
||||
else:
|
||||
stage.log("Автосинхронизация отключена настройками")
|
||||
except Exception as e:
|
||||
stage.warning(f"Ошибка запуска автосинхронизации: {e}")
|
||||
logger.error(f"❌ Ошибка запуска автосинхронизации RemnaWave: {e}")
|
||||
|
||||
payment_service = PaymentService(bot)
|
||||
|
||||
async with timeline.stage(
|
||||
@@ -459,6 +483,12 @@ async def main():
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка остановки сервиса отчетов: {e}")
|
||||
|
||||
logger.info("ℹ️ Остановка сервиса автосинхронизации RemnaWave...")
|
||||
try:
|
||||
await remnawave_sync_service.stop()
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка остановки автосинхронизации RemnaWave: {e}")
|
||||
|
||||
logger.info("ℹ️ Остановка сервиса бекапов...")
|
||||
try:
|
||||
await backup_service.stop_auto_backup()
|
||||
|
||||
52
tests/utils/test_remnawave_auto_sync.py
Normal file
52
tests/utils/test_remnawave_auto_sync.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from datetime import datetime, time as time_cls
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from app.config import settings
|
||||
from app.services.remnawave_sync_service import RemnaWaveAutoSyncService
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"raw, expected",
|
||||
[
|
||||
("03:00, 15:30 03:00; 07:05", [time_cls(3, 0), time_cls(7, 5), time_cls(15, 30)]),
|
||||
("", []),
|
||||
(None, []),
|
||||
("25:00, 10:70, test, 09:15", [time_cls(9, 15)]),
|
||||
],
|
||||
)
|
||||
def test_parse_daily_time_list(raw, expected):
|
||||
assert settings.parse_daily_time_list(raw) == expected
|
||||
|
||||
|
||||
def _patch_datetime(monkeypatch, current):
|
||||
real_datetime = datetime
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.services.remnawave_sync_service.datetime",
|
||||
SimpleNamespace(
|
||||
utcnow=lambda: current,
|
||||
combine=lambda date_obj, time_obj: real_datetime.combine(date_obj, time_obj),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def test_calculate_next_run_same_day(monkeypatch):
|
||||
service = RemnaWaveAutoSyncService()
|
||||
current = datetime(2024, 1, 1, 2, 30)
|
||||
_patch_datetime(monkeypatch, current)
|
||||
|
||||
next_run = service._calculate_next_run([time_cls(1, 0), time_cls(3, 0)])
|
||||
|
||||
assert next_run == datetime(2024, 1, 1, 3, 0)
|
||||
|
||||
|
||||
def test_calculate_next_run_rollover(monkeypatch):
|
||||
service = RemnaWaveAutoSyncService()
|
||||
current = datetime(2024, 1, 1, 23, 45)
|
||||
_patch_datetime(monkeypatch, current)
|
||||
|
||||
next_run = service._calculate_next_run([time_cls(1, 0), time_cls(10, 0)])
|
||||
|
||||
assert next_run == datetime(2024, 1, 2, 1, 0)
|
||||
Reference in New Issue
Block a user