Очистка токена внешней админки при подмене

This commit is contained in:
Egor
2025-10-08 02:31:12 +03:00
parent 6735cf17cf
commit 6225463af1
7 changed files with 382 additions and 24 deletions

View File

@@ -1,3 +1,5 @@
import hashlib
import hmac
import logging
import os
import re
@@ -276,6 +278,9 @@ class Settings(BaseSettings):
BACKUP_SEND_CHAT_ID: Optional[str] = None
BACKUP_SEND_TOPIC_ID: Optional[int] = None
EXTERNAL_ADMIN_TOKEN: Optional[str] = None
EXTERNAL_ADMIN_TOKEN_BOT_ID: Optional[int] = None
@field_validator('SERVER_STATUS_MODE', mode='before')
@classmethod
def normalize_server_status_mode(cls, value: Optional[str]) -> str:
@@ -556,6 +561,37 @@ class Settings(BaseSettings):
def get_app_config_cache_ttl(self) -> int:
return self.APP_CONFIG_CACHE_TTL
def build_external_admin_token(self, bot_username: str) -> str:
"""Генерирует детерминированный и криптографически стойкий токен внешней админки."""
normalized = (bot_username or "").strip().lstrip("@").lower()
if not normalized:
raise ValueError("Bot username is required to build external admin token")
secret = (self.BOT_TOKEN or "").strip()
if not secret:
raise ValueError("Bot token is required to build external admin token")
digest = hmac.new(
key=secret.encode("utf-8"),
msg=f"remnawave.external_admin::{normalized}".encode("utf-8"),
digestmod=hashlib.sha256,
).hexdigest()
return digest[:48]
def get_external_admin_token(self) -> Optional[str]:
token = (self.EXTERNAL_ADMIN_TOKEN or "").strip()
return token or None
def get_external_admin_bot_id(self) -> Optional[int]:
try:
return int(self.EXTERNAL_ADMIN_TOKEN_BOT_ID) if self.EXTERNAL_ADMIN_TOKEN_BOT_ID else None
except (TypeError, ValueError): # pragma: no cover - защитная ветка для некорректных значений
logging.getLogger(__name__).warning(
"Некорректный идентификатор бота для внешней админки: %s",
self.EXTERNAL_ADMIN_TOKEN_BOT_ID,
)
return None
def is_traffic_selectable(self) -> bool:
return self.TRAFFIC_SELECTION_MODE.lower() == "selectable"

View File

@@ -18,7 +18,10 @@ from app.config import settings
from app.services.remnawave_service import RemnaWaveService
from app.services.payment_service import PaymentService
from app.services.tribute_service import TributeService
from app.services.system_settings_service import bot_configuration_service
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
from app.states import BotConfigStates
from app.utils.decorators import admin_required, error_handler
from app.utils.currency_converter import currency_converter
@@ -107,6 +110,12 @@ CATEGORY_GROUP_METADATA: Dict[str, Dict[str, object]] = {
"icon": "",
"categories": ("WEB_API", "WEBHOOK", "LOG", "DEBUG"),
},
"external_admin": {
"title": "🛡️ Внешняя админка",
"description": "Токен, по которому внешняя админка проверяет запросы.",
"icon": "🛡️",
"categories": ("EXTERNAL_ADMIN",),
},
}
CATEGORY_GROUP_ORDER: Tuple[str, ...] = (
@@ -123,6 +132,7 @@ CATEGORY_GROUP_ORDER: Tuple[str, ...] = (
"server",
"maintenance",
"advanced",
"external_admin",
)
CATEGORY_GROUP_DEFINITIONS: Tuple[Tuple[str, str, Tuple[str, ...]], ...] = tuple(
@@ -687,6 +697,12 @@ async def apply_preset(
try:
await bot_configuration_service.set_value(db, setting_key, value)
applied.append(setting_key)
except ReadOnlySettingError:
logging.getLogger(__name__).info(
"Пропускаем настройку %s из пресета %s: только для чтения",
setting_key,
preset_key,
)
except Exception as error:
logging.getLogger(__name__).warning(
"Не удалось применить пресет %s для %s: %s",
@@ -854,8 +870,14 @@ async def handle_import_message(
errors.append(f"{setting_key}: {error}")
continue
await bot_configuration_service.set_value(db, setting_key, value_to_apply)
applied.append(setting_key)
if bot_configuration_service.is_read_only(setting_key):
skipped.append(setting_key)
continue
try:
await bot_configuration_service.set_value(db, setting_key, value_to_apply)
applied.append(setting_key)
except ReadOnlySettingError:
skipped.append(setting_key)
await db.commit()
@@ -1361,9 +1383,10 @@ def _build_setting_keyboard(
definition = bot_configuration_service.get_definition(key)
rows: list[list[types.InlineKeyboardButton]] = []
callback_token = bot_configuration_service.get_callback_token(key)
is_read_only = bot_configuration_service.is_read_only(key)
choice_options = bot_configuration_service.get_choice_options(key)
if choice_options:
if choice_options and not is_read_only:
current_value = bot_configuration_service.get_current_value(key)
choice_buttons: list[types.InlineKeyboardButton] = []
for option in choice_options:
@@ -1385,7 +1408,7 @@ def _build_setting_keyboard(
for chunk in _chunk(choice_buttons, 2):
rows.append(list(chunk))
if definition.python_type is bool:
if definition.python_type is bool and not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="🔁 Переключить",
@@ -1395,16 +1418,17 @@ def _build_setting_keyboard(
)
])
rows.append([
types.InlineKeyboardButton(
text="✏️ Изменить",
callback_data=(
f"botcfg_edit:{group_key}:{category_page}:{settings_page}:{callback_token}"
),
)
])
if not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="✏️ Изменить",
callback_data=(
f"botcfg_edit:{group_key}:{category_page}:{settings_page}:{callback_token}"
),
)
])
if bot_configuration_service.has_override(key):
if bot_configuration_service.has_override(key) and not is_read_only:
rows.append([
types.InlineKeyboardButton(
text="♻️ Сбросить",
@@ -1414,6 +1438,14 @@ def _build_setting_keyboard(
)
])
if is_read_only:
rows.append([
types.InlineKeyboardButton(
text="🔒 Только для чтения",
callback_data="botcfg_group:noop",
)
])
rows.append([
types.InlineKeyboardButton(
text="⬅️ Назад",
@@ -1438,6 +1470,11 @@ def _render_setting_text(key: str) -> str:
f"📌 <b>Текущее:</b> {summary['current']}",
f"📦 <b>По умолчанию:</b> {summary['original']}",
f"✳️ <b>Переопределено:</b> {'Да' if summary['has_override'] else 'Нет'}",
*(
["🔒 <b>Режим:</b> Только для чтения (управляется автоматически)"]
if summary.get("is_read_only")
else []
),
"",
f"📘 <b>Описание:</b> {guidance['description']}",
f"📐 <b>Формат:</b> {guidance['format']}",
@@ -2066,6 +2103,9 @@ async def start_edit_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
definition = bot_configuration_service.get_definition(key)
summary = bot_configuration_service.get_setting_summary(key)
@@ -2131,13 +2171,23 @@ async def handle_edit_setting(
await state.clear()
return
if bot_configuration_service.is_read_only(key):
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
try:
value = bot_configuration_service.parse_user_value(key, message.text or "")
except ValueError as error:
await message.answer(f"⚠️ {error}")
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
await db.commit()
text = _render_setting_text(key)
@@ -2172,13 +2222,23 @@ async def handle_direct_setting_input(
if not key:
return
if bot_configuration_service.is_read_only(key):
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
try:
value = bot_configuration_service.parse_user_value(key, message.text or "")
except ValueError as error:
await message.answer(f"⚠️ {error}")
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await message.answer("⚠️ Эта настройка доступна только для чтения.")
await state.clear()
return
await db.commit()
text = _render_setting_text(key)
@@ -2220,7 +2280,14 @@ async def reset_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
await bot_configuration_service.reset_value(db, key)
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
try:
await bot_configuration_service.reset_value(db, key)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)
@@ -2260,9 +2327,16 @@ async def toggle_setting(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
current = bot_configuration_service.get_current_value(key)
new_value = not bool(current)
await bot_configuration_service.set_value(db, key, new_value)
try:
await bot_configuration_service.set_value(db, key, new_value)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)
@@ -2304,6 +2378,9 @@ async def apply_setting_choice(
except KeyError:
await callback.answer("Эта настройка больше недоступна", show_alert=True)
return
if bot_configuration_service.is_read_only(key):
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
try:
value = bot_configuration_service.resolve_choice_token(key, choice_token)
@@ -2311,7 +2388,11 @@ async def apply_setting_choice(
await callback.answer("Это значение больше недоступно", show_alert=True)
return
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError:
await callback.answer("Эта настройка доступна только для чтения", show_alert=True)
return
await db.commit()
text = _render_setting_text(key)

View File

@@ -0,0 +1,160 @@
"""Утилиты для синхронизации токена внешней админки."""
from __future__ import annotations
import logging
from typing import Optional
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from app.config import settings
from app.database.database import AsyncSessionLocal
from app.database.models import SystemSetting
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
logger = logging.getLogger(__name__)
async def ensure_external_admin_token(
bot_username: Optional[str],
bot_id: Optional[int],
) -> Optional[str]:
"""Генерирует и сохраняет токен внешней админки, если требуется."""
username_raw = (bot_username or "").strip()
if not username_raw:
logger.warning(
"⚠️ Не удалось обеспечить токен внешней админки: username бота отсутствует",
)
return None
normalized_username = username_raw.lstrip("@").lower()
if not normalized_username:
logger.warning(
"⚠️ Не удалось обеспечить токен внешней админки: username пустой после нормализации",
)
return None
try:
token = settings.build_external_admin_token(normalized_username)
except Exception as error: # pragma: no cover - защитный блок
logger.error("❌ Ошибка генерации токена внешней админки: %s", error)
return None
try:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(SystemSetting.key, SystemSetting.value).where(
SystemSetting.key.in_(
["EXTERNAL_ADMIN_TOKEN", "EXTERNAL_ADMIN_TOKEN_BOT_ID"]
)
)
)
rows = dict(result.all())
existing_token = rows.get("EXTERNAL_ADMIN_TOKEN")
existing_bot_id_raw = rows.get("EXTERNAL_ADMIN_TOKEN_BOT_ID")
existing_bot_id: Optional[int] = None
if existing_bot_id_raw is not None:
try:
existing_bot_id = int(existing_bot_id_raw)
except (TypeError, ValueError): # pragma: no cover - защита от мусорных значений
logger.warning(
"⚠️ Не удалось разобрать сохраненный идентификатор бота внешней админки: %s",
existing_bot_id_raw,
)
if existing_token == token and existing_bot_id == bot_id:
if settings.get_external_admin_token() != token:
settings.EXTERNAL_ADMIN_TOKEN = token
if settings.EXTERNAL_ADMIN_TOKEN_BOT_ID != existing_bot_id:
settings.EXTERNAL_ADMIN_TOKEN_BOT_ID = existing_bot_id
return token
if existing_bot_id is not None and bot_id is not None and existing_bot_id != bot_id:
logger.error(
"❌ Обнаружено несовпадение ID бота для токена внешней админки: сохранен %s, текущий %s",
existing_bot_id,
bot_id,
)
try:
await bot_configuration_service.reset_value(
session,
"EXTERNAL_ADMIN_TOKEN",
force=True,
)
await bot_configuration_service.reset_value(
session,
"EXTERNAL_ADMIN_TOKEN_BOT_ID",
force=True,
)
await session.commit()
logger.warning(
"⚠️ Токен внешней админки очищен из-за несовпадения идентификаторов бота",
)
except Exception as cleanup_error: # pragma: no cover - защитный блок
await session.rollback()
logger.error(
"Не удалось очистить токен внешней админки после обнаружения подмены: %s",
cleanup_error,
)
finally:
settings.EXTERNAL_ADMIN_TOKEN = None
settings.EXTERNAL_ADMIN_TOKEN_BOT_ID = None
return None
updates: list[tuple[str, object]] = []
if existing_token != token:
updates.append(("EXTERNAL_ADMIN_TOKEN", token))
if bot_id is not None and existing_bot_id != bot_id:
updates.append(("EXTERNAL_ADMIN_TOKEN_BOT_ID", bot_id))
if not updates:
# Токен совпал, но могли отсутствовать значения в настройках приложения
if settings.get_external_admin_token() != (existing_token or token):
settings.EXTERNAL_ADMIN_TOKEN = existing_token or token
if existing_bot_id is not None and (
settings.EXTERNAL_ADMIN_TOKEN_BOT_ID != existing_bot_id
):
settings.EXTERNAL_ADMIN_TOKEN_BOT_ID = existing_bot_id
elif (
bot_id is not None
and settings.EXTERNAL_ADMIN_TOKEN_BOT_ID != bot_id
and existing_bot_id is None
):
settings.EXTERNAL_ADMIN_TOKEN_BOT_ID = bot_id
return existing_token or token
try:
for key, value in updates:
await bot_configuration_service.set_value(
session,
key,
value,
force=True,
)
await session.commit()
logger.info(
"✅ Токен внешней админки синхронизирован для @%s",
normalized_username,
)
except ReadOnlySettingError: # pragma: no cover - force=True предотвращает исключение
await session.rollback()
logger.warning(
"⚠️ Не удалось сохранить токен внешней админки из-за ограничения доступа",
)
return None
return token
except SQLAlchemyError as error:
logger.error("❌ Ошибка сохранения токена внешней админки: %s", error)
return None

View File

@@ -56,9 +56,16 @@ class ChoiceOption:
description: Optional[str] = None
class ReadOnlySettingError(RuntimeError):
"""Исключение, выбрасываемое при попытке изменить настройку только для чтения."""
class BotConfigurationService:
EXCLUDED_KEYS: set[str] = {"BOT_TOKEN", "ADMIN_IDS"}
READ_ONLY_KEYS: set[str] = {"EXTERNAL_ADMIN_TOKEN", "EXTERNAL_ADMIN_TOKEN_BOT_ID"}
PLAIN_TEXT_KEYS: set[str] = {"EXTERNAL_ADMIN_TOKEN", "EXTERNAL_ADMIN_TOKEN_BOT_ID"}
CATEGORY_TITLES: Dict[str, str] = {
"CORE": "🤖 Основные настройки",
"SUPPORT": "💬 Поддержка и тикеты",
@@ -71,6 +78,7 @@ class BotConfigurationService:
"TRIBUTE": "🎁 Tribute",
"MULENPAY": "💰 MulenPay",
"PAL24": "🏦 PAL24 / PayPalych",
"EXTERNAL_ADMIN": "🛡️ Внешняя админка",
"SUBSCRIPTIONS_CORE": "📅 Подписки и лимиты",
"PERIODS": "📆 Периоды подписок",
"SUBSCRIPTION_PRICES": "💵 Стоимость тарифов",
@@ -118,6 +126,7 @@ class BotConfigurationService:
"PAL24": "PAL24 / PayPalych подключения и лимиты.",
"TRIBUTE": "Tribute и донат-сервисы.",
"TELEGRAM": "Telegram Stars и их стоимость.",
"EXTERNAL_ADMIN": "Токен внешней админки для проверки запросов.",
"SUBSCRIPTIONS_CORE": "Лимиты устройств, трафика и базовые цены подписок.",
"PERIODS": "Доступные периоды подписок и продлений.",
"SUBSCRIPTION_PRICES": "Стоимость подписок по периодам в копейках.",
@@ -255,6 +264,7 @@ class BotConfigurationService:
"MULENPAY_": "MULENPAY",
"PAL24_": "PAL24",
"PAYMENT_": "PAYMENT",
"EXTERNAL_ADMIN_": "EXTERNAL_ADMIN",
"CONNECT_BUTTON_HAPP": "HAPP",
"HAPP_": "HAPP",
"SKIP_": "SKIP",
@@ -394,6 +404,20 @@ class BotConfigurationService:
"warning": "Недоступный адрес приведет к ошибкам при управлении VPN-учетками.",
"dependencies": "REMNAWAVE_API_KEY или REMNAWAVE_USERNAME/REMNAWAVE_PASSWORD",
},
"EXTERNAL_ADMIN_TOKEN": {
"description": "Приватный токен, который использует внешняя админка для проверки запросов.",
"format": "Значение генерируется автоматически из username бота и его токена и доступно только для чтения.",
"example": "Генерируется автоматически",
"warning": "Токен обновится при смене username или токена бота.",
"dependencies": "Username телеграм-бота, токен бота",
},
"EXTERNAL_ADMIN_TOKEN_BOT_ID": {
"description": "Идентификатор телеграм-бота, с которым связан токен внешней админки.",
"format": "Проставляется автоматически после первого запуска и не редактируется вручную.",
"example": "123456789",
"warning": "Несовпадение ID блокирует обновление токена, предотвращая его подмену на другом боте.",
"dependencies": "Результат вызова getMe() в Telegram Bot API",
},
}
@classmethod
@@ -405,6 +429,10 @@ class BotConfigurationService:
definition = cls.get_definition(key)
return definition.python_type is bool
@classmethod
def is_read_only(cls, key: str) -> bool:
return key in cls.READ_ONLY_KEYS
@classmethod
def _format_numeric_with_unit(cls, key: str, value: Union[int, float]) -> Optional[str]:
if isinstance(value, bool):
@@ -455,6 +483,8 @@ class BotConfigurationService:
cleaned = value.strip()
if not cleaned:
return ""
if key in cls.PLAIN_TEXT_KEYS:
return cleaned
if any(keyword in key.upper() for keyword in ("TOKEN", "SECRET", "PASSWORD", "KEY")):
return "••••••••"
items = cls._split_comma_values(cleaned)
@@ -860,7 +890,17 @@ class BotConfigurationService:
return parsed_value
@classmethod
async def set_value(cls, db: AsyncSession, key: str, value: Any) -> None:
async def set_value(
cls,
db: AsyncSession,
key: str,
value: Any,
*,
force: bool = False,
) -> None:
if cls.is_read_only(key) and not force:
raise ReadOnlySettingError(f"Setting {key} is read-only")
raw_value = cls.serialize_value(key, value)
await upsert_system_setting(db, key, raw_value)
cls._overrides_raw[key] = raw_value
@@ -870,7 +910,16 @@ class BotConfigurationService:
await cls._sync_default_web_api_token()
@classmethod
async def reset_value(cls, db: AsyncSession, key: str) -> None:
async def reset_value(
cls,
db: AsyncSession,
key: str,
*,
force: bool = False,
) -> None:
if cls.is_read_only(key) and not force:
raise ReadOnlySettingError(f"Setting {key} is read-only")
await delete_system_setting(db, key)
cls._overrides_raw.pop(key, None)
original = cls.get_original_value(key)
@@ -925,6 +974,7 @@ class BotConfigurationService:
"category_key": definition.category_key,
"category_label": definition.category_label,
"has_override": has_override,
"is_read_only": cls.is_read_only(key),
}

View File

@@ -5,7 +5,10 @@ from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.system_settings_service import bot_configuration_service
from app.services.system_settings_service import (
ReadOnlySettingError,
bot_configuration_service,
)
from ..dependencies import get_db_session, require_api_token
from ..schemas.config import (
@@ -94,6 +97,7 @@ def _serialize_definition(definition, include_choices: bool = True) -> SettingDe
current=current,
original=original,
has_override=has_override,
read_only=bot_configuration_service.is_read_only(definition.key),
choices=choices,
)
@@ -153,7 +157,10 @@ async def update_setting(
raise HTTPException(status.HTTP_404_NOT_FOUND, "Setting not found") from error
value = _coerce_value(key, payload.value)
await bot_configuration_service.set_value(db, key, value)
try:
await bot_configuration_service.set_value(db, key, value)
except ReadOnlySettingError as error:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(error)) from error
await db.commit()
return _serialize_definition(definition)
@@ -170,6 +177,9 @@ async def reset_setting(
except KeyError as error:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Setting not found") from error
await bot_configuration_service.reset_value(db, key)
try:
await bot_configuration_service.reset_value(db, key)
except ReadOnlySettingError as error:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(error)) from error
await db.commit()
return _serialize_definition(definition)

View File

@@ -45,6 +45,7 @@ class SettingDefinition(BaseModel):
current: Any | None = Field(default=None)
original: Any | None = Field(default=None)
has_override: bool
read_only: bool = Field(default=False)
choices: list[SettingChoice] = Field(default_factory=list)
model_config = ConfigDict(extra="forbid")

20
main.py
View File

@@ -22,6 +22,7 @@ from app.services.backup_service import backup_service
from app.services.reporting_service import reporting_service
from app.localization.loader import ensure_locale_templates
from app.services.system_settings_service import bot_configuration_service
from app.services.external_admin_service import ensure_external_admin_token
from app.services.broadcast_service import broadcast_service
from app.utils.startup_timeline import StartupTimeline
@@ -186,6 +187,25 @@ async def main():
payment_service = PaymentService(bot)
async with timeline.stage(
"Внешняя админка",
"🛡️",
success_message="Токен внешней админки готов",
) as stage:
try:
bot_user = await bot.get_me()
token = await ensure_external_admin_token(
bot_user.username,
bot_user.id,
)
if token:
stage.log("Токен синхронизирован")
else:
stage.warning("Не удалось получить токен внешней админки")
except Exception as error: # pragma: no cover - защитный блок
stage.warning(f"Ошибка подготовки внешней админки: {error}")
logger.error("❌ Ошибка подготовки внешней админки: %s", error)
webhook_needed = (
settings.TRIBUTE_ENABLED
or settings.is_cryptobot_enabled()