Files
remnawave-bedolaga-telegram…/app/services/system_settings_service.py

565 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Type, Union, get_args, get_origin
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import Settings, settings
from app.database.crud.system_setting import (
delete_system_setting,
upsert_system_setting,
)
from app.database.database import AsyncSessionLocal
from app.database.models import SystemSetting
logger = logging.getLogger(__name__)
def _title_from_key(key: str) -> str:
parts = key.split("_")
if not parts:
return key
return " ".join(part.capitalize() for part in parts)
def _truncate(value: str, max_len: int = 60) -> str:
value = value.strip()
if len(value) <= max_len:
return value
return value[: max_len - 1] + ""
@dataclass(slots=True)
class SettingDefinition:
key: str
category_key: str
category_label: str
python_type: Type[Any]
type_label: str
is_optional: bool
@property
def display_name(self) -> str:
return _title_from_key(self.key)
@dataclass(slots=True)
class ChoiceOption:
value: Any
label: str
description: Optional[str] = None
class BotConfigurationService:
EXCLUDED_KEYS: set[str] = {"BOT_TOKEN", "ADMIN_IDS"}
CATEGORY_TITLES: Dict[str, str] = {
"DATABASE": "База данных",
"POSTGRES": "PostgreSQL",
"SQLITE": "SQLite",
"REDIS": "Redis",
"REMNAWAVE": "Remnawave",
"SUPPORT": "Поддержка",
"ADMIN": "Администрирование",
"CHANNEL": "Каналы",
"TRIAL": "Триал",
"DEFAULT": "Значения по умолчанию",
"PRICE": "Цены",
"TRAFFIC": "Трафик",
"REFERRAL": "Реферальная программа",
"AUTOPAY": "Автопродление",
"MONITORING": "Мониторинг",
"SERVER": "Статус серверов",
"MAINTENANCE": "Техработы",
"PAYMENT": "Оплаты",
"YOOKASSA": "YooKassa",
"CRYPTOBOT": "CryptoBot",
"MULENPAY": "MulenPay",
"PAL24": "PayPalych",
"CONNECT": "Кнопка подключения",
"HAPP": "Happ",
"VERSION": "Версии",
"BACKUP": "Бекапы",
"WEBHOOK": "Вебхуки",
"LOG": "Логи",
"DEBUG": "Отладка",
"TRIBUTE": "Tribute",
"TELEGRAM": "Telegram Stars",
}
CHOICES: Dict[str, List[ChoiceOption]] = {
"DATABASE_MODE": [
ChoiceOption("auto", "🤖 Авто"),
ChoiceOption("postgresql", "🐘 PostgreSQL"),
ChoiceOption("sqlite", "💾 SQLite"),
],
"REMNAWAVE_AUTH_TYPE": [
ChoiceOption("api_key", "🔑 API Key"),
ChoiceOption("basic_auth", "🧾 Basic Auth"),
],
"REMNAWAVE_USER_DELETE_MODE": [
ChoiceOption("delete", "🗑 Удалять"),
ChoiceOption("disable", "🚫 Деактивировать"),
],
"TRAFFIC_SELECTION_MODE": [
ChoiceOption("selectable", "📦 Выбор пакетов"),
ChoiceOption("fixed", "📏 Фиксированный лимит"),
],
"DEFAULT_TRAFFIC_RESET_STRATEGY": [
ChoiceOption("NO_RESET", "♾️ Без сброса"),
ChoiceOption("DAY", "📅 Ежедневно"),
ChoiceOption("WEEK", "🗓 Еженедельно"),
ChoiceOption("MONTH", "📆 Ежемесячно"),
],
"SUPPORT_SYSTEM_MODE": [
ChoiceOption("tickets", "🎫 Только тикеты"),
ChoiceOption("contact", "💬 Только контакт"),
ChoiceOption("both", "🔁 Оба варианта"),
],
"CONNECT_BUTTON_MODE": [
ChoiceOption("guide", "📘 Гайд"),
ChoiceOption("miniapp_subscription", "🧾 Mini App подписка"),
ChoiceOption("miniapp_custom", "🧩 Mini App (ссылка)"),
ChoiceOption("link", "🔗 Прямая ссылка"),
ChoiceOption("happ_cryptolink", "🪙 Happ CryptoLink"),
],
"SERVER_STATUS_MODE": [
ChoiceOption("disabled", "🚫 Отключено"),
ChoiceOption("external_link", "🌐 Внешняя ссылка"),
ChoiceOption("external_link_miniapp", "🧭 Mini App ссылка"),
ChoiceOption("xray", "📊 XRay Checker"),
],
"YOOKASSA_PAYMENT_MODE": [
ChoiceOption("full_payment", "💳 Полная оплата"),
ChoiceOption("partial_payment", "🪙 Частичная оплата"),
ChoiceOption("advance", "💼 Аванс"),
ChoiceOption("full_prepayment", "📦 Полная предоплата"),
ChoiceOption("partial_prepayment", "📦 Частичная предоплата"),
ChoiceOption("credit", "💰 Кредит"),
ChoiceOption("credit_payment", "💸 Погашение кредита"),
],
"YOOKASSA_PAYMENT_SUBJECT": [
ChoiceOption("commodity", "📦 Товар"),
ChoiceOption("excise", "🥃 Подакцизный товар"),
ChoiceOption("job", "🛠 Работа"),
ChoiceOption("service", "🧾 Услуга"),
ChoiceOption("gambling_bet", "🎲 Ставка"),
ChoiceOption("gambling_prize", "🏆 Выигрыш"),
ChoiceOption("lottery", "🎫 Лотерея"),
ChoiceOption("lottery_prize", "🎁 Приз лотереи"),
ChoiceOption("intellectual_activity", "🧠 Интеллектуальная деятельность"),
ChoiceOption("payment", "💱 Платеж"),
ChoiceOption("agent_commission", "🤝 Комиссия агента"),
ChoiceOption("composite", "🧩 Композитный"),
ChoiceOption("another", "📄 Другое"),
],
"YOOKASSA_VAT_CODE": [
ChoiceOption(1, "1 — НДС не облагается"),
ChoiceOption(2, "2 — НДС 0%"),
ChoiceOption(3, "3 — НДС 10%"),
ChoiceOption(4, "4 — НДС 20%"),
ChoiceOption(5, "5 — НДС 10/110"),
ChoiceOption(6, "6 — НДС 20/120"),
],
"MULENPAY_LANGUAGE": [
ChoiceOption("ru", "🇷🇺 Русский"),
ChoiceOption("en", "🇬🇧 Английский"),
],
"LOG_LEVEL": [
ChoiceOption("DEBUG", "🐞 Debug"),
ChoiceOption("INFO", " Info"),
ChoiceOption("WARNING", "⚠️ Warning"),
ChoiceOption("ERROR", "❌ Error"),
ChoiceOption("CRITICAL", "🔥 Critical"),
],
}
_definitions: Dict[str, SettingDefinition] = {}
_original_values: Dict[str, Any] = settings.model_dump()
_overrides_raw: Dict[str, Optional[str]] = {}
_callback_tokens: Dict[str, str] = {}
_token_to_key: Dict[str, str] = {}
_choice_tokens: Dict[str, Dict[Any, str]] = {}
_choice_token_lookup: Dict[str, Dict[str, Any]] = {}
@classmethod
def initialize_definitions(cls) -> None:
if cls._definitions:
return
for key, field in Settings.model_fields.items():
if key in cls.EXCLUDED_KEYS:
continue
annotation = field.annotation
python_type, is_optional = cls._normalize_type(annotation)
type_label = cls._type_to_label(python_type, is_optional)
category_key = cls._resolve_category_key(key)
category_label = cls.CATEGORY_TITLES.get(
category_key,
category_key.capitalize() if category_key else "Прочее",
)
cls._definitions[key] = SettingDefinition(
key=key,
category_key=category_key or "other",
category_label=category_label,
python_type=python_type,
type_label=type_label,
is_optional=is_optional,
)
cls._register_callback_token(key)
if key in cls.CHOICES:
cls._ensure_choice_tokens(key)
@classmethod
def _resolve_category_key(cls, key: str) -> str:
if "_" not in key:
return key.upper()
prefix = key.split("_", 1)[0]
return prefix.upper()
@classmethod
def _normalize_type(cls, annotation: Any) -> Tuple[Type[Any], bool]:
if annotation is None:
return str, True
origin = get_origin(annotation)
if origin is Union:
args = [arg for arg in get_args(annotation) if arg is not type(None)]
if len(args) == 1:
nested_type, nested_optional = cls._normalize_type(args[0])
return nested_type, True
return str, True
if annotation in {int, float, bool, str}:
return annotation, False
if annotation in {Optional[int], Optional[float], Optional[bool], Optional[str]}:
nested = get_args(annotation)[0]
return nested, True
# Paths, lists, dicts и прочее будем хранить как строки
return str, False
@classmethod
def _type_to_label(cls, python_type: Type[Any], is_optional: bool) -> str:
base = {
bool: "bool",
int: "int",
float: "float",
str: "str",
}.get(python_type, "str")
return f"optional[{base}]" if is_optional else base
@classmethod
def get_categories(cls) -> List[Tuple[str, str, int]]:
cls.initialize_definitions()
categories: Dict[str, List[SettingDefinition]] = {}
for definition in cls._definitions.values():
categories.setdefault(definition.category_key, []).append(definition)
result: List[Tuple[str, str, int]] = []
for category_key, items in categories.items():
label = items[0].category_label
result.append((category_key, label, len(items)))
result.sort(key=lambda item: item[1])
return result
@classmethod
def get_settings_for_category(cls, category_key: str) -> List[SettingDefinition]:
cls.initialize_definitions()
filtered = [
definition
for definition in cls._definitions.values()
if definition.category_key == category_key
]
filtered.sort(key=lambda definition: definition.key)
return filtered
@classmethod
def get_definition(cls, key: str) -> SettingDefinition:
cls.initialize_definitions()
return cls._definitions[key]
@classmethod
def has_override(cls, key: str) -> bool:
return key in cls._overrides_raw
@classmethod
def get_current_value(cls, key: str) -> Any:
return getattr(settings, key)
@classmethod
def get_original_value(cls, key: str) -> Any:
return cls._original_values.get(key)
@classmethod
def format_value(cls, value: Any) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "✅ Да" if value else "❌ Нет"
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, (list, dict, tuple, set)):
try:
return json.dumps(value, ensure_ascii=False)
except Exception:
return str(value)
return str(value)
@classmethod
def format_value_for_list(cls, key: str) -> str:
value = cls.get_current_value(key)
formatted = cls.format_value(value)
if formatted == "":
return formatted
return _truncate(formatted)
@classmethod
def get_choice_options(cls, key: str) -> List[ChoiceOption]:
cls.initialize_definitions()
return cls.CHOICES.get(key, [])
@classmethod
def has_choices(cls, key: str) -> bool:
return bool(cls.get_choice_options(key))
@classmethod
def get_callback_token(cls, key: str) -> str:
cls.initialize_definitions()
return cls._callback_tokens[key]
@classmethod
def resolve_callback_token(cls, token: str) -> str:
cls.initialize_definitions()
return cls._token_to_key[token]
@classmethod
def get_choice_token(cls, key: str, value: Any) -> Optional[str]:
cls.initialize_definitions()
cls._ensure_choice_tokens(key)
return cls._choice_tokens.get(key, {}).get(value)
@classmethod
def resolve_choice_token(cls, key: str, token: str) -> Any:
cls.initialize_definitions()
cls._ensure_choice_tokens(key)
return cls._choice_token_lookup.get(key, {})[token]
@classmethod
def _register_callback_token(cls, key: str) -> None:
if key in cls._callback_tokens:
return
base = hashlib.blake2s(key.encode("utf-8"), digest_size=6).hexdigest()
candidate = base
counter = 1
while candidate in cls._token_to_key and cls._token_to_key[candidate] != key:
suffix = cls._encode_base36(counter)
candidate = f"{base}{suffix}"[:16]
counter += 1
cls._callback_tokens[key] = candidate
cls._token_to_key[candidate] = key
@classmethod
def _ensure_choice_tokens(cls, key: str) -> None:
if key in cls._choice_tokens:
return
options = cls.CHOICES.get(key, [])
value_to_token: Dict[Any, str] = {}
token_to_value: Dict[str, Any] = {}
for index, option in enumerate(options):
token = cls._encode_base36(index)
value_to_token[option.value] = token
token_to_value[token] = option.value
cls._choice_tokens[key] = value_to_token
cls._choice_token_lookup[key] = token_to_value
@staticmethod
def _encode_base36(number: int) -> str:
if number < 0:
raise ValueError("number must be non-negative")
alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
if number == 0:
return "0"
result = []
while number:
number, rem = divmod(number, 36)
result.append(alphabet[rem])
return "".join(reversed(result))
@classmethod
async def initialize(cls) -> None:
cls.initialize_definitions()
async with AsyncSessionLocal() as session:
result = await session.execute(select(SystemSetting))
rows = result.scalars().all()
overrides: Dict[str, Optional[str]] = {}
for row in rows:
if row.key in cls._definitions:
overrides[row.key] = row.value
for key, raw_value in overrides.items():
try:
parsed_value = cls.deserialize_value(key, raw_value)
except Exception as error:
logger.error("Не удалось применить настройку %s: %s", key, error)
continue
cls._overrides_raw[key] = raw_value
cls._apply_to_settings(key, parsed_value)
@classmethod
async def reload(cls) -> None:
cls._overrides_raw.clear()
await cls.initialize()
@classmethod
def deserialize_value(cls, key: str, raw_value: Optional[str]) -> Any:
if raw_value is None:
return None
definition = cls.get_definition(key)
python_type = definition.python_type
if python_type is bool:
value_lower = raw_value.strip().lower()
if value_lower in {"1", "true", "on", "yes", "да"}:
return True
if value_lower in {"0", "false", "off", "no", "нет"}:
return False
raise ValueError(f"Неверное булево значение: {raw_value}")
if python_type is int:
return int(raw_value)
if python_type is float:
return float(raw_value)
return raw_value
@classmethod
def serialize_value(cls, key: str, value: Any) -> Optional[str]:
if value is None:
return None
definition = cls.get_definition(key)
python_type = definition.python_type
if python_type is bool:
return "true" if value else "false"
if python_type in {int, float}:
return str(value)
return str(value)
@classmethod
def parse_user_value(cls, key: str, user_input: str) -> Any:
definition = cls.get_definition(key)
text = (user_input or "").strip()
if text.lower() in {"отмена", "cancel"}:
raise ValueError("Ввод отменен пользователем")
if definition.is_optional and text.lower() in {"none", "null", "пусто", ""}:
return None
python_type = definition.python_type
if python_type is bool:
lowered = text.lower()
if lowered in {"1", "true", "on", "yes", "да", "вкл", "enable", "enabled"}:
return True
if lowered in {"0", "false", "off", "no", "нет", "выкл", "disable", "disabled"}:
return False
raise ValueError("Введите 'true' или 'false' (или 'да'/'нет')")
if python_type is int:
parsed_value: Any = int(text)
elif python_type is float:
parsed_value = float(text.replace(",", "."))
else:
parsed_value = text
choices = cls.get_choice_options(key)
if choices:
allowed_values = {option.value for option in choices}
if python_type is str:
lowered_map = {
str(option.value).lower(): option.value for option in choices
}
normalized = lowered_map.get(str(parsed_value).lower())
if normalized is not None:
parsed_value = normalized
elif parsed_value not in allowed_values:
readable = ", ".join(
f"{option.label} ({cls.format_value(option.value)})" for option in choices
)
raise ValueError(f"Доступные значения: {readable}")
elif parsed_value not in allowed_values:
readable = ", ".join(
f"{option.label} ({cls.format_value(option.value)})" for option in choices
)
raise ValueError(f"Доступные значения: {readable}")
return parsed_value
@classmethod
async def set_value(cls, db: AsyncSession, key: str, value: Any) -> None:
raw_value = cls.serialize_value(key, value)
await upsert_system_setting(db, key, raw_value)
cls._overrides_raw[key] = raw_value
cls._apply_to_settings(key, value)
@classmethod
async def reset_value(cls, db: AsyncSession, key: str) -> None:
await delete_system_setting(db, key)
cls._overrides_raw.pop(key, None)
original = cls.get_original_value(key)
cls._apply_to_settings(key, original)
@classmethod
def _apply_to_settings(cls, key: str, value: Any) -> None:
try:
setattr(settings, key, value)
except Exception as error:
logger.error("Не удалось применить значение %s=%s: %s", key, value, error)
@classmethod
def get_setting_summary(cls, key: str) -> Dict[str, Any]:
definition = cls.get_definition(key)
current = cls.get_current_value(key)
original = cls.get_original_value(key)
has_override = cls.has_override(key)
return {
"key": key,
"name": definition.display_name,
"current": cls.format_value(current),
"original": cls.format_value(original),
"type": definition.type_label,
"category_key": definition.category_key,
"category_label": definition.category_label,
"has_override": has_override,
}
bot_configuration_service = BotConfigurationService