mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-03-02 16:20:49 +00:00
Merge pull request #415 from Fr1ngg/bedolaga/move-env-settings-to-admin-panel-zgsjdr
Add admin-configurable runtime settings
This commit is contained in:
@@ -39,6 +39,7 @@ from app.handlers.admin import (
|
||||
welcome_text as admin_welcome_text,
|
||||
tickets as admin_tickets,
|
||||
reports as admin_reports,
|
||||
configuration as admin_configuration,
|
||||
)
|
||||
from app.handlers.stars_payments import register_stars_handlers
|
||||
|
||||
@@ -141,6 +142,7 @@ async def setup_bot() -> tuple[Bot, Dispatcher]:
|
||||
admin_welcome_text.register_welcome_text_handlers(dp)
|
||||
admin_tickets.register_handlers(dp)
|
||||
admin_reports.register_handlers(dp)
|
||||
admin_configuration.register_handlers(dp)
|
||||
common.register_handlers(dp)
|
||||
register_stars_handlers(dp)
|
||||
logger.info("⭐ Зарегистрированы обработчики Telegram Stars платежей")
|
||||
|
||||
214
app/config.py
214
app/config.py
@@ -4,14 +4,59 @@ import re
|
||||
import html
|
||||
from collections import defaultdict
|
||||
from datetime import time
|
||||
from typing import List, Optional, Union, Dict
|
||||
from typing import Any, Dict, List, Optional, Union, get_args, get_origin
|
||||
import json
|
||||
from pydantic_settings import BaseSettings
|
||||
from pydantic import field_validator, Field
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
|
||||
|
||||
_initial_values: Dict[str, Any] = {}
|
||||
EDITABLE_EXCLUDED_KEYS: set[str] = {"BOT_TOKEN", "ADMIN_IDS"}
|
||||
CATEGORY_ALIASES: Dict[str, str] = {
|
||||
"ADMIN": "Администрирование",
|
||||
"APP": "App Config",
|
||||
"AUTOPAY": "Автоплатежи",
|
||||
"AVAILABLE": "Доступность",
|
||||
"BACKUP": "Резервные копии",
|
||||
"BASE": "Базовые значения",
|
||||
"CAMPAIGN": "Кампании",
|
||||
"CHANNEL": "Телеграм-каналы",
|
||||
"CONNECT": "Кнопка подключения",
|
||||
"CRYPTOBOT": "CryptoBot",
|
||||
"DATABASE": "База данных",
|
||||
"DEFAULT": "Значения по умолчанию",
|
||||
"DEBUG": "Отладка",
|
||||
"ENABLE": "Флаги",
|
||||
"HAPP": "Happ",
|
||||
"INACTIVE": "Неактивные",
|
||||
"LOG": "Логирование",
|
||||
"LOGO": "Логотип",
|
||||
"MAINTENANCE": "Техработы",
|
||||
"MINIAPP": "Mini App",
|
||||
"MONITORING": "Мониторинг",
|
||||
"NOTIFICATION": "Уведомления",
|
||||
"PAYMENT": "Платежи",
|
||||
"PAL24": "PayPalych",
|
||||
"POSTGRES": "PostgreSQL",
|
||||
"PRICE": "Цены",
|
||||
"REFERRAL": "Рефералы",
|
||||
"REMNAWAVE": "Remnawave API",
|
||||
"SERVER": "Серверы",
|
||||
"SKIP": "Пропуски",
|
||||
"SQLITE": "SQLite",
|
||||
"SUPPORT": "Поддержка",
|
||||
"TELEGRAM": "Telegram",
|
||||
"TRAFFIC": "Трафик",
|
||||
"TRIAL": "Триал",
|
||||
"TRIBUTE": "Tribute",
|
||||
"VERSION": "Версии",
|
||||
"WEBHOOK": "Webhook",
|
||||
"YOOKASSA": "YooKassa",
|
||||
}
|
||||
|
||||
BOT_TOKEN: str
|
||||
ADMIN_IDS: str = ""
|
||||
SUPPORT_USERNAME: str = "@support"
|
||||
@@ -964,7 +1009,170 @@ class Settings(BaseSettings):
|
||||
|
||||
def is_support_contact_enabled(self) -> bool:
|
||||
return self.get_support_system_mode() in {"contact", "both"}
|
||||
|
||||
|
||||
def model_post_init(self, __context: Any) -> None: # type: ignore[override]
|
||||
try:
|
||||
super().model_post_init(__context)
|
||||
except AttributeError:
|
||||
pass
|
||||
object.__setattr__(self, "_initial_values", self.model_dump())
|
||||
|
||||
def get_initial_value(self, key: str) -> Any:
|
||||
return (self._initial_values or {}).get(key, getattr(self, key, None))
|
||||
|
||||
def get_initial_editable_values(self) -> Dict[str, Any]:
|
||||
return {
|
||||
key: self.get_initial_value(key)
|
||||
for key in self.model_fields.keys()
|
||||
if self.is_editable_field(key)
|
||||
}
|
||||
|
||||
def is_editable_field(self, key: str) -> bool:
|
||||
return key in self.model_fields and key not in self.EDITABLE_EXCLUDED_KEYS
|
||||
|
||||
def get_field_category_key(self, key: str) -> str:
|
||||
if "_" in key:
|
||||
return key.split("_", 1)[0].upper()
|
||||
return "GENERAL"
|
||||
|
||||
def get_field_category_label(self, key: str) -> str:
|
||||
category_key = self.get_field_category_key(key)
|
||||
if category_key == "GENERAL":
|
||||
return "Прочее"
|
||||
return self.CATEGORY_ALIASES.get(category_key, category_key.capitalize())
|
||||
|
||||
def get_field_label(self, key: str) -> str:
|
||||
return key.replace("_", " ").strip().title()
|
||||
|
||||
def get_field_type_annotation(self, key: str) -> Any:
|
||||
field = self.model_fields.get(key)
|
||||
if not field:
|
||||
raise KeyError(key)
|
||||
return field.annotation
|
||||
|
||||
def get_field_type_name(self, key: str) -> str:
|
||||
annotation = self.get_field_type_annotation(key)
|
||||
origin = get_origin(annotation)
|
||||
if origin is Union:
|
||||
args = [arg for arg in get_args(annotation) if arg is not type(None)]
|
||||
if not args:
|
||||
return "str"
|
||||
return f"Optional[{self._type_to_name(args[0])}]"
|
||||
return self._type_to_name(annotation)
|
||||
|
||||
def _type_to_name(self, annotation: Any) -> str:
|
||||
if annotation in {str, Optional[str]}:
|
||||
return "str"
|
||||
if annotation is bool:
|
||||
return "bool"
|
||||
if annotation is int:
|
||||
return "int"
|
||||
if annotation is float:
|
||||
return "float"
|
||||
if annotation in {list, List[str]}:
|
||||
return "list"
|
||||
if annotation in {dict, Dict[str, Any]}:
|
||||
return "dict"
|
||||
if hasattr(annotation, "__name__"):
|
||||
return annotation.__name__
|
||||
return str(annotation)
|
||||
|
||||
def format_value_for_display(self, key: str, value: Any | None = None) -> str:
|
||||
actual_value = getattr(self, key, None) if value is None else value
|
||||
if actual_value is None:
|
||||
return "—"
|
||||
if isinstance(actual_value, bool):
|
||||
return "✅ Включено" if actual_value else "🚫 Выключено"
|
||||
if isinstance(actual_value, (list, dict)):
|
||||
try:
|
||||
return json.dumps(actual_value, ensure_ascii=False)
|
||||
except Exception:
|
||||
return str(actual_value)
|
||||
return str(actual_value)
|
||||
|
||||
def serialize_value_for_storage(self, key: str, value: Any | None = None) -> Any:
|
||||
actual_value = getattr(self, key, None) if value is None else value
|
||||
if isinstance(actual_value, Path):
|
||||
return str(actual_value)
|
||||
if isinstance(actual_value, set):
|
||||
return list(actual_value)
|
||||
return actual_value
|
||||
|
||||
def cast_value(self, key: str, value: Any) -> Any:
|
||||
annotation = self.get_field_type_annotation(key)
|
||||
return self._convert_value(annotation, value)
|
||||
|
||||
def parse_raw_value(self, key: str, raw_value: str) -> Any:
|
||||
annotation = self.get_field_type_annotation(key)
|
||||
return self._convert_value(annotation, raw_value)
|
||||
|
||||
def _convert_value(self, annotation: Any, value: Any) -> Any:
|
||||
origin = get_origin(annotation)
|
||||
if origin is Union:
|
||||
args = [arg for arg in get_args(annotation) if arg is not type(None)]
|
||||
if not args:
|
||||
return value
|
||||
if value in ("", None, "none", "None", "null", "Null"):
|
||||
return None
|
||||
return self._convert_value(args[0], value)
|
||||
|
||||
if annotation is bool:
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
if isinstance(value, (int, float)):
|
||||
return bool(value)
|
||||
value_str = str(value).strip().lower()
|
||||
if value_str in {"true", "1", "yes", "on", "да"}:
|
||||
return True
|
||||
if value_str in {"false", "0", "no", "off", "нет"}:
|
||||
return False
|
||||
raise ValueError("Некорректное булево значение")
|
||||
|
||||
if annotation is int:
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
if value in ("", None):
|
||||
raise ValueError("Значение не может быть пустым")
|
||||
return int(str(value).strip())
|
||||
|
||||
if annotation is float:
|
||||
if isinstance(value, float):
|
||||
return value
|
||||
if isinstance(value, int):
|
||||
return float(value)
|
||||
if value in ("", None):
|
||||
raise ValueError("Значение не может быть пустым")
|
||||
return float(str(value).replace(',', '.'))
|
||||
|
||||
if annotation in {str, Any}:
|
||||
return "" if value is None else str(value)
|
||||
|
||||
if annotation in {list, List[str]}:
|
||||
if isinstance(value, list):
|
||||
return value
|
||||
return [item.strip() for item in str(value).split(',') if item.strip()]
|
||||
|
||||
if annotation in {dict, Dict[str, Any]}:
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
try:
|
||||
parsed = json.loads(value)
|
||||
if isinstance(parsed, dict):
|
||||
return parsed
|
||||
except Exception:
|
||||
pass
|
||||
raise ValueError("Ожидается JSON-объект")
|
||||
|
||||
return value
|
||||
|
||||
def apply_override(self, key: str, value: Any) -> None:
|
||||
object.__setattr__(self, key, value)
|
||||
|
||||
def apply_overrides(self, overrides: Dict[str, Any]) -> None:
|
||||
for key, value in overrides.items():
|
||||
if self.is_editable_field(key):
|
||||
self.apply_override(key, value)
|
||||
|
||||
model_config = {
|
||||
"env_file": ".env",
|
||||
"env_file_encoding": "utf-8",
|
||||
|
||||
50
app/database/crud/app_settings.py
Normal file
50
app/database/crud/app_settings.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.database.models import AppSetting
|
||||
|
||||
|
||||
class AppSettingsCRUD:
|
||||
@staticmethod
|
||||
async def list_all(db: AsyncSession) -> List[AppSetting]:
|
||||
result = await db.execute(select(AppSetting).order_by(AppSetting.setting_key))
|
||||
return list(result.scalars())
|
||||
|
||||
@staticmethod
|
||||
async def get_by_key(db: AsyncSession, setting_key: str) -> Optional[AppSetting]:
|
||||
result = await db.execute(
|
||||
select(AppSetting).where(AppSetting.setting_key == setting_key)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
@staticmethod
|
||||
async def upsert(db: AsyncSession, setting_key: str, value: str) -> AppSetting:
|
||||
record = await AppSettingsCRUD.get_by_key(db, setting_key)
|
||||
now = datetime.utcnow()
|
||||
|
||||
if record:
|
||||
record.value = value
|
||||
record.updated_at = now
|
||||
else:
|
||||
record = AppSetting(
|
||||
setting_key=setting_key,
|
||||
value=value,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
db.add(record)
|
||||
|
||||
await db.flush()
|
||||
return record
|
||||
|
||||
@staticmethod
|
||||
async def delete_by_key(db: AsyncSession, setting_key: str) -> bool:
|
||||
record = await AppSettingsCRUD.get_by_key(db, setting_key)
|
||||
if not record:
|
||||
return False
|
||||
await db.delete(record)
|
||||
await db.flush()
|
||||
return True
|
||||
@@ -25,6 +25,16 @@ from sqlalchemy.sql import func
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class AppSetting(Base):
|
||||
__tablename__ = "app_settings"
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
setting_key = Column(String(120), unique=True, nullable=False, index=True)
|
||||
value = Column(Text, nullable=True)
|
||||
created_at = Column(DateTime, default=func.now(), nullable=False)
|
||||
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
|
||||
|
||||
|
||||
server_squad_promo_groups = Table(
|
||||
"server_squad_promo_groups",
|
||||
Base.metadata,
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import json
|
||||
import logging
|
||||
from sqlalchemy import text, inspect
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import text
|
||||
from app.database.database import engine
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -172,6 +173,98 @@ async def check_index_exists(table_name: str, index_name: str) -> bool:
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def ensure_app_settings_table() -> bool:
|
||||
try:
|
||||
table_exists = await check_table_exists('app_settings')
|
||||
|
||||
async with engine.begin() as conn:
|
||||
db_type = await get_database_type()
|
||||
|
||||
if not table_exists:
|
||||
if db_type == 'sqlite':
|
||||
create_sql = """
|
||||
CREATE TABLE app_settings (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
setting_key VARCHAR(120) NOT NULL UNIQUE,
|
||||
value TEXT NULL,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"""
|
||||
elif db_type == 'postgresql':
|
||||
create_sql = """
|
||||
CREATE TABLE IF NOT EXISTS app_settings (
|
||||
id SERIAL PRIMARY KEY,
|
||||
setting_key VARCHAR(120) UNIQUE NOT NULL,
|
||||
value TEXT NULL,
|
||||
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"""
|
||||
elif db_type == 'mysql':
|
||||
create_sql = """
|
||||
CREATE TABLE IF NOT EXISTS app_settings (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
setting_key VARCHAR(120) NOT NULL UNIQUE,
|
||||
value TEXT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
|
||||
"""
|
||||
else:
|
||||
create_sql = None
|
||||
|
||||
if create_sql:
|
||||
await conn.execute(text(create_sql))
|
||||
logger.info("✅ Таблица app_settings создана")
|
||||
else:
|
||||
logger.info("ℹ️ Таблица app_settings уже существует")
|
||||
|
||||
defaults = settings.get_initial_editable_values()
|
||||
|
||||
if defaults:
|
||||
for key, default_value in defaults.items():
|
||||
if not settings.is_editable_field(key):
|
||||
continue
|
||||
|
||||
serialized = json.dumps(
|
||||
settings.serialize_value_for_storage(key, default_value),
|
||||
ensure_ascii=False
|
||||
)
|
||||
|
||||
if db_type == 'sqlite':
|
||||
insert_sql = """
|
||||
INSERT OR IGNORE INTO app_settings (setting_key, value, created_at, updated_at)
|
||||
VALUES (:setting_key, :value, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
"""
|
||||
elif db_type == 'postgresql':
|
||||
insert_sql = """
|
||||
INSERT INTO app_settings (setting_key, value, created_at, updated_at)
|
||||
VALUES (:setting_key, :value, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT (setting_key) DO NOTHING
|
||||
"""
|
||||
elif db_type == 'mysql':
|
||||
insert_sql = """
|
||||
INSERT INTO app_settings (setting_key, value, created_at, updated_at)
|
||||
VALUES (:setting_key, :value, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
ON DUPLICATE KEY UPDATE setting_key = VALUES(setting_key)
|
||||
"""
|
||||
else:
|
||||
insert_sql = None
|
||||
|
||||
if insert_sql:
|
||||
await conn.execute(
|
||||
text(insert_sql),
|
||||
{"setting_key": key, "value": serialized}
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка создания или обновления таблицы app_settings: {e}")
|
||||
return False
|
||||
|
||||
async def create_cryptobot_payments_table():
|
||||
table_exists = await check_table_exists('cryptobot_payments')
|
||||
if table_exists:
|
||||
@@ -1839,7 +1932,14 @@ async def run_universal_migration():
|
||||
try:
|
||||
db_type = await get_database_type()
|
||||
logger.info(f"Тип базы данных: {db_type}")
|
||||
|
||||
|
||||
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ APP_SETTINGS ===")
|
||||
app_settings_ready = await ensure_app_settings_table()
|
||||
if app_settings_ready:
|
||||
logger.info("✅ Таблица app_settings готова")
|
||||
else:
|
||||
logger.warning("⚠️ Проблемы с таблицей app_settings")
|
||||
|
||||
referral_migration_success = await add_referral_system_columns()
|
||||
if not referral_migration_success:
|
||||
logger.warning("⚠️ Проблемы с миграцией реферальной системы")
|
||||
|
||||
300
app/handlers/admin/configuration.py
Normal file
300
app/handlers/admin/configuration.py
Normal file
@@ -0,0 +1,300 @@
|
||||
import html
|
||||
import logging
|
||||
import math
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from aiogram import Bot, Dispatcher, F, types
|
||||
from aiogram.exceptions import TelegramBadRequest
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.config import settings
|
||||
from app.database.models import User
|
||||
from app.keyboards.admin import (
|
||||
get_bot_config_categories_keyboard,
|
||||
get_bot_config_settings_keyboard,
|
||||
)
|
||||
from app.services.app_settings_service import AppSettingsService
|
||||
from app.states import AdminStates
|
||||
from app.utils.decorators import admin_required, error_handler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CONFIGS_PER_PAGE = 8
|
||||
CANCEL_COMMANDS = {"отмена", "cancel", "/cancel"}
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def show_bot_config_menu(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
await state.clear()
|
||||
categories = AppSettingsService.get_categories()
|
||||
keyboard = get_bot_config_categories_keyboard(categories)
|
||||
|
||||
intro_lines = [
|
||||
"🧩 <b>Конфигурация бота</b>",
|
||||
"",
|
||||
"Изменения применяются сразу.",
|
||||
"Отправьте <code>reset</code> для возврата значения по умолчанию.",
|
||||
"Некоторые изменения могут потребовать перезапуска бота.",
|
||||
]
|
||||
|
||||
if not categories:
|
||||
intro_lines.append("\nНастройки недоступны для редактирования.")
|
||||
|
||||
text = "\n".join(intro_lines)
|
||||
|
||||
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode="HTML")
|
||||
await state.update_data(
|
||||
bot_config_message_id=callback.message.message_id,
|
||||
bot_config_category_key=None,
|
||||
bot_config_category_page=1,
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def show_bot_config_category(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
data = callback.data
|
||||
if data == "admin_bot_config_cat:noop":
|
||||
await callback.answer()
|
||||
return
|
||||
|
||||
parts = data.split(":")
|
||||
if len(parts) < 2:
|
||||
await callback.answer()
|
||||
return
|
||||
|
||||
category_key = parts[1].upper()
|
||||
try:
|
||||
page = int(parts[2]) if len(parts) > 2 else 1
|
||||
except ValueError:
|
||||
page = 1
|
||||
|
||||
await state.clear()
|
||||
await _render_category_view(callback.message, state, category_key, page)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def prompt_bot_config_edit(
|
||||
callback: types.CallbackQuery,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
key = callback.data.split(":", 1)[-1]
|
||||
try:
|
||||
field_info = AppSettingsService.get_field_info(key)
|
||||
except ValueError as error:
|
||||
await callback.answer(str(error), show_alert=True)
|
||||
return
|
||||
|
||||
state_data = await state.get_data()
|
||||
category_key = state_data.get("bot_config_category_key") or field_info["category_key"]
|
||||
page = state_data.get("bot_config_category_page", 1)
|
||||
message_id = state_data.get("bot_config_message_id", callback.message.message_id)
|
||||
|
||||
default_display = settings.format_value_for_display(key, field_info["default_value"])
|
||||
current_display = field_info["display_value"] or "—"
|
||||
|
||||
text = (
|
||||
"✏️ <b>Изменение настройки</b>\n\n"
|
||||
f"<b>Ключ:</b> <code>{key}</code>\n"
|
||||
f"<b>Название:</b> {html.escape(field_info['label'])}\n"
|
||||
f"<b>Тип:</b> {field_info['type']}\n"
|
||||
f"<b>Текущее значение:</b> <code>{html.escape(str(current_display))}</code>\n"
|
||||
f"<b>По умолчанию:</b> <code>{html.escape(str(default_display))}</code>\n\n"
|
||||
"Отправьте новое значение одним сообщением.\n"
|
||||
"Для возврата по умолчанию отправьте <code>reset</code>.\n"
|
||||
"Для отмены отправьте <code>отмена</code> или <code>/cancel</code>."
|
||||
)
|
||||
|
||||
back_callback = f"admin_bot_config_cat:{category_key}:{page}"
|
||||
keyboard = InlineKeyboardMarkup(
|
||||
inline_keyboard=[
|
||||
[InlineKeyboardButton(text="⬅️ Назад", callback_data=back_callback)]
|
||||
]
|
||||
)
|
||||
|
||||
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode="HTML")
|
||||
|
||||
await state.update_data(
|
||||
bot_config_edit_key=key,
|
||||
bot_config_category_key=category_key,
|
||||
bot_config_category_page=page,
|
||||
bot_config_message_id=message_id,
|
||||
)
|
||||
await state.set_state(AdminStates.editing_bot_config_value)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@admin_required
|
||||
@error_handler
|
||||
async def process_bot_config_edit(
|
||||
message: types.Message,
|
||||
db_user: User,
|
||||
db: AsyncSession,
|
||||
state: FSMContext,
|
||||
):
|
||||
state_data = await state.get_data()
|
||||
key = state_data.get("bot_config_edit_key")
|
||||
category_key = state_data.get("bot_config_category_key")
|
||||
page = state_data.get("bot_config_category_page", 1)
|
||||
message_id = state_data.get("bot_config_message_id")
|
||||
|
||||
if not key:
|
||||
await message.answer("⚠️ Не удалось определить настройку. Выберите её снова из меню.")
|
||||
await state.clear()
|
||||
return
|
||||
|
||||
text = (message.text or "").strip()
|
||||
|
||||
if text.lower() in CANCEL_COMMANDS:
|
||||
await message.answer("❌ Изменение отменено.")
|
||||
await state.clear()
|
||||
if category_key and message_id:
|
||||
await _edit_category_message(message.bot, message.chat.id, message_id, category_key, page, state)
|
||||
else:
|
||||
await message.answer("ℹ️ Вернитесь к списку настроек через меню администратора.")
|
||||
return
|
||||
|
||||
try:
|
||||
result = await AppSettingsService.update_setting(key, text)
|
||||
except ValueError as error:
|
||||
await message.answer(f"❌ {error}")
|
||||
return
|
||||
|
||||
display_value = html.escape(str(result["display"])) if result.get("display") is not None else "—"
|
||||
await message.answer(
|
||||
f"✅ Настройка <code>{key}</code> обновлена до значения <code>{display_value}</code>",
|
||||
parse_mode="HTML"
|
||||
)
|
||||
|
||||
await state.clear()
|
||||
|
||||
if category_key and message_id:
|
||||
await _edit_category_message(message.bot, message.chat.id, message_id, category_key, page, state)
|
||||
else:
|
||||
await message.answer("ℹ️ Откройте категорию настроек заново через меню администратора.")
|
||||
|
||||
|
||||
async def _render_category_view(
|
||||
message: types.Message,
|
||||
state: FSMContext,
|
||||
category_key: str,
|
||||
page: int,
|
||||
) -> None:
|
||||
text, keyboard, actual_page = _build_category_view(category_key, page)
|
||||
try:
|
||||
await message.edit_text(text, reply_markup=keyboard, parse_mode="HTML")
|
||||
except TelegramBadRequest as error:
|
||||
logger.debug("Не удалось обновить сообщение конфигурации: %s", error)
|
||||
await state.update_data(
|
||||
bot_config_category_key=category_key,
|
||||
bot_config_category_page=actual_page,
|
||||
bot_config_message_id=message.message_id,
|
||||
)
|
||||
|
||||
|
||||
async def _edit_category_message(
|
||||
bot: Bot,
|
||||
chat_id: int,
|
||||
message_id: int,
|
||||
category_key: str,
|
||||
page: int,
|
||||
state: FSMContext,
|
||||
) -> None:
|
||||
text, keyboard, actual_page = _build_category_view(category_key, page)
|
||||
try:
|
||||
await bot.edit_message_text(
|
||||
text=text,
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
)
|
||||
except TelegramBadRequest as error:
|
||||
logger.debug("Не удалось обновить сообщение конфигурации: %s", error)
|
||||
return
|
||||
|
||||
await state.update_data(
|
||||
bot_config_category_key=category_key,
|
||||
bot_config_category_page=actual_page,
|
||||
bot_config_message_id=message_id,
|
||||
)
|
||||
|
||||
|
||||
def _build_category_view(
|
||||
category_key: str,
|
||||
page: int,
|
||||
) -> Tuple[str, InlineKeyboardMarkup, int]:
|
||||
fields = AppSettingsService.get_category_fields(category_key)
|
||||
total = len(fields)
|
||||
total_pages = max(1, math.ceil(total / CONFIGS_PER_PAGE))
|
||||
page = max(1, min(page, total_pages))
|
||||
start_index = (page - 1) * CONFIGS_PER_PAGE
|
||||
end_index = start_index + CONFIGS_PER_PAGE
|
||||
page_fields = fields[start_index:end_index]
|
||||
|
||||
category_label = (
|
||||
fields[0]["category_label"]
|
||||
if fields
|
||||
else settings.get_field_category_label(category_key)
|
||||
)
|
||||
|
||||
lines = [
|
||||
f"🧩 <b>{category_label}</b>",
|
||||
"",
|
||||
"Выберите настройку для редактирования.",
|
||||
"Отправьте <code>reset</code> для возврата по умолчанию.",
|
||||
"",
|
||||
]
|
||||
|
||||
if not page_fields:
|
||||
lines.append("Настройки отсутствуют.")
|
||||
else:
|
||||
for field in page_fields:
|
||||
indicator = "⭐" if field.get("is_overridden") else "•"
|
||||
value_display = field.get("display_value")
|
||||
value_str = html.escape(str(value_display)) if value_display is not None else "—"
|
||||
value_str = value_str.replace("\n", " ")
|
||||
if len(value_str) > 120:
|
||||
value_str = value_str[:117] + "…"
|
||||
lines.append(f"{indicator} <code>{field['key']}</code> → <code>{value_str}</code>")
|
||||
|
||||
text = "\n".join(lines)
|
||||
keyboard = get_bot_config_settings_keyboard(category_key, fields, page, CONFIGS_PER_PAGE, total_pages)
|
||||
return text, keyboard, page
|
||||
|
||||
|
||||
def register_handlers(dp: Dispatcher):
|
||||
dp.callback_query.register(
|
||||
show_bot_config_menu,
|
||||
F.data == "admin_bot_config"
|
||||
)
|
||||
dp.callback_query.register(
|
||||
show_bot_config_category,
|
||||
F.data.startswith("admin_bot_config_cat:")
|
||||
)
|
||||
dp.callback_query.register(
|
||||
prompt_bot_config_edit,
|
||||
F.data.startswith("admin_bot_config_edit:")
|
||||
)
|
||||
dp.message.register(
|
||||
process_bot_config_edit,
|
||||
AdminStates.editing_bot_config_value
|
||||
)
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import List, Optional, Tuple, Any
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
|
||||
|
||||
from app.localization.texts import get_texts
|
||||
@@ -93,12 +93,15 @@ def get_admin_support_submenu_keyboard(language: str = "ru") -> InlineKeyboardMa
|
||||
|
||||
def get_admin_settings_submenu_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
|
||||
texts = get_texts(language)
|
||||
|
||||
|
||||
return InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(text=texts.ADMIN_REMNAWAVE, callback_data="admin_remnawave"),
|
||||
InlineKeyboardButton(text=texts.ADMIN_MONITORING, callback_data="admin_monitoring")
|
||||
],
|
||||
[
|
||||
InlineKeyboardButton(text="🧩 Конфигурация бота", callback_data="admin_bot_config")
|
||||
],
|
||||
[
|
||||
InlineKeyboardButton(
|
||||
text=texts.t("ADMIN_MONITORING_SETTINGS", "⚙️ Настройки мониторинга"),
|
||||
@@ -115,6 +118,86 @@ def get_admin_settings_submenu_keyboard(language: str = "ru") -> InlineKeyboardM
|
||||
])
|
||||
|
||||
|
||||
def get_bot_config_categories_keyboard(categories: List[Dict[str, Any]]) -> InlineKeyboardMarkup:
|
||||
rows: List[List[InlineKeyboardButton]] = []
|
||||
|
||||
if categories:
|
||||
for i in range(0, len(categories), 2):
|
||||
chunk = categories[i:i + 2]
|
||||
row: List[InlineKeyboardButton] = []
|
||||
for category in chunk:
|
||||
count = len(category.get("fields", []))
|
||||
text = f"{category['label']} ({count})"
|
||||
row.append(
|
||||
InlineKeyboardButton(
|
||||
text=text,
|
||||
callback_data=f"admin_bot_config_cat:{category['key']}:1"
|
||||
)
|
||||
)
|
||||
rows.append(row)
|
||||
|
||||
rows.append([InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_submenu_settings")])
|
||||
|
||||
return InlineKeyboardMarkup(inline_keyboard=rows)
|
||||
|
||||
|
||||
def get_bot_config_settings_keyboard(
|
||||
category_key: str,
|
||||
fields: List[Dict[str, Any]],
|
||||
page: int,
|
||||
per_page: int,
|
||||
total_pages: int
|
||||
) -> InlineKeyboardMarkup:
|
||||
rows: List[List[InlineKeyboardButton]] = []
|
||||
|
||||
start_index = (page - 1) * per_page
|
||||
end_index = start_index + per_page
|
||||
page_fields = fields[start_index:end_index]
|
||||
|
||||
for field in page_fields:
|
||||
label = field.get("label") or field["key"].title()
|
||||
if field.get("is_overridden"):
|
||||
text = f"⭐ {label}"
|
||||
else:
|
||||
text = label
|
||||
rows.append([
|
||||
InlineKeyboardButton(
|
||||
text=text[:40],
|
||||
callback_data=f"admin_bot_config_edit:{field['key']}"
|
||||
)
|
||||
])
|
||||
|
||||
if total_pages > 1:
|
||||
nav_row: List[InlineKeyboardButton] = []
|
||||
if page > 1:
|
||||
nav_row.append(
|
||||
InlineKeyboardButton(
|
||||
text="⬅️",
|
||||
callback_data=f"admin_bot_config_cat:{category_key}:{page - 1}"
|
||||
)
|
||||
)
|
||||
nav_row.append(
|
||||
InlineKeyboardButton(
|
||||
text=f"{page}/{total_pages}",
|
||||
callback_data="admin_bot_config_cat:noop"
|
||||
)
|
||||
)
|
||||
if page < total_pages:
|
||||
nav_row.append(
|
||||
InlineKeyboardButton(
|
||||
text="➡️",
|
||||
callback_data=f"admin_bot_config_cat:{category_key}:{page + 1}"
|
||||
)
|
||||
)
|
||||
rows.append(nav_row)
|
||||
|
||||
rows.append([
|
||||
InlineKeyboardButton(text="⬅️ Категории", callback_data="admin_bot_config")
|
||||
])
|
||||
|
||||
return InlineKeyboardMarkup(inline_keyboard=rows)
|
||||
|
||||
|
||||
def get_admin_system_submenu_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
|
||||
texts = get_texts(language)
|
||||
|
||||
|
||||
171
app/services/app_settings_service.py
Normal file
171
app/services/app_settings_service.py
Normal file
@@ -0,0 +1,171 @@
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from app.config import settings
|
||||
from app.database.crud.app_settings import AppSettingsCRUD
|
||||
from app.database.database import AsyncSessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AppSettingsService:
|
||||
RESET_COMMANDS = {"--reset", "reset", "/reset", "default", "сброс", "по умолчанию"}
|
||||
|
||||
@classmethod
|
||||
async def load_overrides(cls) -> None:
|
||||
try:
|
||||
async with AsyncSessionLocal() as session:
|
||||
records = await AppSettingsCRUD.list_all(session)
|
||||
existing_keys = {record.setting_key for record in records}
|
||||
defaults = settings.get_initial_editable_values()
|
||||
|
||||
added = False
|
||||
for key, default_value in defaults.items():
|
||||
if not settings.is_editable_field(key) or key in existing_keys:
|
||||
continue
|
||||
serialized = cls._serialize_for_storage(key, default_value)
|
||||
await AppSettingsCRUD.upsert(session, key, serialized)
|
||||
added = True
|
||||
|
||||
if added:
|
||||
await session.commit()
|
||||
records = await AppSettingsCRUD.list_all(session)
|
||||
else:
|
||||
await session.commit()
|
||||
except Exception as error:
|
||||
logger.error("Не удалось синхронизировать таблицу app_settings: %s", error)
|
||||
records = []
|
||||
|
||||
overrides: Dict[str, Any] = {}
|
||||
for record in records:
|
||||
key = record.setting_key
|
||||
if not settings.is_editable_field(key):
|
||||
continue
|
||||
try:
|
||||
value = cls._deserialize_value(key, record.value)
|
||||
except Exception as error:
|
||||
logger.warning("Не удалось разобрать значение настройки %s: %s", key, error)
|
||||
continue
|
||||
overrides[key] = value
|
||||
|
||||
if overrides:
|
||||
settings.apply_overrides(overrides)
|
||||
logger.info("Применены %d пользовательских настроек", len(overrides))
|
||||
|
||||
@classmethod
|
||||
def get_categories(cls) -> List[Dict[str, Any]]:
|
||||
categories: Dict[str, Dict[str, Any]] = {}
|
||||
for field in cls._collect_fields():
|
||||
category_key = field["category_key"]
|
||||
category = categories.setdefault(
|
||||
category_key,
|
||||
{
|
||||
"key": category_key,
|
||||
"label": field["category_label"],
|
||||
"fields": [],
|
||||
},
|
||||
)
|
||||
category["fields"].append(field)
|
||||
|
||||
for category in categories.values():
|
||||
category["fields"].sort(key=lambda item: item["label"].lower())
|
||||
|
||||
def sort_key(item: Dict[str, Any]) -> tuple[int, str]:
|
||||
return (1 if item["label"] == "Прочее" else 0, item["label"].lower())
|
||||
|
||||
return sorted(categories.values(), key=sort_key)
|
||||
|
||||
@classmethod
|
||||
def get_category_fields(cls, category_key: str) -> List[Dict[str, Any]]:
|
||||
category_key = category_key.upper()
|
||||
fields = [
|
||||
field
|
||||
for field in cls._collect_fields()
|
||||
if field["category_key"] == category_key
|
||||
]
|
||||
fields.sort(key=lambda item: item["label"].lower())
|
||||
return fields
|
||||
|
||||
@classmethod
|
||||
def get_field_info(cls, key: str) -> Dict[str, Any]:
|
||||
if not settings.is_editable_field(key):
|
||||
raise ValueError("Недоступная настройка")
|
||||
field = settings.model_fields.get(key)
|
||||
if not field:
|
||||
raise ValueError("Настройка не найдена")
|
||||
return {
|
||||
"key": key,
|
||||
"label": settings.get_field_label(key),
|
||||
"type": settings.get_field_type_name(key),
|
||||
"current_value": getattr(settings, key),
|
||||
"display_value": settings.format_value_for_display(key),
|
||||
"default_value": settings.get_initial_value(key),
|
||||
"category_key": settings.get_field_category_key(key),
|
||||
"category_label": settings.get_field_category_label(key),
|
||||
"is_overridden": settings.get_initial_value(key) != getattr(settings, key),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def update_setting(cls, key: str, raw_value: str) -> Dict[str, Any]:
|
||||
if not settings.is_editable_field(key):
|
||||
raise ValueError("Эту настройку нельзя изменять через админку")
|
||||
|
||||
if raw_value is None:
|
||||
raise ValueError("Значение не может быть пустым")
|
||||
|
||||
stripped = raw_value.strip()
|
||||
use_default = stripped.lower() in cls.RESET_COMMANDS
|
||||
|
||||
if use_default:
|
||||
parsed_value = settings.get_initial_value(key)
|
||||
else:
|
||||
parsed_value = settings.parse_raw_value(key, stripped)
|
||||
|
||||
serialized = cls._serialize_for_storage(key, parsed_value)
|
||||
|
||||
async with AsyncSessionLocal() as session:
|
||||
await AppSettingsCRUD.upsert(session, key, serialized)
|
||||
await session.commit()
|
||||
|
||||
settings.apply_override(key, parsed_value)
|
||||
|
||||
return {
|
||||
"key": key,
|
||||
"value": parsed_value,
|
||||
"display": settings.format_value_for_display(key, parsed_value),
|
||||
"is_default": settings.get_initial_value(key) == parsed_value,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _collect_fields(cls) -> List[Dict[str, Any]]:
|
||||
fields: List[Dict[str, Any]] = []
|
||||
for key in settings.model_fields.keys():
|
||||
if not settings.is_editable_field(key):
|
||||
continue
|
||||
fields.append(
|
||||
{
|
||||
"key": key,
|
||||
"label": settings.get_field_label(key),
|
||||
"type": settings.get_field_type_name(key),
|
||||
"value": getattr(settings, key),
|
||||
"display_value": settings.format_value_for_display(key),
|
||||
"category_key": settings.get_field_category_key(key),
|
||||
"category_label": settings.get_field_category_label(key),
|
||||
"is_overridden": settings.get_initial_value(key) != getattr(settings, key),
|
||||
}
|
||||
)
|
||||
return fields
|
||||
|
||||
@classmethod
|
||||
def _serialize_for_storage(cls, key: str, value: Any) -> str:
|
||||
prepared = settings.serialize_value_for_storage(key, value)
|
||||
return json.dumps(prepared, ensure_ascii=False)
|
||||
|
||||
@classmethod
|
||||
def _deserialize_value(cls, key: str, raw_value: str | None) -> Any:
|
||||
if raw_value is None or raw_value == "":
|
||||
stored = None
|
||||
else:
|
||||
stored = json.loads(raw_value)
|
||||
return settings.cast_value(key, stored)
|
||||
@@ -87,6 +87,7 @@ class AdminStates(StatesGroup):
|
||||
|
||||
editing_rules_page = State()
|
||||
editing_notification_value = State()
|
||||
editing_bot_config_value = State()
|
||||
|
||||
confirming_sync = State()
|
||||
|
||||
|
||||
18
main.py
18
main.py
@@ -21,6 +21,7 @@ from app.database.universal_migration import run_universal_migration
|
||||
from app.services.backup_service import backup_service
|
||||
from app.services.reporting_service import reporting_service
|
||||
from app.localization.loader import ensure_locale_templates
|
||||
from app.services.app_settings_service import AppSettingsService
|
||||
|
||||
|
||||
class GracefulExit:
|
||||
@@ -73,18 +74,29 @@ async def main():
|
||||
logger.info("🔧 Выполняем проверку и миграцию базы данных...")
|
||||
try:
|
||||
migration_success = await run_universal_migration()
|
||||
|
||||
|
||||
if migration_success:
|
||||
logger.info("✅ Миграция базы данных завершена успешно")
|
||||
else:
|
||||
logger.warning("⚠️ Миграция завершилась с предупреждениями, но продолжаем запуск")
|
||||
|
||||
|
||||
except Exception as migration_error:
|
||||
logger.error(f"❌ Ошибка выполнения миграции: {migration_error}")
|
||||
logger.warning("⚠️ Продолжаем запуск без миграции")
|
||||
else:
|
||||
logger.info("ℹ️ Миграция пропущена (SKIP_MIGRATION=true)")
|
||||
|
||||
|
||||
try:
|
||||
await AppSettingsService.load_overrides()
|
||||
new_level = getattr(logging, settings.LOG_LEVEL, logging.INFO)
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(new_level)
|
||||
for handler in root_logger.handlers:
|
||||
handler.setLevel(new_level)
|
||||
logger.info("✅ Настройки из базы данных применены")
|
||||
except Exception as settings_error:
|
||||
logger.error(f"❌ Не удалось загрузить настройки из базы данных: {settings_error}")
|
||||
|
||||
logger.info("🤖 Настройка бота...")
|
||||
bot, dp = await setup_bot()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user