Добавить токен внешней админки и защиту настройки

This commit is contained in:
Egor
2025-10-08 01:46:06 +03:00
parent 6735cf17cf
commit b643f45c01
7 changed files with 274 additions and 24 deletions

View File

@@ -1,3 +1,4 @@
import hashlib
import logging
import os
import re
@@ -276,6 +277,8 @@ class Settings(BaseSettings):
BACKUP_SEND_CHAT_ID: Optional[str] = None
BACKUP_SEND_TOPIC_ID: Optional[int] = None
EXTERNAL_ADMIN_TOKEN: Optional[str] = None
@field_validator('SERVER_STATUS_MODE', mode='before')
@classmethod
def normalize_server_status_mode(cls, value: Optional[str]) -> str:
@@ -556,6 +559,20 @@ class Settings(BaseSettings):
def get_app_config_cache_ttl(self) -> int:
return self.APP_CONFIG_CACHE_TTL
def build_external_admin_token(self, bot_username: str) -> str:
"""Генерирует детерминированный токен внешней админки по username."""
normalized = (bot_username or "").strip().lstrip("@").lower()
if not normalized:
raise ValueError("Bot username is required to build external admin token")
seed = f"remnawave.external_admin::{normalized}"
digest = hashlib.sha256(seed.encode("utf-8")).hexdigest()
return digest[:48]
def get_external_admin_token(self) -> Optional[str]:
token = (self.EXTERNAL_ADMIN_TOKEN or "").strip()
return token or None
def is_traffic_selectable(self) -> bool:
return self.TRAFFIC_SELECTION_MODE.lower() == "selectable"

View File

@@ -18,7 +18,10 @@ from app.config import settings
from app.services.remnawave_service import RemnaWaveService
from app.services.payment_service import PaymentService
from app.services.tribute_service import TributeService
from app.services.system_settings_service import bot_configuration_service
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
from app.states import BotConfigStates
from app.utils.decorators import admin_required, error_handler
from app.utils.currency_converter import currency_converter
@@ -107,6 +110,12 @@ CATEGORY_GROUP_METADATA: Dict[str, Dict[str, object]] = {
"icon": "",
"categories": ("WEB_API", "WEBHOOK", "LOG", "DEBUG"),
},
"external_admin": {
"title": "🛡️ Внешняя админка",
"description": "Токен, по которому внешняя админка проверяет запросы.",
"icon": "🛡️",
"categories": ("EXTERNAL_ADMIN",),
},
}
CATEGORY_GROUP_ORDER: Tuple[str, ...] = (
@@ -123,6 +132,7 @@ CATEGORY_GROUP_ORDER: Tuple[str, ...] = (
"server",
"maintenance",
"advanced",
"external_admin",
)
CATEGORY_GROUP_DEFINITIONS: Tuple[Tuple[str, str, Tuple[str, ...]], ...] = tuple(
@@ -687,6 +697,12 @@ async def apply_preset(
try:
await bot_configuration_service.set_value(db, setting_key, value)
applied.append(setting_key)
except ReadOnlySettingError:
logging.getLogger(__name__).info(
"Пропускаем настройку %s из пресета %s: только для чтения",
setting_key,
preset_key,
)
except Exception as error:
logging.getLogger(__name__).warning(
"Не удалось применить пресет %s для %s: %s",
@@ -854,8 +870,14 @@ async def handle_import_message(
errors.append(f"{setting_key}: {error}")
continue
await bot_configuration_service.set_value(db, setting_key, value_to_apply)
applied.append(setting_key)
if bot_configuration_service.is_read_only(setting_key):
skipped.append(setting_key)
continue
try:
await bot_configuration_service.set_value(db, setting_key, value_to_apply)
applied.append(setting_key)
except ReadOnlySettingError:
skipped.append(setting_key)
await db.commit()
@@ -1361,9 +1383,10 @@ def _build_setting_keyboard(
definition = bot_configuration_service.get_definition(key)
rows: list[list[types.InlineKeyboardButton]] = []
callback_token = bot_configuration_service.get_callback_token(key)
is_read_only = bot_configuration_service.is_read_only(key)
choice_options = bot_configuration_service.get_choice_options(key)
if choice_options:
if choice_options and not is_read_only:
current_value = bot_configuration_service.get_current_value(key)
choice_buttons: list[types.InlineKeyboardButton] = []
for option in choice_options:
@@ -1385,7 +1408,7 @@ def _build_setting_keyboard(
for chunk in _chunk(choice_buttons, 2):
rows.append(list(chunk))
if definition.python_type is bool:
if definition.python_type is bool and not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="🔁 Переключить",
@@ -1395,16 +1418,17 @@ def _build_setting_keyboard(
)
])
rows.append([
types.InlineKeyboardButton(
text="✏️ Изменить",
callback_data=(
f"botcfg_edit:{group_key}:{category_page}:{settings_page}:{callback_token}"
),
)
])
if not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="✏️ Изменить",
callback_data=(
f"botcfg_edit:{group_key}:{category_page}:{settings_page}:{callback_token}"
),
)
])
if bot_configuration_service.has_override(key):
if bot_configuration_service.has_override(key) and not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="♻️ Сбросить",
@@ -1414,6 +1438,14 @@ def _build_setting_keyboard(
)
])
if is_read_only:
rows.append([
types.InlineKeyboardButton(
text="🔒 Только для чтения",
callback_data="botcfg_group:noop",
)
])
rows.append([
types.InlineKeyboardButton(
text="⬅️ Назад",
@@ -1438,6 +1470,11 @@ def _render_setting_text(key: str) -> str:
f"📌 <b>Текущее:</b> {summary['current']}",
f"📦 <b>По умолчанию:</b> {summary['original']}",
f"✳️ <b>Переопределено:</b> {'Да' if summary['has_override'] else 'Нет'}",
*(
["🔒 <b>Режим:</b> Только для чтения (управляется автоматически)"]
if summary.get("is_read_only")
else []
),
"",
f"📘 <b>Описание:</b> {guidance['description']}",
f"📐 <b>Формат:</b> {guidance['format']}",
@@ -2066,6 +2103,9 @@ async def start_edit_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
definition = bot_configuration_service.get_definition(key)
summary = bot_configuration_service.get_setting_summary(key)
@@ -2131,13 +2171,23 @@ async def handle_edit_setting(
await state.clear()
return
if bot_configuration_service.is_read_only(key):
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
try:
value = bot_configuration_service.parse_user_value(key, message.text or "")
except ValueError as error:
await message.answer(f"⚠️ {error}")
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
await db.commit()
text = _render_setting_text(key)
@@ -2172,13 +2222,23 @@ async def handle_direct_setting_input(
if not key:
return
if bot_configuration_service.is_read_only(key):
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
try:
value = bot_configuration_service.parse_user_value(key, message.text or "")
except ValueError as error:
await message.answer(f"⚠️ {error}")
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
await db.commit()
text = _render_setting_text(key)
@@ -2220,7 +2280,14 @@ async def reset_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
await bot_configuration_service.reset_value(db, key)
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
try:
await bot_configuration_service.reset_value(db, key)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)
@@ -2260,9 +2327,16 @@ async def toggle_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
current = bot_configuration_service.get_current_value(key)
new_value = not bool(current)
await bot_configuration_service.set_value(db, key, new_value)
try:
await bot_configuration_service.set_value(db, key, new_value)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)
@@ -2304,6 +2378,9 @@ async def apply_setting_choice(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
try:
value = bot_configuration_service.resolve_choice_token(key, choice_token)
@@ -2311,7 +2388,11 @@ async def apply_setting_choice(
await callback.answer("Это значение больше недоступно", show_alert=True)
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)

View File

@@ -0,0 +1,81 @@
"""Утилиты для синхронизации токена внешней админки."""
from __future__ import annotations
import logging
from typing import Optional
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from app.config import settings
from app.database.database import AsyncSessionLocal
from app.database.models import SystemSetting
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
logger = logging.getLogger(__name__)
async def ensure_external_admin_token(bot_username: Optional[str]) -> Optional[str]:
"""Генерирует и сохраняет токен внешней админки, если требуется."""
username_raw = (bot_username or "").strip()
if not username_raw:
logger.warning(
"⚠️ Не удалось обеспечить токен внешней админки: username бота отсутствует",
)
return None
normalized_username = username_raw.lstrip("@").lower()
if not normalized_username:
logger.warning(
"⚠️ Не удалось обеспечить токен внешней админки: username пустой после нормализации",
)
return None
try:
token = settings.build_external_admin_token(normalized_username)
except Exception as error: # pragma: no cover - защитный блок
logger.error("❌ Ошибка генерации токена внешней админки: %s", error)
return None
try:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(SystemSetting.value).where(SystemSetting.key == "EXTERNAL_ADMIN_TOKEN")
)
existing = result.scalar_one_or_none()
if existing == token:
if settings.get_external_admin_token() != token:
settings.EXTERNAL_ADMIN_TOKEN = token
return token
try:
await bot_configuration_service.set_value(
session,
"EXTERNAL_ADMIN_TOKEN",
token,
force=True,
)
await session.commit()
logger.info(
"✅ Токен внешней админки синхронизирован для @%s",
normalized_username,
)
except ReadOnlySettingError: # pragma: no cover - force=True предотвращает исключение
await session.rollback()
logger.warning(
"⚠️ Не удалось сохранить токен внешней админки из-за ограничения доступа",
)
return None
return token
except SQLAlchemyError as error:
logger.error("❌ Ошибка сохранения токена внешней админки: %s", error)
return None

View File

@@ -56,9 +56,16 @@ class ChoiceOption:
description: Optional[str] = None
class ReadOnlySettingError(RuntimeError):
"""Исключение, выбрасываемое при попытке изменить настройку только для чтения."""
class BotConfigurationService:
EXCLUDED_KEYS: set[str] = {"BOT_TOKEN", "ADMIN_IDS"}
READ_ONLY_KEYS: set[str] = {"EXTERNAL_ADMIN_TOKEN"}
PLAIN_TEXT_KEYS: set[str] = {"EXTERNAL_ADMIN_TOKEN"}
CATEGORY_TITLES: Dict[str, str] = {
"CORE": "🤖 Основные настройки",
"SUPPORT": "💬 Поддержка и тикеты",
@@ -71,6 +78,7 @@ class BotConfigurationService:
"TRIBUTE": "🎁 Tribute",
"MULENPAY": "💰 MulenPay",
"PAL24": "🏦 PAL24 / PayPalych",
"EXTERNAL_ADMIN": "🛡️ Внешняя админка",
"SUBSCRIPTIONS_CORE": "📅 Подписки и лимиты",
"PERIODS": "📆 Периоды подписок",
"SUBSCRIPTION_PRICES": "💵 Стоимость тарифов",
@@ -118,6 +126,7 @@ class BotConfigurationService:
"PAL24": "PAL24 / PayPalych подключения и лимиты.",
"TRIBUTE": "Tribute и донат-сервисы.",
"TELEGRAM": "Telegram Stars и их стоимость.",
"EXTERNAL_ADMIN": "Токен внешней админки для проверки запросов.",
"SUBSCRIPTIONS_CORE": "Лимиты устройств, трафика и базовые цены подписок.",
"PERIODS": "Доступные периоды подписок и продлений.",
"SUBSCRIPTION_PRICES": "Стоимость подписок по периодам в копейках.",
@@ -255,6 +264,7 @@ class BotConfigurationService:
"MULENPAY_": "MULENPAY",
"PAL24_": "PAL24",
"PAYMENT_": "PAYMENT",
"EXTERNAL_ADMIN_": "EXTERNAL_ADMIN",
"CONNECT_BUTTON_HAPP": "HAPP",
"HAPP_": "HAPP",
"SKIP_": "SKIP",
@@ -394,6 +404,13 @@ class BotConfigurationService:
"warning": "Недоступный адрес приведет к ошибкам при управлении VPN-учетками.",
"dependencies": "REMNAWAVE_API_KEY или REMNAWAVE_USERNAME/REMNAWAVE_PASSWORD",
},
"EXTERNAL_ADMIN_TOKEN": {
"description": "Детерминированный токен, который использует внешняя админка для проверки запросов.",
"format": "Значение генерируется автоматически из username бота и доступно только для чтения.",
"example": "Генерируется автоматически",
"warning": "Токен обновится только при смене username бота.",
"dependencies": "Username телеграм-бота",
},
}
@classmethod
@@ -405,6 +422,10 @@ class BotConfigurationService:
definition = cls.get_definition(key)
return definition.python_type is bool
@classmethod
def is_read_only(cls, key: str) -> bool:
return key in cls.READ_ONLY_KEYS
@classmethod
def _format_numeric_with_unit(cls, key: str, value: Union[int, float]) -> Optional[str]:
if isinstance(value, bool):
@@ -455,6 +476,8 @@ class BotConfigurationService:
cleaned = value.strip()
if not cleaned:
return ""
if key in cls.PLAIN_TEXT_KEYS:
return cleaned
if any(keyword in key.upper() for keyword in ("TOKEN", "SECRET", "PASSWORD", "KEY")):
return "••••••••"
items = cls._split_comma_values(cleaned)
@@ -860,7 +883,17 @@ class BotConfigurationService:
return parsed_value
@classmethod
async def set_value(cls, db: AsyncSession, key: str, value: Any) -> None:
async def set_value(
cls,
db: AsyncSession,
key: str,
value: Any,
*,
force: bool = False,
) -> None:
if cls.is_read_only(key) and not force:
raise ReadOnlySettingError(f"Setting {key} is read-only")
raw_value = cls.serialize_value(key, value)
await upsert_system_setting(db, key, raw_value)
cls._overrides_raw[key] = raw_value
@@ -870,7 +903,16 @@ class BotConfigurationService:
await cls._sync_default_web_api_token()
@classmethod
async def reset_value(cls, db: AsyncSession, key: str) -> None:
async def reset_value(
cls,
db: AsyncSession,
key: str,
*,
force: bool = False,
) -> None:
if cls.is_read_only(key) and not force:
raise ReadOnlySettingError(f"Setting {key} is read-only")
await delete_system_setting(db, key)
cls._overrides_raw.pop(key, None)
original = cls.get_original_value(key)
@@ -925,6 +967,7 @@ class BotConfigurationService:
"category_key": definition.category_key,
"category_label": definition.category_label,
"has_override": has_override,
"is_read_only": cls.is_read_only(key),
}

View File

@@ -5,7 +5,10 @@ from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.system_settings_service import bot_configuration_service
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
from ..dependencies import get_db_session, require_api_token
from ..schemas.config import (
@@ -94,6 +97,7 @@ def _serialize_definition(definition, include_choices: bool = True) -> SettingDe
current=current,
original=original,
has_override=has_override,
read_only=bot_configuration_service.is_read_only(definition.key),
choices=choices,
)
@@ -153,7 +157,10 @@ async def update_setting(
raise HTTPException(status.HTTP_404_NOT_FOUND, "Setting not found") from error
value = _coerce_value(key, payload.value)
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError as error:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(error)) from error
await db.commit()
return _serialize_definition(definition)
@@ -170,6 +177,9 @@ async def reset_setting(
except KeyError as error:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Setting not found") from error
await bot_configuration_service.reset_value(db, key)
try:
await bot_configuration_service.reset_value(db, key)
except ReadOnlySettingError as error:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(error)) from error
await db.commit()
return _serialize_definition(definition)

View File

@@ -45,6 +45,7 @@ class SettingDefinition(BaseModel):
current: Any | None = Field(default=None)
original: Any | None = Field(default=None)
has_override: bool
read_only: bool = Field(default=False)
choices: list[SettingChoice] = Field(default_factory=list)
model_config = ConfigDict(extra="forbid")

17
main.py
View File

@@ -22,6 +22,7 @@ from app.services.backup_service import backup_service
from app.services.reporting_service import reporting_service
from app.localization.loader import ensure_locale_templates
from app.services.system_settings_service import bot_configuration_service
from app.services.external_admin_service import ensure_external_admin_token
from app.services.broadcast_service import broadcast_service
from app.utils.startup_timeline import StartupTimeline
@@ -186,6 +187,22 @@ async def main():
payment_service = PaymentService(bot)
async with timeline.stage(
"Внешняя админка",
"🛡️",
success_message="Токен внешней админки готов",
) as stage:
try:
bot_user = await bot.get_me()
token = await ensure_external_admin_token(bot_user.username)
if token:
stage.log("Токен синхронизирован")
else:
stage.warning("Не удалось получить токен внешней админки")
except Exception as error: # pragma: no cover - защитный блок
stage.warning(f"Ошибка подготовки внешней админки: {error}")
logger.error("❌ Ошибка подготовки внешней админки: %s", error)
webhook_needed = (
settings.TRIBUTE_ENABLED
or settings.is_cryptobot_enabled()