import hashlib
import hmac
import logging
import math
import os
import re
import html
from collections import defaultdict
from datetime import time
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse
from zoneinfo import ZoneInfo
from pydantic_settings import BaseSettings
from pydantic import field_validator, Field
from pathlib import Path
DEFAULT_DISPLAY_NAME_BANNED_KEYWORDS: list[str] = [
# Пустой по умолчанию - администратор может добавить ключевые слова через DISPLAY_NAME_BANNED_KEYWORDS
# Примеры: "tme", "joingroup", "support", "admin"
]
USER_TAG_PATTERN = re.compile(r"^[A-Z0-9_]{1,16}$")
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
BOT_TOKEN: str
BOT_USERNAME: Optional[str] = None
ADMIN_IDS: str = ""
SUPPORT_USERNAME: str = "@support"
SUPPORT_MENU_ENABLED: bool = True
SUPPORT_SYSTEM_MODE: str = "both" # one of: tickets, contact, both
SUPPORT_MENU_ENABLED: bool = True
# SLA for support tickets
SUPPORT_TICKET_SLA_ENABLED: bool = True
SUPPORT_TICKET_SLA_MINUTES: int = 5
SUPPORT_TICKET_SLA_CHECK_INTERVAL_SECONDS: int = 60
SUPPORT_TICKET_SLA_REMINDER_COOLDOWN_MINUTES: int = 15
# MiniApp tickets settings
MINIAPP_TICKETS_ENABLED: bool = True # Enable/disable tickets section in miniapp
MINIAPP_SUPPORT_TYPE: str = "tickets" # one of: tickets, profile, url
MINIAPP_SUPPORT_URL: str = "" # Custom URL to redirect when tickets disabled (only for url type)
ADMIN_NOTIFICATIONS_ENABLED: bool = False
ADMIN_NOTIFICATIONS_CHAT_ID: Optional[str] = None
ADMIN_NOTIFICATIONS_TOPIC_ID: Optional[int] = None
ADMIN_NOTIFICATIONS_TICKET_TOPIC_ID: Optional[int] = None
ADMIN_NOTIFICATIONS_NALOG_TOPIC_ID: Optional[int] = None
# Настройки очереди чеков NaloGO
NALOGO_QUEUE_CHECK_INTERVAL: int = 300 # Интервал проверки очереди (секунды)
NALOGO_QUEUE_RECEIPT_DELAY: int = 3 # Задержка между отправкой чеков (секунды)
NALOGO_QUEUE_MAX_ATTEMPTS: int = 10 # Максимум попыток отправки чека
ADMIN_REPORTS_ENABLED: bool = False
ADMIN_REPORTS_CHAT_ID: Optional[str] = None
ADMIN_REPORTS_TOPIC_ID: Optional[int] = None
ADMIN_REPORTS_SEND_TIME: Optional[str] = None
CHANNEL_SUB_ID: Optional[str] = None
CHANNEL_LINK: Optional[str] = None
CHANNEL_IS_REQUIRED_SUB: bool = False
CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE: bool = True
CHANNEL_REQUIRED_FOR_ALL: bool = False
DATABASE_URL: Optional[str] = None
POSTGRES_HOST: str = "postgres"
POSTGRES_PORT: int = 5432
POSTGRES_DB: str = "remnawave_bot"
POSTGRES_USER: str = "remnawave_user"
POSTGRES_PASSWORD: str = "secure_password_123"
SQLITE_PATH: str = "./data/bot.db"
LOCALES_PATH: str = "./locales"
TIMEZONE: str = Field(default_factory=lambda: os.getenv("TZ", "UTC"))
DATABASE_MODE: str = "auto"
REDIS_URL: str = "redis://localhost:6379/0"
CART_TTL_SECONDS: int = 3600 # Время жизни корзины пользователя в Redis (1 час)
REMNAWAVE_API_URL: Optional[str] = None
REMNAWAVE_API_KEY: Optional[str] = None
REMNAWAVE_SECRET_KEY: Optional[str] = None
REMNAWAVE_USERNAME: Optional[str] = None
REMNAWAVE_PASSWORD: Optional[str] = None
REMNAWAVE_CADDY_TOKEN: Optional[str] = None
REMNAWAVE_AUTH_TYPE: str = "api_key" # api_key, basic, bearer, cookies, caddy
REMNAWAVE_USER_DESCRIPTION_TEMPLATE: str = "Bot user: {full_name} {username}"
REMNAWAVE_USER_USERNAME_TEMPLATE: str = "user_{telegram_id}"
REMNAWAVE_USER_DELETE_MODE: str = "delete" # "delete" или "disable"
REMNAWAVE_AUTO_SYNC_ENABLED: bool = False
REMNAWAVE_AUTO_SYNC_TIMES: str = "03:00"
CABINET_REMNA_SUB_CONFIG: Optional[str] = None # UUID конфига страницы подписки из RemnaWave
TRIAL_DURATION_DAYS: int = 3
TRIAL_TRAFFIC_LIMIT_GB: int = 10
TRIAL_DEVICE_LIMIT: int = 2
TRIAL_ADD_REMAINING_DAYS_TO_PAID: bool = False
TRIAL_PAYMENT_ENABLED: bool = False
TRIAL_ACTIVATION_PRICE: int = 0
TRIAL_USER_TAG: Optional[str] = None
DEFAULT_TRAFFIC_LIMIT_GB: int = 100
DEFAULT_DEVICE_LIMIT: int = 1
DEFAULT_TRAFFIC_RESET_STRATEGY: str = "MONTH"
RESET_TRAFFIC_ON_PAYMENT: bool = False
MAX_DEVICES_LIMIT: int = 20
TRIAL_WARNING_HOURS: int = 2
ENABLE_NOTIFICATIONS: bool = True
NOTIFICATION_RETRY_ATTEMPTS: int = 3
MONITORING_LOGS_RETENTION_DAYS: int = 30
NOTIFICATION_CACHE_HOURS: int = 24
SERVER_STATUS_MODE: str = "disabled"
SERVER_STATUS_EXTERNAL_URL: Optional[str] = None
SERVER_STATUS_METRICS_URL: Optional[str] = None
SERVER_STATUS_METRICS_USERNAME: Optional[str] = None
SERVER_STATUS_METRICS_PASSWORD: Optional[str] = None
SERVER_STATUS_METRICS_VERIFY_SSL: bool = True
SERVER_STATUS_REQUEST_TIMEOUT: int = 10
SERVER_STATUS_ITEMS_PER_PAGE: int = 10
BASE_SUBSCRIPTION_PRICE: int = 50000
AVAILABLE_SUBSCRIPTION_PERIODS: str = "14,30,60,90,180,360"
AVAILABLE_RENEWAL_PERIODS: str = "30,90,180"
PRICE_14_DAYS: int = 50000
PRICE_30_DAYS: int = 99000
PRICE_60_DAYS: int = 189000
PRICE_90_DAYS: int = 269000
PRICE_180_DAYS: int = 499000
PRICE_360_DAYS: int = 899000
PAID_SUBSCRIPTION_USER_TAG: Optional[str] = None
PRICE_TRAFFIC_5GB: int = 2000
PRICE_TRAFFIC_10GB: int = 3500
PRICE_TRAFFIC_25GB: int = 7000
PRICE_TRAFFIC_50GB: int = 11000
PRICE_TRAFFIC_100GB: int = 15000
PRICE_TRAFFIC_250GB: int = 17000
PRICE_TRAFFIC_500GB: int = 19000
PRICE_TRAFFIC_1000GB: int = 19500
PRICE_TRAFFIC_UNLIMITED: int = 20000
TRAFFIC_PACKAGES_CONFIG: str = ""
PRICE_PER_DEVICE: int = 5000
DEVICES_SELECTION_ENABLED: bool = True
DEVICES_SELECTION_DISABLED_AMOUNT: Optional[int] = None
# Настройки модема
MODEM_ENABLED: bool = False
MODEM_PRICE_PER_MONTH: int = 10000 # Цена модема в копейках за месяц
MODEM_PERIOD_DISCOUNTS: str = "" # Скидки на модем: "месяцев:процент,месяцев:процент" (напр. "3:10,6:15,12:20")
BASE_PROMO_GROUP_PERIOD_DISCOUNTS_ENABLED: bool = False
BASE_PROMO_GROUP_PERIOD_DISCOUNTS: str = ""
# Режим выбора трафика:
# - selectable: пользователь выбирает трафик при покупке и может докупать
# - fixed: фиксированный лимит, без выбора и без докупки
# - fixed_with_topup: фиксированный лимит при покупке, но докупка разрешена (при продлении сброс до лимита)
TRAFFIC_SELECTION_MODE: str = "selectable"
FIXED_TRAFFIC_LIMIT_GB: int = 100
BUY_TRAFFIC_BUTTON_VISIBLE: bool = True
# Режим продаж подписок:
# - classic: классический режим (выбор серверов, трафика, устройств, периода отдельно)
# - tariffs: режим тарифов (готовые пакеты с фиксированными параметрами)
SALES_MODE: str = "classic"
# ID тарифа для триала в режиме тарифов (0 = использовать стандартные настройки триала)
# Если указан ID тарифа, параметры триала берутся из тарифа (traffic_limit_gb, device_limit, allowed_squads)
# Длительность триала всё равно берётся из TRIAL_DURATION_DAYS
TRIAL_TARIFF_ID: int = 0
# Настройки докупки трафика
TRAFFIC_TOPUP_ENABLED: bool = True # Включить/выключить функцию докупки трафика
# Пакеты для докупки трафика (формат: "гб:цена:enabled", пустая строка = использовать TRAFFIC_PACKAGES_CONFIG)
TRAFFIC_TOPUP_PACKAGES_CONFIG: str = ""
# Настройки сброса трафика
# Режимы расчета цены сброса:
# "period" - фиксированная цена = стоимость периода 30 дней (старое поведение)
# "traffic" - цена зависит от текущего лимита трафика (цена пакета трафика)
# "traffic_with_purchased" - цена = базовый трафик + докупленный трафик (рекомендуется)
TRAFFIC_RESET_PRICE_MODE: str = "traffic_with_purchased"
# Базовая цена сброса в копейках (используется если режим "period" или как минимальная цена)
TRAFFIC_RESET_BASE_PRICE: int = 0 # 0 = использовать PERIOD_PRICES[30]
REFERRAL_MINIMUM_TOPUP_KOPEKS: int = 10000
REFERRAL_FIRST_TOPUP_BONUS_KOPEKS: int = 10000
REFERRAL_INVITER_BONUS_KOPEKS: int = 10000
REFERRAL_COMMISSION_PERCENT: int = 25
REFERRAL_PROGRAM_ENABLED: bool = True
REFERRAL_NOTIFICATIONS_ENABLED: bool = True
REFERRAL_NOTIFICATION_RETRY_ATTEMPTS: int = 3
# Настройки вывода реферального баланса
REFERRAL_WITHDRAWAL_ENABLED: bool = False # Включить возможность вывода
REFERRAL_WITHDRAWAL_MIN_AMOUNT_KOPEKS: int = 100000 # Мин. сумма вывода (1000₽)
REFERRAL_WITHDRAWAL_COOLDOWN_DAYS: int = 30 # Частота запросов на вывод
REFERRAL_WITHDRAWAL_ONLY_REFERRAL_BALANCE: bool = True # Только реф. баланс (False = реф + свой)
REFERRAL_WITHDRAWAL_NOTIFICATIONS_TOPIC_ID: Optional[int] = None # Топик для уведомлений
# Настройки анализа на подозрительность
REFERRAL_WITHDRAWAL_SUSPICIOUS_MIN_DEPOSIT_KOPEKS: int = 50000 # Мин. сумма от 1 реферала (500₽)
REFERRAL_WITHDRAWAL_SUSPICIOUS_MAX_DEPOSITS_PER_MONTH: int = 10 # Макс. пополнений от 1 реферала/мес
REFERRAL_WITHDRAWAL_SUSPICIOUS_NO_PURCHASES_RATIO: float = 2.0 # Пополнил в X раз больше чем потратил
# Тестовый режим для вывода (позволяет админам вручную начислять реф. доход)
REFERRAL_WITHDRAWAL_TEST_MODE: bool = False
# Конкурсы (глобальный флаг, будет расширяться под разные типы)
CONTESTS_ENABLED: bool = False
CONTESTS_BUTTON_VISIBLE: bool = False
# Для обратной совместимости со старыми конфигами
REFERRAL_CONTESTS_ENABLED: bool = False
BLACKLIST_CHECK_ENABLED: bool = False
BLACKLIST_GITHUB_URL: Optional[str] = None
BLACKLIST_UPDATE_INTERVAL_HOURS: int = 24
BLACKLIST_IGNORE_ADMINS: bool = True
# Настройки простой покупки
SIMPLE_SUBSCRIPTION_ENABLED: bool = False
SIMPLE_SUBSCRIPTION_PERIOD_DAYS: int = 30
SIMPLE_SUBSCRIPTION_DEVICE_LIMIT: int = 1
SIMPLE_SUBSCRIPTION_TRAFFIC_GB: int = 0 # 0 означает безлимит
SIMPLE_SUBSCRIPTION_SQUAD_UUID: Optional[str] = None
# Настройки конструктора меню (API)
MENU_LAYOUT_ENABLED: bool = False # Включить управление меню через API
# Настройки мониторинга трафика
TRAFFIC_MONITORING_ENABLED: bool = False # Глобальный переключатель (для обратной совместимости)
TRAFFIC_THRESHOLD_GB_PER_DAY: float = 10.0 # Порог трафика в ГБ за сутки (для обратной совместимости)
TRAFFIC_MONITORING_INTERVAL_HOURS: int = 24 # Интервал проверки в часах (для обратной совместимости)
SUSPICIOUS_NOTIFICATIONS_TOPIC_ID: Optional[int] = None
# Новый мониторинг трафика v2
# Быстрая проверка (текущий использованный трафик)
TRAFFIC_FAST_CHECK_ENABLED: bool = False
TRAFFIC_FAST_CHECK_INTERVAL_MINUTES: int = 10 # Интервал проверки в минутах
TRAFFIC_FAST_CHECK_THRESHOLD_GB: float = 5.0 # Порог в ГБ для быстрой проверки
# Суточная проверка (трафик за 24 часа)
TRAFFIC_DAILY_CHECK_ENABLED: bool = False
TRAFFIC_DAILY_CHECK_TIME: str = "00:00" # Время суточной проверки (HH:MM)
TRAFFIC_DAILY_THRESHOLD_GB: float = 50.0 # Порог суточного трафика в ГБ
# Фильтрация по серверам (UUID нод через запятую)
TRAFFIC_MONITORED_NODES: str = "" # Только эти ноды (пусто = все)
TRAFFIC_IGNORED_NODES: str = "" # Исключить эти ноды
TRAFFIC_EXCLUDED_USER_UUIDS: str = "" # Исключить пользователей (UUID через запятую)
# Параллельность и кулдаун
TRAFFIC_CHECK_BATCH_SIZE: int = 1000 # Размер батча для получения пользователей
TRAFFIC_CHECK_CONCURRENCY: int = 10 # Параллельных запросов
TRAFFIC_NOTIFICATION_COOLDOWN_MINUTES: int = 60 # Кулдаун уведомлений (минуты)
TRAFFIC_SNAPSHOT_TTL_HOURS: int = 24 # TTL для snapshot трафика в Redis (часы)
# Настройки суточных подписок
DAILY_SUBSCRIPTIONS_ENABLED: bool = True # Включить автоматическое списание для суточных тарифов
DAILY_SUBSCRIPTIONS_CHECK_INTERVAL_MINUTES: int = 30 # Интервал проверки в минутах
AUTOPAY_WARNING_DAYS: str = "3,1"
ENABLE_AUTOPAY: bool = False
DEFAULT_AUTOPAY_ENABLED: bool = False
DEFAULT_AUTOPAY_DAYS_BEFORE: int = 3
MIN_BALANCE_FOR_AUTOPAY_KOPEKS: int = 10000
SUBSCRIPTION_RENEWAL_BALANCE_THRESHOLD_KOPEKS: int = 20000
MONITORING_INTERVAL: int = 60
INACTIVE_USER_DELETE_MONTHS: int = 3
MAINTENANCE_MODE: bool = False
MAINTENANCE_CHECK_INTERVAL: int = 30
MAINTENANCE_AUTO_ENABLE: bool = True
MAINTENANCE_MONITORING_ENABLED: bool = True
MAINTENANCE_RETRY_ATTEMPTS: int = 1
MAINTENANCE_MESSAGE: str = "🔧 Ведутся технические работы. Сервис временно недоступен. Попробуйте позже."
TELEGRAM_STARS_ENABLED: bool = True
TELEGRAM_STARS_RATE_RUB: float = 1.3
TRIBUTE_ENABLED: bool = False
TRIBUTE_API_KEY: Optional[str] = None
TRIBUTE_DONATE_LINK: Optional[str] = None
TRIBUTE_WEBHOOK_PATH: str = "/tribute-webhook"
TRIBUTE_WEBHOOK_HOST: str = "0.0.0.0"
TRIBUTE_WEBHOOK_PORT: int = 8081
YOOKASSA_ENABLED: bool = False
YOOKASSA_SHOP_ID: Optional[str] = None
YOOKASSA_SECRET_KEY: Optional[str] = None
YOOKASSA_RETURN_URL: Optional[str] = None
YOOKASSA_DEFAULT_RECEIPT_EMAIL: Optional[str] = None
YOOKASSA_VAT_CODE: int = 1
YOOKASSA_SBP_ENABLED: bool = False
YOOKASSA_PAYMENT_MODE: str = "full_payment"
YOOKASSA_PAYMENT_SUBJECT: str = "service"
YOOKASSA_WEBHOOK_PATH: str = "/yookassa-webhook"
YOOKASSA_WEBHOOK_HOST: str = "0.0.0.0"
YOOKASSA_WEBHOOK_PORT: int = 8082
YOOKASSA_TRUSTED_PROXY_NETWORKS: str = ""
YOOKASSA_MIN_AMOUNT_KOPEKS: int = 5000
YOOKASSA_MAX_AMOUNT_KOPEKS: int = 1000000
YOOKASSA_QUICK_AMOUNT_SELECTION_ENABLED: bool = False
DISABLE_TOPUP_BUTTONS: bool = False
SUPPORT_TOPUP_ENABLED: bool = True
PAYMENT_VERIFICATION_AUTO_CHECK_ENABLED: bool = False
PAYMENT_VERIFICATION_AUTO_CHECK_INTERVAL_MINUTES: int = 10
NALOGO_ENABLED: bool = False
NALOGO_INN: Optional[str] = None
NALOGO_PASSWORD: Optional[str] = None
NALOGO_DEVICE_ID: Optional[str] = None
NALOGO_STORAGE_PATH: str = "./nalogo_tokens.json"
AUTO_PURCHASE_AFTER_TOPUP_ENABLED: bool = False
AUTO_ACTIVATE_AFTER_TOPUP_ENABLED: bool = False
# Показывать предупреждение об активации подписки после пополнения баланса
# Если True - после пополнения показывает большое сообщение с кнопками:
# "Активировать", "Продлить", "Добавить устройства"
SHOW_ACTIVATION_PROMPT_AFTER_TOPUP: bool = False
# Отключение превью ссылок в сообщениях бота
DISABLE_WEB_PAGE_PREVIEW: bool = False
ACTIVATE_BUTTON_VISIBLE: bool = False
ACTIVATE_BUTTON_TEXT: str = "активировать"
PAYMENT_BALANCE_DESCRIPTION: str = "Пополнение баланса"
PAYMENT_SUBSCRIPTION_DESCRIPTION: str = "Оплата подписки"
PAYMENT_SERVICE_NAME: str = "Интернет-сервис"
PAYMENT_BALANCE_TEMPLATE: str = "{service_name} - {description}"
PAYMENT_SUBSCRIPTION_TEMPLATE: str = "{service_name} - {description}"
CRYPTOBOT_ENABLED: bool = False
CRYPTOBOT_API_TOKEN: Optional[str] = None
CRYPTOBOT_WEBHOOK_SECRET: Optional[str] = None
CRYPTOBOT_BASE_URL: str = "https://pay.crypt.bot"
CRYPTOBOT_TESTNET: bool = False
CRYPTOBOT_WEBHOOK_PATH: str = "/cryptobot-webhook"
CRYPTOBOT_WEBHOOK_PORT: int = 8083
CRYPTOBOT_DEFAULT_ASSET: str = "USDT"
CRYPTOBOT_ASSETS: str = "USDT,TON,BTC,ETH"
CRYPTOBOT_INVOICE_EXPIRES_HOURS: int = 24
HELEKET_ENABLED: bool = False
HELEKET_MERCHANT_ID: Optional[str] = None
HELEKET_API_KEY: Optional[str] = None
HELEKET_BASE_URL: str = "https://api.heleket.com/v1"
HELEKET_DEFAULT_CURRENCY: str = "USDT"
HELEKET_DEFAULT_NETWORK: Optional[str] = None
HELEKET_INVOICE_LIFETIME: int = 3600
HELEKET_MARKUP_PERCENT: float = 0.0
HELEKET_WEBHOOK_PATH: str = "/heleket-webhook"
HELEKET_WEBHOOK_HOST: str = "0.0.0.0"
HELEKET_WEBHOOK_PORT: int = 8086
HELEKET_CALLBACK_URL: Optional[str] = None
HELEKET_RETURN_URL: Optional[str] = None
HELEKET_SUCCESS_URL: Optional[str] = None
MULENPAY_ENABLED: bool = False
MULENPAY_API_KEY: Optional[str] = None
MULENPAY_SECRET_KEY: Optional[str] = None
MULENPAY_SHOP_ID: Optional[int] = None
MULENPAY_BASE_URL: str = "https://mulenpay.ru/api"
MULENPAY_WEBHOOK_PATH: str = "/mulenpay-webhook"
MULENPAY_DISPLAY_NAME: str = "Mulen Pay"
MULENPAY_DESCRIPTION: str = "Пополнение баланса"
MULENPAY_LANGUAGE: str = "ru"
MULENPAY_VAT_CODE: int = 0
DISPLAY_NAME_BANNED_KEYWORDS: str = "\n".join(
DEFAULT_DISPLAY_NAME_BANNED_KEYWORDS
)
MULENPAY_PAYMENT_SUBJECT: int = 4
MULENPAY_PAYMENT_MODE: int = 4
MULENPAY_MIN_AMOUNT_KOPEKS: int = 10000
MULENPAY_MAX_AMOUNT_KOPEKS: int = 10000000
MULENPAY_IFRAME_EXPECTED_ORIGIN: Optional[str] = None
PAL24_ENABLED: bool = False
PAL24_API_TOKEN: Optional[str] = None
PAL24_SHOP_ID: Optional[str] = None
PAL24_SIGNATURE_TOKEN: Optional[str] = None
PAL24_BASE_URL: str = "https://pal24.pro/api/v1/"
PAL24_WEBHOOK_PATH: str = "/pal24-webhook"
PAL24_WEBHOOK_PORT: int = 8084
PAL24_PAYMENT_DESCRIPTION: str = "Пополнение баланса"
PAL24_MIN_AMOUNT_KOPEKS: int = 10000
PAL24_MAX_AMOUNT_KOPEKS: int = 100000000
PAL24_REQUEST_TIMEOUT: int = 30
PAL24_SBP_BUTTON_TEXT: Optional[str] = None
PAL24_CARD_BUTTON_TEXT: Optional[str] = None
PAL24_SBP_BUTTON_VISIBLE: bool = True
PAL24_CARD_BUTTON_VISIBLE: bool = True
PLATEGA_ENABLED: bool = False
PLATEGA_MERCHANT_ID: Optional[str] = None
PLATEGA_SECRET: Optional[str] = None
PLATEGA_DISPLAY_NAME: str = "Platega"
PLATEGA_BASE_URL: str = "https://app.platega.io"
PLATEGA_RETURN_URL: Optional[str] = None
PLATEGA_FAILED_URL: Optional[str] = None
PLATEGA_CURRENCY: str = "RUB"
PLATEGA_ACTIVE_METHODS: str = "2,10,11,12,13"
PLATEGA_MIN_AMOUNT_KOPEKS: int = 10000
PLATEGA_MAX_AMOUNT_KOPEKS: int = 100000000
PLATEGA_WEBHOOK_PATH: str = "/platega-webhook"
PLATEGA_WEBHOOK_HOST: str = "0.0.0.0"
PLATEGA_WEBHOOK_PORT: int = 8086
WATA_ENABLED: bool = False
WATA_BASE_URL: str = "https://api.wata.pro/api/h2h"
WATA_ACCESS_TOKEN: Optional[str] = None
WATA_TERMINAL_PUBLIC_ID: Optional[str] = None
WATA_PAYMENT_DESCRIPTION: str = "Пополнение баланса"
WATA_PAYMENT_TYPE: str = "OneTime"
WATA_SUCCESS_REDIRECT_URL: Optional[str] = None
WATA_FAIL_REDIRECT_URL: Optional[str] = None
WATA_LINK_TTL_MINUTES: Optional[int] = None
WATA_MIN_AMOUNT_KOPEKS: int = 10000
WATA_MAX_AMOUNT_KOPEKS: int = 100000000
WATA_REQUEST_TIMEOUT: int = 30
WATA_WEBHOOK_PATH: str = "/wata-webhook"
WATA_WEBHOOK_HOST: str = "0.0.0.0"
WATA_WEBHOOK_PORT: int = 8085
WATA_PUBLIC_KEY_URL: Optional[str] = None
WATA_PUBLIC_KEY_CACHE_SECONDS: int = 3600
# CloudPayments
CLOUDPAYMENTS_ENABLED: bool = False
CLOUDPAYMENTS_PUBLIC_ID: Optional[str] = None
CLOUDPAYMENTS_API_SECRET: Optional[str] = None
CLOUDPAYMENTS_API_URL: str = "https://api.cloudpayments.ru"
CLOUDPAYMENTS_WIDGET_URL: str = "https://widget.cloudpayments.ru/show"
CLOUDPAYMENTS_DESCRIPTION: str = "Пополнение баланса"
CLOUDPAYMENTS_CURRENCY: str = "RUB"
CLOUDPAYMENTS_MIN_AMOUNT_KOPEKS: int = 5000
CLOUDPAYMENTS_MAX_AMOUNT_KOPEKS: int = 10000000
CLOUDPAYMENTS_WEBHOOK_PATH: str = "/cloudpayments-webhook"
CLOUDPAYMENTS_WEBHOOK_HOST: str = "0.0.0.0"
CLOUDPAYMENTS_WEBHOOK_PORT: int = 8087
CLOUDPAYMENTS_RETURN_URL: Optional[str] = None
CLOUDPAYMENTS_SKIN: str = "mini" # mini, classic, modern
CLOUDPAYMENTS_REQUIRE_EMAIL: bool = False
CLOUDPAYMENTS_TEST_MODE: bool = False
# Freekassa
FREEKASSA_ENABLED: bool = False
FREEKASSA_SHOP_ID: Optional[int] = None
FREEKASSA_API_KEY: Optional[str] = None
FREEKASSA_SECRET_WORD_1: Optional[str] = None # Для формы оплаты
FREEKASSA_SECRET_WORD_2: Optional[str] = None # Для webhook
FREEKASSA_DISPLAY_NAME: str = "Freekassa"
FREEKASSA_CURRENCY: str = "RUB"
FREEKASSA_MIN_AMOUNT_KOPEKS: int = 10000 # 100 руб
FREEKASSA_MAX_AMOUNT_KOPEKS: int = 100000000 # 1 000 000 руб
FREEKASSA_PAYMENT_TIMEOUT_SECONDS: int = 3600
FREEKASSA_WEBHOOK_PATH: str = "/freekassa-webhook"
FREEKASSA_WEBHOOK_HOST: str = "0.0.0.0"
FREEKASSA_WEBHOOK_PORT: int = 8088
# Способ оплаты: None = форма выбора, 42 = обычный СБП, 44 = NSPK СБП
FREEKASSA_PAYMENT_SYSTEM_ID: Optional[int] = None
# Использовать API для создания заказов (нужно для NSPK СБП)
FREEKASSA_USE_API: bool = False
# Публичный IP сервера для Freekassa API (если не задан - определяется автоматически)
SERVER_PUBLIC_IP: Optional[str] = None
MAIN_MENU_MODE: str = "default"
CONNECT_BUTTON_MODE: str = "guide"
MINIAPP_CUSTOM_URL: str = ""
MINIAPP_STATIC_PATH: str = "miniapp"
MINIAPP_PURCHASE_URL: str = ""
MINIAPP_SERVICE_NAME_EN: str = "Bedolaga VPN"
MINIAPP_SERVICE_NAME_RU: str = "Bedolaga VPN"
MINIAPP_SERVICE_DESCRIPTION_EN: str = "Secure & Fast Connection"
MINIAPP_SERVICE_DESCRIPTION_RU: str = "Безопасное и быстрое подключение"
CONNECT_BUTTON_HAPP_DOWNLOAD_ENABLED: bool = False
HAPP_CRYPTOLINK_REDIRECT_TEMPLATE: Optional[str] = None
HAPP_DOWNLOAD_LINK_IOS: Optional[str] = None
HAPP_DOWNLOAD_LINK_ANDROID: Optional[str] = None
HAPP_DOWNLOAD_LINK_MACOS: Optional[str] = None
HAPP_DOWNLOAD_LINK_WINDOWS: Optional[str] = None
HAPP_DOWNLOAD_LINK_PC: Optional[str] = None
HIDE_SUBSCRIPTION_LINK: bool = False
ENABLE_LOGO_MODE: bool = True
LOGO_FILE: str = "vpn_logo.png"
SKIP_RULES_ACCEPT: bool = False
SKIP_REFERRAL_CODE: bool = False
DEFAULT_LANGUAGE: str = "ru"
AVAILABLE_LANGUAGES: str = "ru,en"
LANGUAGE_SELECTION_ENABLED: bool = True
# Округление цен при отображении (≤50 коп вниз, >50 коп вверх)
PRICE_ROUNDING_ENABLED: bool = True
LOG_LEVEL: str = "INFO"
LOG_FILE: str = "logs/bot.log"
# === Log Rotation Settings ===
LOG_ROTATION_ENABLED: bool = False # По умолчанию старое поведение
LOG_ROTATION_TIME: str = "00:00" # Время ротации (HH:MM)
LOG_ROTATION_KEEP_DAYS: int = 7 # Хранить архивы N дней
LOG_ROTATION_COMPRESS: bool = True # Сжимать архивы gzip
LOG_ROTATION_SEND_TO_TELEGRAM: bool = False # Отправлять в канал
LOG_ROTATION_CHAT_ID: Optional[str] = None # Канал для логов (или BACKUP_SEND_CHAT_ID)
LOG_ROTATION_TOPIC_ID: Optional[int] = None # Топик в канале
# Пути к лог-файлам (при LOG_ROTATION_ENABLED=true)
LOG_DIR: str = "logs"
LOG_INFO_FILE: str = "info.log"
LOG_WARNING_FILE: str = "warning.log"
LOG_ERROR_FILE: str = "error.log"
LOG_PAYMENTS_FILE: str = "payments.log"
# === Ban Notification Messages ===
# Сообщение о блокировке за превышение лимита устройств
# Переменные: {ip_count}, {limit}, {ban_minutes}, {node_info}
BAN_MSG_PUNISHMENT: str = (
"🚫 АККАУНТ ЗАБЛОКИРОВАН\n"
"━━━━━━━━━━━━━━━━━━━━━\n\n"
"❌ Причина: Превышен лимит устройств\n"
"{node_info}\n"
"📊 Детали нарушения:\n"
"├ 📱 Устройств подключено: {ip_count}\n"
"├ 📋 Разрешено по тарифу: {limit}\n"
"└ ⏱ Время блокировки: {ban_minutes} мин\n\n"
"━━━━━━━━━━━━━━━━━━━━━\n"
"💡 Что делать:\n"
"1. Отключите лишние устройства от VPN\n"
"2. Дождитесь окончания блокировки\n"
"3. Подключитесь заново\n\n"
"🔄 Доступ восстановится автоматически"
)
# Сообщение о разблокировке
BAN_MSG_ENABLED: str = (
"✅ АККАУНТ РАЗБЛОКИРОВАН\n"
"━━━━━━━━━━━━━━━━━━━━━\n\n"
"🎉 Ваш аккаунт успешно разблокирован!\n\n"
"Теперь вы можете снова пользоваться VPN.\n\n"
"━━━━━━━━━━━━━━━━━━━━━\n"
"⚠️ Рекомендации:\n"
"• Следите за количеством устройств\n"
"• Отключайте VPN когда не используете\n"
"• Не превышайте лимит по тарифу"
)
# Сообщение о блокировке за WiFi
# Переменные: {ban_minutes}, {network_info}, {node_info}
BAN_MSG_WIFI: str = (
"🚫 АККАУНТ ЗАБЛОКИРОВАН\n"
"━━━━━━━━━━━━━━━━━━━━━\n\n"
"❌ Причина: Использование WiFi сети\n"
"{node_info}\n"
"📊 Детали:\n"
"├ 📶 Тип подключения: WiFi\n"
"{network_info}"
"└ ⏱ Время блокировки: {ban_minutes} мин\n\n"
"━━━━━━━━━━━━━━━━━━━━━\n"
"💡 Что делать:\n"
"1. Отключитесь от WiFi\n"
"2. Используйте мобильный интернет\n"
"3. Дождитесь окончания блокировки\n\n"
"🔄 Доступ восстановится автоматически"
)
# Сообщение о блокировке за мобильную сеть
# Переменные: {ban_minutes}, {network_info}, {node_info}
BAN_MSG_MOBILE: str = (
"🚫 АККАУНТ ЗАБЛОКИРОВАН\n"
"━━━━━━━━━━━━━━━━━━━━━\n\n"
"❌ Причина: Использование мобильной сети\n"
"{node_info}\n"
"📊 Детали:\n"
"├ 📱 Тип подключения: Мобильная сеть\n"
"{network_info}"
"└ ⏱ Время блокировки: {ban_minutes} мин\n\n"
"━━━━━━━━━━━━━━━━━━━━━\n"
"💡 Что делать:\n"
"1. Подключитесь к WiFi\n"
"2. Дождитесь окончания блокировки\n"
"3. Используйте VPN только через WiFi\n\n"
"🔄 Доступ восстановится автоматически"
)
# Сообщение-предупреждение
# Переменные: {warning_message}
BAN_MSG_WARNING: str = (
"⚠️ ПРЕДУПРЕЖДЕНИЕ\n"
"━━━━━━━━━━━━━━━━━━━━━\n\n"
"{warning_message}\n\n"
"━━━━━━━━━━━━━━━━━━━━━\n"
"❗ При повторном нарушении аккаунт будет заблокирован"
)
DEBUG: bool = False
WEBHOOK_URL: Optional[str] = None
WEBHOOK_PATH: str = "/webhook"
WEBHOOK_SECRET_TOKEN: Optional[str] = None
WEBHOOK_DROP_PENDING_UPDATES: bool = True
WEBHOOK_MAX_QUEUE_SIZE: int = 1024
WEBHOOK_WORKERS: int = 4
WEBHOOK_ENQUEUE_TIMEOUT: float = 0.1
WEBHOOK_WORKER_SHUTDOWN_TIMEOUT: float = 30.0
BOT_RUN_MODE: str = "polling"
WEB_API_ENABLED: bool = False
WEB_API_HOST: str = "0.0.0.0"
WEB_API_PORT: int = 8080
WEB_API_WORKERS: int = 1
WEB_API_ALLOWED_ORIGINS: str = "*"
WEB_API_DOCS_ENABLED: bool = False
WEB_API_TITLE: str = "Remnawave Bot Admin API"
WEB_API_VERSION: str = "1.0.0"
WEB_API_DEFAULT_TOKEN: Optional[str] = None
WEB_API_DEFAULT_TOKEN_NAME: str = "Bootstrap Token"
WEB_API_TOKEN_HASH_ALGORITHM: str = "sha256"
WEB_API_REQUEST_LOGGING: bool = True
APP_CONFIG_PATH: str = "app-config.json"
ENABLE_DEEP_LINKS: bool = True
APP_CONFIG_CACHE_TTL: int = 3600
VERSION_CHECK_ENABLED: bool = True
VERSION_CHECK_REPO: str = "fr1ngg/remnawave-bedolaga-telegram-bot"
VERSION_CHECK_INTERVAL_HOURS: int = 1
BACKUP_AUTO_ENABLED: bool = True
BACKUP_INTERVAL_HOURS: int = 24
BACKUP_TIME: str = "03:00"
BACKUP_MAX_KEEP: int = 7
BACKUP_COMPRESSION: bool = True
BACKUP_INCLUDE_LOGS: bool = False
BACKUP_LOCATION: str = "/app/data/backups"
BACKUP_SEND_ENABLED: bool = False
BACKUP_SEND_CHAT_ID: Optional[str] = None
BACKUP_SEND_TOPIC_ID: Optional[int] = None
BACKUP_ARCHIVE_PASSWORD: Optional[str] = None
EXTERNAL_ADMIN_TOKEN: Optional[str] = None
EXTERNAL_ADMIN_TOKEN_BOT_ID: Optional[int] = None
# Cabinet (Personal Account) settings
CABINET_ENABLED: bool = False
CABINET_JWT_SECRET: Optional[str] = None
CABINET_ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
CABINET_REFRESH_TOKEN_EXPIRE_DAYS: int = 7
CABINET_ALLOWED_ORIGINS: str = ""
CABINET_EMAIL_VERIFICATION_ENABLED: bool = True
CABINET_EMAIL_VERIFICATION_EXPIRE_HOURS: int = 24
CABINET_PASSWORD_RESET_EXPIRE_HOURS: int = 1
# SMTP settings for cabinet email
SMTP_HOST: Optional[str] = None
SMTP_PORT: int = 587
SMTP_USER: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
SMTP_FROM_EMAIL: Optional[str] = None
SMTP_FROM_NAME: str = "VPN Service"
SMTP_USE_TLS: bool = True
# Ban System Integration (BedolagaBan monitoring)
BAN_SYSTEM_ENABLED: bool = False
BAN_SYSTEM_API_URL: Optional[str] = None # e.g., http://ban-server:8000
BAN_SYSTEM_API_TOKEN: Optional[str] = None
BAN_SYSTEM_REQUEST_TIMEOUT: int = 30
@field_validator('MAIN_MENU_MODE', mode='before')
@classmethod
def normalize_main_menu_mode(cls, value: Optional[str]) -> str:
if not value:
return "default"
normalized = str(value).strip().lower()
aliases = {
"classic": "default",
"default": "default",
"full": "default",
"standard": "default",
"text": "text",
"text_only": "text",
"textual": "text",
"minimal": "text",
}
mode = aliases.get(normalized, normalized)
if mode not in {"default", "text"}:
raise ValueError("MAIN_MENU_MODE must be one of: default, text")
return mode
@field_validator('SERVER_STATUS_MODE', mode='before')
@classmethod
def normalize_server_status_mode(cls, value: Optional[str]) -> str:
if not value:
return "disabled"
normalized = str(value).strip().lower()
aliases = {
"off": "disabled",
"none": "disabled",
"disabled": "disabled",
"external": "external_link",
"link": "external_link",
"url": "external_link",
"external_link": "external_link",
"miniapp": "external_link_miniapp",
"mini_app": "external_link_miniapp",
"mini-app": "external_link_miniapp",
"webapp": "external_link_miniapp",
"web_app": "external_link_miniapp",
"web-app": "external_link_miniapp",
"external_link_miniapp": "external_link_miniapp",
"xray": "xray",
"xraychecker": "xray",
"xray_metrics": "xray",
"metrics": "xray",
}
mode = aliases.get(normalized, normalized)
if mode not in {"disabled", "external_link", "external_link_miniapp", "xray"}:
raise ValueError(
"SERVER_STATUS_MODE must be one of: disabled, external_link, external_link_miniapp, xray"
)
return mode
@field_validator('SERVER_STATUS_ITEMS_PER_PAGE', mode='before')
@classmethod
def ensure_positive_server_status_page_size(cls, value: Optional[int]) -> int:
try:
if value is None:
return 10
value_int = int(value)
return max(1, value_int)
except (TypeError, ValueError):
return 10
@field_validator('SERVER_STATUS_REQUEST_TIMEOUT', mode='before')
@classmethod
def ensure_positive_server_status_timeout(cls, value: Optional[int]) -> int:
try:
if value is None:
return 10
value_int = int(value)
return max(1, value_int)
except (TypeError, ValueError):
return 10
@field_validator('LOG_FILE', mode='before')
@classmethod
def ensure_log_dir(cls, v):
log_path = Path(v)
log_path.parent.mkdir(parents=True, exist_ok=True)
return str(log_path)
def get_database_url(self) -> str:
if self.DATABASE_URL and self.DATABASE_URL.strip():
return self.DATABASE_URL
mode = self.DATABASE_MODE.lower()
if mode == "sqlite":
return self._get_sqlite_url()
elif mode == "postgresql":
return self._get_postgresql_url()
elif mode == "auto":
if os.getenv("DOCKER_ENV") == "true" or os.path.exists("/.dockerenv"):
return self._get_postgresql_url()
else:
return self._get_sqlite_url()
else:
return self._get_auto_database_url()
def _get_sqlite_url(self) -> str:
sqlite_path = Path(self.SQLITE_PATH)
sqlite_path.parent.mkdir(parents=True, exist_ok=True)
return f"sqlite+aiosqlite:///{sqlite_path.absolute()}"
def _get_postgresql_url(self) -> str:
return (f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
f"@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}")
def _get_auto_database_url(self) -> str:
if (os.getenv("DOCKER_ENV") == "true" or
os.path.exists("/.dockerenv")):
return self._get_postgresql_url()
else:
return self._get_sqlite_url()
def is_postgresql(self) -> bool:
"""Проверяет, используется ли PostgreSQL"""
return "postgresql" in self.get_database_url()
def is_sqlite(self) -> bool:
"""Проверяет, используется ли SQLite"""
return "sqlite" in self.get_database_url()
def is_admin(self, user_id: int) -> bool:
return user_id in self.get_admin_ids()
def get_admin_ids(self) -> List[int]:
try:
admin_ids = self.ADMIN_IDS
if isinstance(admin_ids, str):
if not admin_ids.strip():
return []
return [int(x.strip()) for x in admin_ids.split(',') if x.strip()]
return []
except (ValueError, AttributeError):
return []
def get_remnawave_auth_params(self) -> Dict[str, Optional[str]]:
return {
"base_url": self.REMNAWAVE_API_URL,
"api_key": self.REMNAWAVE_API_KEY,
"secret_key": self.REMNAWAVE_SECRET_KEY,
"username": self.REMNAWAVE_USERNAME,
"password": self.REMNAWAVE_PASSWORD,
"caddy_token": self.REMNAWAVE_CADDY_TOKEN,
"auth_type": self.REMNAWAVE_AUTH_TYPE
}
def get_pal24_sbp_button_text(self, fallback: str) -> str:
value = (self.PAL24_SBP_BUTTON_TEXT or "").strip()
return value or fallback
def get_pal24_card_button_text(self, fallback: str) -> str:
value = (self.PAL24_CARD_BUTTON_TEXT or "").strip()
return value or fallback
def is_pal24_sbp_button_visible(self) -> bool:
return self.PAL24_SBP_BUTTON_VISIBLE
def is_pal24_card_button_visible(self) -> bool:
return self.PAL24_CARD_BUTTON_VISIBLE
def get_remnawave_user_delete_mode(self) -> str:
"""Возвращает режим удаления пользователей: 'delete' или 'disable'"""
mode = self.REMNAWAVE_USER_DELETE_MODE.lower().strip()
return mode if mode in ["delete", "disable"] else "delete"
def format_remnawave_user_description(
self,
*,
full_name: str,
username: Optional[str],
telegram_id: int
) -> str:
template = self.REMNAWAVE_USER_DESCRIPTION_TEMPLATE or "Bot user: {full_name} {username}"
template_for_formatting = template.replace("@{username}", "{username}")
username_clean = (username or "").lstrip("@")
values = defaultdict(str, {
"full_name": full_name,
"username": f"@{username_clean}" if username_clean else "",
"username_clean": username_clean,
"telegram_id": str(telegram_id)
})
description = template_for_formatting.format_map(values)
if not username_clean:
description = re.sub(r'@(?=\W|$)', '', description)
description = re.sub(r'\(\s*\)', '', description)
description = re.sub(r'\s+', ' ', description).strip()
return description
def format_remnawave_username(
self,
*,
full_name: str,
username: Optional[str],
telegram_id: int
) -> str:
template = self.REMNAWAVE_USER_USERNAME_TEMPLATE or "user_{telegram_id}"
username_clean = (username or "").lstrip("@")
full_name_value = full_name or ""
values = defaultdict(str, {
"full_name": full_name_value,
"username": username_clean,
"username_clean": username_clean,
"telegram_id": str(telegram_id),
})
raw_username = template.format_map(values).strip()
sanitized_username = re.sub(r"[^0-9A-Za-z._-]+", "_", raw_username)
sanitized_username = re.sub(r"_+", "_", sanitized_username).strip("._-")
if not sanitized_username:
sanitized_username = f"user_{telegram_id}"
return sanitized_username[:64]
@staticmethod
def parse_daily_time_list(raw_value: Optional[str]) -> List[time]:
if not raw_value:
return []
segments = re.split(r"[\s,;]+", raw_value.strip())
seen: set[tuple[int, int]] = set()
parsed: List[time] = []
for segment in segments:
if not segment:
continue
try:
hours_str, minutes_str = segment.split(":", 1)
hours = int(hours_str)
minutes = int(minutes_str)
except (ValueError, AttributeError):
continue
if not (0 <= hours < 24 and 0 <= minutes < 60):
continue
key = (hours, minutes)
if key in seen:
continue
seen.add(key)
parsed.append(time(hour=hours, minute=minutes))
parsed.sort()
return parsed
def get_remnawave_auto_sync_times(self) -> List[time]:
return self.parse_daily_time_list(self.REMNAWAVE_AUTO_SYNC_TIMES)
def get_traffic_monitored_nodes(self) -> List[str]:
"""Возвращает список UUID нод для мониторинга (пусто = все)"""
if not self.TRAFFIC_MONITORED_NODES:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_MONITORED_NODES.split("#")[0].strip()
if not value:
return []
return [n.strip() for n in value.split(",") if n.strip()]
def get_traffic_ignored_nodes(self) -> List[str]:
"""Возвращает список UUID нод для исключения из мониторинга"""
if not self.TRAFFIC_IGNORED_NODES:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_IGNORED_NODES.split("#")[0].strip()
if not value:
return []
return [n.strip() for n in value.split(",") if n.strip()]
def get_traffic_excluded_user_uuids(self) -> List[str]:
"""Возвращает список UUID пользователей для исключения из мониторинга (например, тунельные/служебные)"""
if not self.TRAFFIC_EXCLUDED_USER_UUIDS:
return []
# Убираем комментарии (все после #)
value = self.TRAFFIC_EXCLUDED_USER_UUIDS.split("#")[0].strip()
if not value:
return []
return [uuid.strip().lower() for uuid in value.split(",") if uuid.strip()]
def get_traffic_daily_check_time(self) -> Optional[time]:
"""Возвращает время суточной проверки трафика"""
times = self.parse_daily_time_list(self.TRAFFIC_DAILY_CHECK_TIME)
return times[0] if times else None
def get_display_name_banned_keywords(self) -> List[str]:
raw_value = self.DISPLAY_NAME_BANNED_KEYWORDS
if raw_value is None:
return []
if isinstance(raw_value, str):
candidates = re.split(r"[\n,]+", raw_value)
else:
candidates = list(raw_value)
unique: List[str] = []
seen: set[str] = set()
for candidate in candidates:
normalized = str(candidate).strip().lower()
if not normalized:
continue
if normalized in seen:
continue
seen.add(normalized)
unique.append(normalized)
return unique
def get_autopay_warning_days(self) -> List[int]:
try:
days = self.AUTOPAY_WARNING_DAYS
if isinstance(days, str):
if not days.strip():
return [3, 1]
return [int(x.strip()) for x in days.split(',') if x.strip()]
return [3, 1]
except (ValueError, AttributeError):
return [3, 1]
def is_autopay_enabled_by_default(self) -> bool:
value = getattr(self, "DEFAULT_AUTOPAY_ENABLED", True)
if isinstance(value, str):
normalized = value.strip().lower()
return normalized in {"1", "true", "yes", "on"}
return bool(value)
def is_auto_purchase_after_topup_enabled(self) -> bool:
value = getattr(self, "AUTO_PURCHASE_AFTER_TOPUP_ENABLED", False)
if isinstance(value, str):
normalized = value.strip().lower()
return normalized in {"1", "true", "yes", "on"}
return bool(value)
def is_auto_activate_after_topup_enabled(self) -> bool:
"""Умная автоактивация после пополнения баланса (без корзины)."""
value = getattr(self, "AUTO_ACTIVATE_AFTER_TOPUP_ENABLED", False)
if isinstance(value, str):
normalized = value.strip().lower()
return normalized in {"1", "true", "yes", "on"}
return bool(value)
def is_quick_amount_buttons_enabled(self) -> bool:
"""Показывать ли кнопки быстрого выбора суммы пополнения."""
return self.YOOKASSA_QUICK_AMOUNT_SELECTION_ENABLED and not self.DISABLE_TOPUP_BUTTONS
def get_available_languages(self) -> List[str]:
defaults = ["ru", "en", "ua", "zh"]
try:
langs = self.AVAILABLE_LANGUAGES
except AttributeError:
return defaults
candidates: List[str]
if isinstance(langs, str):
if not langs.strip():
return defaults
candidates = [chunk.strip() for chunk in langs.split(',')]
elif isinstance(langs, (list, tuple, set)):
candidates = [str(item).strip() for item in langs]
else:
return defaults
cleaned: List[str] = []
seen: set[str] = set()
for code in candidates:
if not code:
continue
normalized = code.lower()
if normalized in seen:
continue
seen.add(normalized)
cleaned.append(code)
return cleaned or defaults
def is_language_selection_enabled(self) -> bool:
return bool(getattr(self, "LANGUAGE_SELECTION_ENABLED", True))
def format_price(self, price_kopeks: int, round_kopeks: Optional[bool] = None) -> str:
"""
Форматирует цену в копейках для отображения пользователю.
Args:
price_kopeks: Сумма в копейках
round_kopeks: Если True, округляет копейки (≤50 вниз, >50 вверх).
Если None, использует настройку PRICE_ROUNDING_ENABLED.
Returns:
Отформатированная строка цены (например, "150 ₽")
"""
# Используем настройку если не передано явно
should_round = round_kopeks if round_kopeks is not None else self.PRICE_ROUNDING_ENABLED
sign = "-" if price_kopeks < 0 else ""
abs_kopeks = abs(price_kopeks)
rubles, kopeks = divmod(abs_kopeks, 100)
if should_round:
# Округление: ≤50 коп вниз, >50 коп вверх
if kopeks > 50:
rubles += 1
return f"{sign}{rubles} ₽"
# Без округления - показываем точное значение
if kopeks:
value = f"{sign}{rubles}.{kopeks:02d}".rstrip("0").rstrip(".")
return f"{value} ₽"
return f"{sign}{rubles} ₽"
def get_reports_chat_id(self) -> Optional[str]:
if self.ADMIN_REPORTS_CHAT_ID:
return self.ADMIN_REPORTS_CHAT_ID
return self.ADMIN_NOTIFICATIONS_CHAT_ID
def get_reports_topic_id(self) -> Optional[int]:
return self.ADMIN_REPORTS_TOPIC_ID or None
def get_reports_send_time(self) -> Optional[time]:
value = self.ADMIN_REPORTS_SEND_TIME
if not value:
return None
try:
hours_str, minutes_str = value.strip().split(":", 1)
hours = int(hours_str)
minutes = int(minutes_str)
if not (0 <= hours <= 23 and 0 <= minutes <= 59):
raise ValueError
return time(hour=hours, minute=minutes)
except (ValueError, AttributeError):
logging.getLogger(__name__).warning(
"Некорректное значение ADMIN_REPORTS_SEND_TIME: %s", value
)
return None
def kopeks_to_rubles(self, kopeks: int) -> float:
return kopeks / 100
def rubles_to_kopeks(self, rubles: float) -> int:
return int(rubles * 100)
@staticmethod
def _normalize_user_tag(value: Optional[str], setting_name: str) -> Optional[str]:
if value is None:
return None
cleaned = str(value).strip().upper()
if not cleaned:
return None
if len(cleaned) > 16:
logger.warning(
"Некорректная длина %s: максимум 16 символов, получено %s",
setting_name,
len(cleaned),
)
return None
if not USER_TAG_PATTERN.fullmatch(cleaned):
logger.warning(
"Некорректный формат %s: допустимы только A-Z, 0-9 и подчёркивание",
setting_name,
)
return None
return cleaned
def get_trial_warning_hours(self) -> int:
return self.TRIAL_WARNING_HOURS
def get_trial_user_tag(self) -> Optional[str]:
return self._normalize_user_tag(self.TRIAL_USER_TAG, "TRIAL_USER_TAG")
def get_paid_subscription_user_tag(self) -> Optional[str]:
return self._normalize_user_tag(
self.PAID_SUBSCRIPTION_USER_TAG,
"PAID_SUBSCRIPTION_USER_TAG",
)
def get_bot_username(self) -> Optional[str]:
username = getattr(self, "BOT_USERNAME", None)
if not username:
return None
normalized = str(username).strip().lstrip("@")
return normalized or None
def is_notifications_enabled(self) -> bool:
return self.ENABLE_NOTIFICATIONS
def get_main_menu_mode(self) -> str:
return getattr(self, "MAIN_MENU_MODE", "default")
def is_text_main_menu_mode(self) -> bool:
return self.get_main_menu_mode() == "text"
def get_main_menu_miniapp_url(self) -> Optional[str]:
for candidate in [self.MINIAPP_CUSTOM_URL, self.MINIAPP_PURCHASE_URL]:
value = (candidate or "").strip()
if value:
return value
return None
def get_app_config_path(self) -> str:
if os.path.isabs(self.APP_CONFIG_PATH):
return self.APP_CONFIG_PATH
project_root = Path(__file__).parent.parent
return str(project_root / self.APP_CONFIG_PATH)
def is_deep_links_enabled(self) -> bool:
return self.ENABLE_DEEP_LINKS
def get_miniapp_branding(self) -> Dict[str, Dict[str, Optional[str]]]:
def _clean(value: Optional[str]) -> Optional[str]:
if value is None:
return None
value_str = str(value).strip()
return value_str or None
name_en = _clean(self.MINIAPP_SERVICE_NAME_EN)
name_ru = _clean(self.MINIAPP_SERVICE_NAME_RU)
desc_en = _clean(self.MINIAPP_SERVICE_DESCRIPTION_EN)
desc_ru = _clean(self.MINIAPP_SERVICE_DESCRIPTION_RU)
default_name = name_en or name_ru or "RemnaWave VPN"
default_description = desc_en or desc_ru or "Secure & Fast Connection"
return {
"service_name": {
"default": default_name,
"en": name_en,
"ru": name_ru,
},
"service_description": {
"default": default_description,
"en": desc_en,
"ru": desc_ru,
},
}
def get_app_config_cache_ttl(self) -> int:
return self.APP_CONFIG_CACHE_TTL
def build_external_admin_token(self, bot_username: str) -> str:
"""Генерирует детерминированный и криптографически стойкий токен внешней админки."""
normalized = (bot_username or "").strip().lstrip("@").lower()
if not normalized:
raise ValueError("Bot username is required to build external admin token")
secret = (self.BOT_TOKEN or "").strip()
if not secret:
raise ValueError("Bot token is required to build external admin token")
digest = hmac.new(
key=secret.encode("utf-8"),
msg=f"remnawave.external_admin::{normalized}".encode("utf-8"),
digestmod=hashlib.sha256,
).hexdigest()
return digest[:48]
def get_external_admin_token(self) -> Optional[str]:
token = (self.EXTERNAL_ADMIN_TOKEN or "").strip()
return token or None
def get_external_admin_bot_id(self) -> Optional[int]:
try:
return int(self.EXTERNAL_ADMIN_TOKEN_BOT_ID) if self.EXTERNAL_ADMIN_TOKEN_BOT_ID else None
except (TypeError, ValueError): # pragma: no cover - защитная ветка для некорректных значений
logging.getLogger(__name__).warning(
"Некорректный идентификатор бота для внешней админки: %s",
self.EXTERNAL_ADMIN_TOKEN_BOT_ID,
)
return None
def is_traffic_selectable(self) -> bool:
return self.TRAFFIC_SELECTION_MODE.lower() == "selectable"
def is_traffic_fixed(self) -> bool:
"""Возвращает True если выбор трафика отключён (fixed или fixed_with_topup)"""
return self.TRAFFIC_SELECTION_MODE.lower() in ("fixed", "fixed_with_topup")
def is_traffic_topup_blocked(self) -> bool:
"""Возвращает True если докупка трафика полностью заблокирована (только fixed)"""
return self.TRAFFIC_SELECTION_MODE.lower() == "fixed"
def get_fixed_traffic_limit(self) -> int:
return self.FIXED_TRAFFIC_LIMIT_GB
def is_traffic_topup_enabled(self) -> bool:
return self.TRAFFIC_TOPUP_ENABLED
def get_traffic_topup_packages(self) -> List[Dict]:
"""Возвращает пакеты для докупки трафика. Если не настроены - использует TRAFFIC_PACKAGES_CONFIG."""
config_str = self.TRAFFIC_TOPUP_PACKAGES_CONFIG.strip()
if not config_str:
# Если не настроены отдельные пакеты для докупки - используем основные
return self.get_traffic_packages()
packages = []
for package_config in config_str.split(','):
package_config = package_config.strip()
if not package_config:
continue
parts = package_config.split(':')
if len(parts) >= 2:
try:
gb = int(parts[0])
price = int(parts[1])
enabled = parts[2].lower() == 'true' if len(parts) > 2 else True
packages.append({"gb": gb, "price": price, "enabled": enabled})
except (ValueError, IndexError):
continue
return packages if packages else self.get_traffic_packages()
def get_traffic_topup_price(self, gb: Optional[int]) -> int:
"""Возвращает цену докупки для указанного количества ГБ."""
packages = self.get_traffic_topup_packages()
enabled_packages = [pkg for pkg in packages if pkg["enabled"]]
if not enabled_packages:
return 0
# Ищем точное совпадение
for pkg in enabled_packages:
if pkg["gb"] == gb:
return pkg["price"]
# Если не нашли - возвращаем 0
return 0
def get_traffic_reset_price_mode(self) -> str:
return self.TRAFFIC_RESET_PRICE_MODE.lower()
def get_traffic_reset_base_price(self) -> int:
return self.TRAFFIC_RESET_BASE_PRICE
def is_devices_selection_enabled(self) -> bool:
return self.DEVICES_SELECTION_ENABLED
def get_devices_selection_disabled_amount(self) -> Optional[int]:
raw_value = self.DEVICES_SELECTION_DISABLED_AMOUNT
if raw_value in (None, ""):
return None
try:
value = int(raw_value)
except (TypeError, ValueError):
logger.warning(
"Некорректное значение DEVICES_SELECTION_DISABLED_AMOUNT: %s",
raw_value,
)
return None
if value < 0:
return 0
return value
def get_disabled_mode_device_limit(self) -> Optional[int]:
return self.get_devices_selection_disabled_amount()
def is_modem_enabled(self) -> bool:
return bool(self.MODEM_ENABLED)
def is_tariffs_mode(self) -> bool:
"""Проверяет, включен ли режим продаж 'Тарифы'."""
return self.SALES_MODE == "tariffs"
def is_classic_mode(self) -> bool:
"""Проверяет, включен ли классический режим продаж."""
return self.SALES_MODE != "tariffs"
def get_sales_mode(self) -> str:
"""Возвращает текущий режим продаж."""
return self.SALES_MODE if self.SALES_MODE in ("classic", "tariffs") else "classic"
def get_trial_tariff_id(self) -> int:
"""Возвращает ID тарифа для триала (0 = использовать стандартные настройки)."""
return self.TRIAL_TARIFF_ID if self.TRIAL_TARIFF_ID > 0 else 0
def get_modem_price_per_month(self) -> int:
try:
value = int(self.MODEM_PRICE_PER_MONTH)
except (TypeError, ValueError):
logger.warning(
"Некорректное значение MODEM_PRICE_PER_MONTH: %s",
self.MODEM_PRICE_PER_MONTH,
)
return 10000
return max(0, value)
def get_modem_period_discounts(self) -> Dict[int, int]:
"""Возвращает скидки на модем по количеству месяцев: {месяцев: процент_скидки}"""
try:
config_str = (self.MODEM_PERIOD_DISCOUNTS or "").strip()
if not config_str:
return {}
discounts: Dict[int, int] = {}
for part in config_str.split(','):
part = part.strip()
if not part:
continue
months_and_discount = part.split(':')
if len(months_and_discount) != 2:
continue
months_str, discount_str = months_and_discount
try:
months = int(months_str.strip())
discount_percent = int(discount_str.strip())
except ValueError:
continue
discounts[months] = max(0, min(100, discount_percent))
return discounts
except Exception:
return {}
def get_modem_period_discount(self, months: int) -> int:
"""Возвращает процент скидки для указанного количества месяцев"""
if months <= 0:
return 0
discounts = self.get_modem_period_discounts()
# Ищем точное совпадение или ближайшее меньшее
applicable_discount = 0
for discount_months, discount_percent in sorted(discounts.items()):
if months >= discount_months:
applicable_discount = discount_percent
return applicable_discount
def is_trial_paid_activation_enabled(self) -> bool:
# TRIAL_PAYMENT_ENABLED - главный переключатель платной активации
# Если выключен - триал бесплатный, независимо от цены
if not self.TRIAL_PAYMENT_ENABLED:
return False
# Если включен - проверяем что цена > 0
return self.TRIAL_ACTIVATION_PRICE > 0
def get_trial_activation_price(self) -> int:
try:
value = int(self.TRIAL_ACTIVATION_PRICE)
except (TypeError, ValueError):
logger.warning(
"Некорректное значение TRIAL_ACTIVATION_PRICE: %s",
self.TRIAL_ACTIVATION_PRICE,
)
return 0
if value < 0:
return 0
return value
def is_yookassa_enabled(self) -> bool:
return (self.YOOKASSA_ENABLED and
self.YOOKASSA_SHOP_ID is not None and
self.YOOKASSA_SECRET_KEY is not None)
def is_nalogo_enabled(self) -> bool:
return (self.NALOGO_ENABLED and
self.NALOGO_INN is not None and
self.NALOGO_PASSWORD is not None)
def is_support_topup_enabled(self) -> bool:
return bool(self.SUPPORT_TOPUP_ENABLED)
def get_yookassa_return_url(self) -> str:
if self.YOOKASSA_RETURN_URL:
return self.YOOKASSA_RETURN_URL
elif self.WEBHOOK_URL:
return f"{self.WEBHOOK_URL}/payment-success"
return "https://t.me/"
def is_cryptobot_enabled(self) -> bool:
return (self.CRYPTOBOT_ENABLED and
self.CRYPTOBOT_API_TOKEN is not None)
def is_heleket_enabled(self) -> bool:
return (
self.HELEKET_ENABLED
and self.HELEKET_MERCHANT_ID is not None
and self.HELEKET_API_KEY is not None
)
def is_mulenpay_enabled(self) -> bool:
return (
self.MULENPAY_ENABLED
and self.MULENPAY_API_KEY is not None
and self.MULENPAY_SECRET_KEY is not None
and self.MULENPAY_SHOP_ID is not None
)
def get_mulenpay_display_name(self) -> str:
name = (self.MULENPAY_DISPLAY_NAME or "").strip()
if not name:
return "Mulen Pay"
return name
def get_mulenpay_display_name_html(self) -> str:
return html.escape(self.get_mulenpay_display_name())
def get_mulenpay_expected_origin(self) -> Optional[str]:
override = (self.MULENPAY_IFRAME_EXPECTED_ORIGIN or "").strip()
if override:
return override
base_url = (self.MULENPAY_BASE_URL or "").strip()
if not base_url:
return None
parsed = urlparse(base_url)
if parsed.scheme and parsed.netloc:
return f"{parsed.scheme}://{parsed.netloc}"
return None
def is_pal24_enabled(self) -> bool:
return (
self.PAL24_ENABLED
and self.PAL24_API_TOKEN is not None
and self.PAL24_SHOP_ID is not None
)
def is_platega_enabled(self) -> bool:
return (
self.PLATEGA_ENABLED
and self.PLATEGA_MERCHANT_ID is not None
and self.PLATEGA_SECRET is not None
)
def get_platega_display_name(self) -> str:
name = (self.PLATEGA_DISPLAY_NAME or "").strip()
if not name:
return "Platega"
return name
def get_platega_display_name_html(self) -> str:
return html.escape(self.get_platega_display_name())
def get_platega_return_url(self) -> Optional[str]:
if self.PLATEGA_RETURN_URL:
return self.PLATEGA_RETURN_URL
if self.WEBHOOK_URL:
return f"{self.WEBHOOK_URL}/payment-success"
return None
def get_platega_failed_url(self) -> Optional[str]:
if self.PLATEGA_FAILED_URL:
return self.PLATEGA_FAILED_URL
if self.WEBHOOK_URL:
return f"{self.WEBHOOK_URL}/payment-failed"
return None
def get_platega_active_methods(self) -> List[int]:
raw_value = str(self.PLATEGA_ACTIVE_METHODS or "")
normalized = raw_value.replace(";", ",")
methods: list[int] = []
seen: set[int] = set()
for part in normalized.split(","):
part = part.strip()
if not part:
continue
try:
method_code = int(part)
except ValueError:
logger.warning("Некорректный код метода Platega: %s", part)
continue
if method_code in {2, 10, 11, 12, 13} and method_code not in seen:
methods.append(method_code)
seen.add(method_code)
if not methods:
return [2]
return methods
@staticmethod
def get_platega_method_definitions() -> Dict[int, Dict[str, str]]:
return {
2: {"name": "СБП (QR)", "title": "🏦 СБП (QR)"},
10: {"name": "Банковские карты (RUB)", "title": "💳 Карты (RUB)"},
11: {"name": "Банковские карты", "title": "💳 Банковские карты"},
12: {"name": "Международные карты", "title": "🌍 Международные карты"},
13: {"name": "Криптовалюта", "title": "🪙 Криптовалюта"},
}
def get_platega_method_display_name(self, method_code: int) -> str:
definitions = self.get_platega_method_definitions()
info = definitions.get(method_code)
if info and info.get("name"):
return info["name"]
return f"Метод {method_code}"
def get_platega_method_display_title(self, method_code: int) -> str:
definitions = self.get_platega_method_definitions()
info = definitions.get(method_code)
if not info:
return f"Platega {method_code}"
return info.get("title") or info.get("name") or f"Platega {method_code}"
def is_wata_enabled(self) -> bool:
return (
self.WATA_ENABLED
and self.WATA_ACCESS_TOKEN is not None
and self.WATA_TERMINAL_PUBLIC_ID is not None
)
def is_cloudpayments_enabled(self) -> bool:
return (
self.CLOUDPAYMENTS_ENABLED
and self.CLOUDPAYMENTS_PUBLIC_ID is not None
and self.CLOUDPAYMENTS_API_SECRET is not None
)
def is_freekassa_enabled(self) -> bool:
return (
self.FREEKASSA_ENABLED
and self.FREEKASSA_SHOP_ID is not None
and self.FREEKASSA_API_KEY is not None
and self.FREEKASSA_SECRET_WORD_1 is not None
and self.FREEKASSA_SECRET_WORD_2 is not None
)
def get_freekassa_display_name(self) -> str:
name = (self.FREEKASSA_DISPLAY_NAME or "").strip()
return name if name else "Freekassa"
def get_freekassa_display_name_html(self) -> str:
return html.escape(self.get_freekassa_display_name())
def is_payment_verification_auto_check_enabled(self) -> bool:
return self.PAYMENT_VERIFICATION_AUTO_CHECK_ENABLED
def get_payment_verification_auto_check_interval(self) -> int:
try:
minutes = int(self.PAYMENT_VERIFICATION_AUTO_CHECK_INTERVAL_MINUTES)
except (TypeError, ValueError): # pragma: no cover - защитная проверка конфигурации
minutes = 10
if minutes <= 0:
logger.warning(
"Некорректный интервал автопроверки платежей: %s. Используется значение по умолчанию 10 минут.",
self.PAYMENT_VERIFICATION_AUTO_CHECK_INTERVAL_MINUTES,
)
return 10
return minutes
def get_cryptobot_base_url(self) -> str:
if self.CRYPTOBOT_TESTNET:
return "https://testnet-pay.crypt.bot"
return self.CRYPTOBOT_BASE_URL
def get_cryptobot_assets(self) -> List[str]:
try:
assets = self.CRYPTOBOT_ASSETS.strip()
if not assets:
return ["USDT", "TON"]
return [asset.strip() for asset in assets.split(',') if asset.strip()]
except (ValueError, AttributeError):
return ["USDT", "TON"]
def get_cryptobot_invoice_expires_seconds(self) -> int:
return self.CRYPTOBOT_INVOICE_EXPIRES_HOURS * 3600
def get_heleket_markup_percent(self) -> float:
try:
return float(self.HELEKET_MARKUP_PERCENT)
except (TypeError, ValueError):
return 0.0
def get_heleket_lifetime(self) -> int:
try:
value = int(self.HELEKET_INVOICE_LIFETIME)
except (TypeError, ValueError):
value = 3600
return max(300, min(43200, value))
def get_heleket_callback_url(self) -> Optional[str]:
if self.HELEKET_CALLBACK_URL:
return self.HELEKET_CALLBACK_URL
if self.WEBHOOK_URL:
return f"{self.WEBHOOK_URL}{self.HELEKET_WEBHOOK_PATH}"
return None
def is_happ_cryptolink_mode(self) -> bool:
return self.CONNECT_BUTTON_MODE == "happ_cryptolink"
def is_happ_download_button_enabled(self) -> bool:
return self.is_happ_cryptolink_mode() and self.CONNECT_BUTTON_HAPP_DOWNLOAD_ENABLED
def should_hide_subscription_link(self) -> bool:
"""Returns True when subscription links must be hidden from the interface."""
if self.is_happ_cryptolink_mode():
return False
return self.HIDE_SUBSCRIPTION_LINK
def is_contests_enabled(self) -> bool:
if getattr(self, "CONTESTS_ENABLED", False):
return True
# legacy fallback
return bool(getattr(self, "REFERRAL_CONTESTS_ENABLED", False))
def is_referral_contests_enabled(self) -> bool:
# kept for backward compatibility
return self.is_contests_enabled()
def get_happ_cryptolink_redirect_template(self) -> Optional[str]:
template = (self.HAPP_CRYPTOLINK_REDIRECT_TEMPLATE or "").strip()
return template or None
def get_happ_download_link(self, platform: str) -> Optional[str]:
platform_key = platform.lower()
if platform_key == "pc":
platform_key = "windows"
links = {
"ios": (self.HAPP_DOWNLOAD_LINK_IOS or "").strip(),
"android": (self.HAPP_DOWNLOAD_LINK_ANDROID or "").strip(),
"macos": (self.HAPP_DOWNLOAD_LINK_MACOS or "").strip(),
"windows": (
(self.HAPP_DOWNLOAD_LINK_WINDOWS or "").strip()
or (self.HAPP_DOWNLOAD_LINK_PC or "").strip()
),
}
link = links.get(platform_key)
return link if link else None
def is_maintenance_mode(self) -> bool:
return self.MAINTENANCE_MODE
def get_maintenance_message(self) -> str:
return self.MAINTENANCE_MESSAGE
def get_maintenance_check_interval(self) -> int:
return self.MAINTENANCE_CHECK_INTERVAL
def get_maintenance_retry_attempts(self) -> int:
try:
attempts = int(self.MAINTENANCE_RETRY_ATTEMPTS)
except (TypeError, ValueError):
attempts = 1
return max(1, attempts)
def is_base_promo_group_period_discount_enabled(self) -> bool:
return self.BASE_PROMO_GROUP_PERIOD_DISCOUNTS_ENABLED
def get_base_promo_group_period_discounts(self) -> Dict[int, int]:
try:
config_str = (self.BASE_PROMO_GROUP_PERIOD_DISCOUNTS or "").strip()
if not config_str:
return {}
discounts: Dict[int, int] = {}
for part in config_str.split(','):
part = part.strip()
if not part:
continue
period_and_discount = part.split(':')
if len(period_and_discount) != 2:
continue
period_str, discount_str = period_and_discount
try:
period_days = int(period_str.strip())
discount_percent = int(discount_str.strip())
except ValueError:
continue
discounts[period_days] = max(0, min(100, discount_percent))
return discounts
except Exception:
return {}
def get_base_promo_group_period_discount(self, period_days: Optional[int]) -> int:
if not period_days or not self.is_base_promo_group_period_discount_enabled():
return 0
discounts = self.get_base_promo_group_period_discounts()
return discounts.get(period_days, 0)
def is_maintenance_auto_enable(self) -> bool:
return self.MAINTENANCE_AUTO_ENABLE
def is_maintenance_monitoring_enabled(self) -> bool:
return self.MAINTENANCE_MONITORING_ENABLED
def get_available_subscription_periods(self) -> List[int]:
"""
Возвращает доступные периоды подписки.
Использует AVAILABLE_SUBSCRIPTION_PERIODS для фильтрации.
Не фильтрует по цене, т.к. в режиме classic базовая цена может быть 0.
"""
# Получаем разрешённые периоды из настройки
try:
periods_str = self.AVAILABLE_SUBSCRIPTION_PERIODS
if not periods_str or not periods_str.strip():
allowed_periods = {14, 30, 60, 90, 180, 360}
else:
allowed_periods = set()
for period_str in periods_str.split(','):
period_str = period_str.strip()
if period_str:
allowed_periods.add(int(period_str))
except (ValueError, AttributeError):
allowed_periods = {14, 30, 60, 90, 180, 360}
# Возвращаем только разрешённые периоды (без фильтрации по цене,
# т.к. в режиме classic цена складывается из серверов/трафика/устройств)
periods = sorted(list(allowed_periods))
return periods if periods else [30, 90, 180]
def get_available_renewal_periods(self) -> List[int]:
"""
Возвращает доступные периоды продления.
Использует AVAILABLE_RENEWAL_PERIODS для фильтрации.
Не фильтрует по цене, т.к. в режиме classic базовая цена может быть 0.
"""
# Получаем разрешённые периоды из настройки
try:
periods_str = self.AVAILABLE_RENEWAL_PERIODS
if not periods_str or not periods_str.strip():
allowed_periods = {30, 60, 90, 180, 360}
else:
allowed_periods = set()
for period_str in periods_str.split(','):
period_str = period_str.strip()
if period_str:
allowed_periods.add(int(period_str))
except (ValueError, AttributeError):
allowed_periods = {30, 60, 90, 180, 360}
# Возвращаем только разрешённые периоды (без фильтрации по цене)
periods = sorted(list(allowed_periods))
return periods if periods else [30, 90, 180]
def get_configured_subscription_periods(self) -> List[int]:
"""
Возвращает настроенные периоды подписки из AVAILABLE_SUBSCRIPTION_PERIODS.
БЕЗ фильтрации по ценам - используется для админки.
"""
try:
periods_str = self.AVAILABLE_SUBSCRIPTION_PERIODS
if not periods_str or not periods_str.strip():
return [14, 30, 60, 90, 180, 360]
periods = []
for period_str in periods_str.split(','):
period_str = period_str.strip()
if period_str:
periods.append(int(period_str))
return sorted(periods) if periods else [14, 30, 60, 90, 180, 360]
except (ValueError, AttributeError):
return [14, 30, 60, 90, 180, 360]
def get_configured_renewal_periods(self) -> List[int]:
"""
Возвращает настроенные периоды продления из AVAILABLE_RENEWAL_PERIODS.
БЕЗ фильтрации по ценам - используется для админки.
"""
try:
periods_str = self.AVAILABLE_RENEWAL_PERIODS
if not periods_str or not periods_str.strip():
return [30, 60, 90, 180, 360]
periods = []
for period_str in periods_str.split(','):
period_str = period_str.strip()
if period_str:
periods.append(int(period_str))
return sorted(periods) if periods else [30, 60, 90, 180, 360]
except (ValueError, AttributeError):
return [30, 60, 90, 180, 360]
def get_balance_payment_description(self, amount_kopeks: int, telegram_user_id: Optional[int] = None) -> str:
# Базовое описание
description = f"{self.PAYMENT_BALANCE_DESCRIPTION} на {self.format_price(amount_kopeks)}"
# Если передан user_id, добавляем его
if telegram_user_id is not None:
description += f" (ID {telegram_user_id})"
# Формируем финальную строку по шаблону
return self.PAYMENT_BALANCE_TEMPLATE.format(
service_name=self.PAYMENT_SERVICE_NAME,
description=description
)
def get_subscription_payment_description(self, period_days: int, amount_kopeks: int) -> str:
return self.PAYMENT_SUBSCRIPTION_TEMPLATE.format(
service_name=self.PAYMENT_SERVICE_NAME,
description=f"{self.PAYMENT_SUBSCRIPTION_DESCRIPTION} на {period_days} дней"
)
def get_custom_payment_description(self, description: str) -> str:
return self.PAYMENT_BALANCE_TEMPLATE.format(
service_name=self.PAYMENT_SERVICE_NAME,
description=description
)
def get_stars_rate(self) -> float:
return self.TELEGRAM_STARS_RATE_RUB
def stars_to_rubles(self, stars: int) -> float:
return stars * self.get_stars_rate()
def rubles_to_stars(self, rubles: float) -> int:
rate = self.get_stars_rate()
if rate <= 0:
raise ValueError("Stars rate must be positive")
return max(1, math.ceil(rubles / rate))
def get_admin_notifications_chat_id(self) -> Optional[int]:
if not self.ADMIN_NOTIFICATIONS_CHAT_ID:
return None
try:
return int(self.ADMIN_NOTIFICATIONS_CHAT_ID)
except (ValueError, TypeError):
return None
def is_admin_notifications_enabled(self) -> bool:
return (self.ADMIN_NOTIFICATIONS_ENABLED and
self.get_admin_notifications_chat_id() is not None)
def get_backup_send_chat_id(self) -> Optional[int]:
if not self.BACKUP_SEND_CHAT_ID:
return None
try:
return int(self.BACKUP_SEND_CHAT_ID)
except (ValueError, TypeError):
return None
def is_backup_send_enabled(self) -> bool:
return (self.BACKUP_SEND_ENABLED and
self.get_backup_send_chat_id() is not None)
def get_backup_archive_password(self) -> Optional[str]:
password = (self.BACKUP_ARCHIVE_PASSWORD or "").strip()
return password if password else None
# === Log Rotation Methods ===
def is_log_rotation_enabled(self) -> bool:
"""Проверить, включена ли новая система ротации логов."""
return self.LOG_ROTATION_ENABLED
def get_log_rotation_chat_id(self) -> Optional[int]:
"""Получить ID канала для отправки логов.
Если LOG_ROTATION_CHAT_ID не задан, использует BACKUP_SEND_CHAT_ID.
"""
chat_id = self.LOG_ROTATION_CHAT_ID or self.BACKUP_SEND_CHAT_ID
if not chat_id:
return None
try:
return int(chat_id)
except (ValueError, TypeError):
return None
def get_log_rotation_topic_id(self) -> Optional[int]:
"""Получить ID топика для отправки логов.
Если LOG_ROTATION_TOPIC_ID не задан, использует BACKUP_SEND_TOPIC_ID.
"""
topic_id = self.LOG_ROTATION_TOPIC_ID
if topic_id is not None:
return topic_id
return self.BACKUP_SEND_TOPIC_ID
def get_referral_settings(self) -> Dict:
return {
"program_enabled": self.is_referral_program_enabled(),
"minimum_topup_kopeks": self.REFERRAL_MINIMUM_TOPUP_KOPEKS,
"first_topup_bonus_kopeks": self.REFERRAL_FIRST_TOPUP_BONUS_KOPEKS,
"inviter_bonus_kopeks": self.REFERRAL_INVITER_BONUS_KOPEKS,
"commission_percent": self.REFERRAL_COMMISSION_PERCENT,
"notifications_enabled": self.REFERRAL_NOTIFICATIONS_ENABLED,
"withdrawal_enabled": self.REFERRAL_WITHDRAWAL_ENABLED,
"withdrawal_min_amount_kopeks": self.REFERRAL_WITHDRAWAL_MIN_AMOUNT_KOPEKS,
"withdrawal_cooldown_days": self.REFERRAL_WITHDRAWAL_COOLDOWN_DAYS,
}
def is_referral_withdrawal_enabled(self) -> bool:
"""Проверяет, включена ли функция вывода реферального баланса."""
return self.is_referral_program_enabled() and self.REFERRAL_WITHDRAWAL_ENABLED
def is_referral_program_enabled(self) -> bool:
return bool(self.REFERRAL_PROGRAM_ENABLED)
def is_referral_notifications_enabled(self) -> bool:
return self.REFERRAL_NOTIFICATIONS_ENABLED
def get_traffic_packages(self) -> List[Dict]:
import logging
logger = logging.getLogger(__name__)
try:
packages = []
config_str = self.TRAFFIC_PACKAGES_CONFIG.strip()
logger.debug(f"CONFIG STRING: '{config_str}'")
if not config_str:
logger.debug("CONFIG EMPTY, USING FALLBACK")
return self._get_fallback_traffic_packages()
logger.debug("PARSING CONFIG...")
for package_config in config_str.split(','):
package_config = package_config.strip()
if not package_config:
continue
parts = package_config.split(':')
if len(parts) != 3:
continue
try:
gb = int(parts[0])
price = int(parts[1])
enabled = parts[2].lower() == 'true'
packages.append({
"gb": gb,
"price": price,
"enabled": enabled
})
except ValueError:
continue
logger.debug(f"PARSED {len(packages)} packages from config")
return packages if packages else self._get_fallback_traffic_packages()
except Exception as e:
logger.info(f"ERROR PARSING CONFIG: {e}")
return self._get_fallback_traffic_packages()
def is_version_check_enabled(self) -> bool:
return self.VERSION_CHECK_ENABLED
def get_version_check_repo(self) -> str:
return self.VERSION_CHECK_REPO
def get_version_check_interval(self) -> int:
return self.VERSION_CHECK_INTERVAL_HOURS
def _get_fallback_traffic_packages(self) -> List[Dict]:
try:
if self.TRAFFIC_PACKAGES_CONFIG.strip():
packages = []
for package_config in self.TRAFFIC_PACKAGES_CONFIG.split(','):
package_config = package_config.strip()
if not package_config:
continue
parts = package_config.split(':')
if len(parts) != 3:
continue
try:
gb = int(parts[0])
price = int(parts[1])
enabled = parts[2].lower() == 'true'
packages.append({
"gb": gb,
"price": price,
"enabled": enabled
})
except ValueError:
continue
if packages:
return packages
except Exception as e:
pass
return [
{"gb": 5, "price": self.PRICE_TRAFFIC_5GB, "enabled": True},
{"gb": 10, "price": self.PRICE_TRAFFIC_10GB, "enabled": True},
{"gb": 25, "price": self.PRICE_TRAFFIC_25GB, "enabled": True},
{"gb": 50, "price": self.PRICE_TRAFFIC_50GB, "enabled": True},
{"gb": 100, "price": self.PRICE_TRAFFIC_100GB, "enabled": True},
{"gb": 250, "price": self.PRICE_TRAFFIC_250GB, "enabled": True},
{"gb": 500, "price": self.PRICE_TRAFFIC_500GB, "enabled": True},
{"gb": 1000, "price": self.PRICE_TRAFFIC_1000GB, "enabled": True},
{"gb": 0, "price": self.PRICE_TRAFFIC_UNLIMITED, "enabled": True},
]
def get_traffic_price(self, gb: Optional[int]) -> int:
packages = self.get_traffic_packages()
enabled_packages = [pkg for pkg in packages if pkg["enabled"]]
if not enabled_packages:
return 0
if gb is None:
gb = 0
for package in enabled_packages:
if package["gb"] == gb:
return package["price"]
unlimited_package = next((pkg for pkg in enabled_packages if pkg["gb"] == 0), None)
if gb <= 0:
return unlimited_package["price"] if unlimited_package else 0
finite_packages = [pkg for pkg in enabled_packages if pkg["gb"] > 0]
if not finite_packages:
return unlimited_package["price"] if unlimited_package else 0
max_package = max(finite_packages, key=lambda x: x["gb"])
if gb >= max_package["gb"]:
return unlimited_package["price"] if unlimited_package else max_package["price"]
suitable_packages = [pkg for pkg in finite_packages if pkg["gb"] >= gb]
if suitable_packages:
nearest_package = min(suitable_packages, key=lambda x: x["gb"])
return nearest_package["price"]
return unlimited_package["price"] if unlimited_package else 0
def _clean_support_contact(self) -> str:
return (self.SUPPORT_USERNAME or "").strip()
def get_support_contact_url(self) -> Optional[str]:
contact = self._clean_support_contact()
if not contact:
return None
if contact.startswith(("http://", "https://", "tg://")):
return contact
contact_without_prefix = contact.lstrip("@")
if contact_without_prefix.startswith(("t.me/", "telegram.me/", "telegram.dog/")):
return f"https://{contact_without_prefix}"
if contact.startswith(("t.me/", "telegram.me/", "telegram.dog/")):
return f"https://{contact}"
if "." in contact_without_prefix:
return f"https://{contact_without_prefix}"
if contact_without_prefix:
return f"https://t.me/{contact_without_prefix}"
return None
def get_support_contact_display(self) -> str:
contact = self._clean_support_contact()
if not contact:
return ""
if contact.startswith("@"):
return contact
if contact.startswith(("http://", "https://", "tg://")):
return contact
if contact.startswith(("t.me/", "telegram.me/", "telegram.dog/")):
url = self.get_support_contact_url()
return url if url else contact
contact_without_prefix = contact.lstrip("@")
if "." in contact_without_prefix:
url = self.get_support_contact_url()
return url if url else contact
if re.fullmatch(r"[A-Za-z0-9_]{3,}", contact_without_prefix):
return f"@{contact_without_prefix}"
return contact
def get_support_contact_display_html(self) -> str:
return html.escape(self.get_support_contact_display())
def get_server_status_mode(self) -> str:
return self.SERVER_STATUS_MODE
def is_server_status_enabled(self) -> bool:
return self.get_server_status_mode() != "disabled"
def get_server_status_external_url(self) -> Optional[str]:
url = (self.SERVER_STATUS_EXTERNAL_URL or "").strip()
return url or None
def get_server_status_metrics_url(self) -> Optional[str]:
url = (self.SERVER_STATUS_METRICS_URL or "").strip()
return url or None
def get_server_status_metrics_auth(self) -> Optional[tuple[str, str]]:
username = (self.SERVER_STATUS_METRICS_USERNAME or "").strip()
password_raw = self.SERVER_STATUS_METRICS_PASSWORD
if not username:
return None
password = "" if password_raw is None else str(password_raw)
return username, password
def get_server_status_items_per_page(self) -> int:
return max(1, self.SERVER_STATUS_ITEMS_PER_PAGE)
def get_server_status_request_timeout(self) -> int:
return max(1, self.SERVER_STATUS_REQUEST_TIMEOUT)
def is_web_api_enabled(self) -> bool:
return bool(self.WEB_API_ENABLED)
def get_web_api_allowed_origins(self) -> list[str]:
raw = (self.WEB_API_ALLOWED_ORIGINS or "").split(",")
origins = [origin.strip() for origin in raw if origin.strip()]
return origins or ["*"]
def get_web_api_docs_config(self) -> Dict[str, Optional[str]]:
if self.WEB_API_DOCS_ENABLED:
return {
"docs_url": "/docs",
"redoc_url": "/redoc",
"openapi_url": "/openapi.json",
}
return {"docs_url": None, "redoc_url": None, "openapi_url": None}
def get_support_system_mode(self) -> str:
mode = (self.SUPPORT_SYSTEM_MODE or "both").strip().lower()
return mode if mode in {"tickets", "contact", "both"} else "both"
def is_support_tickets_enabled(self) -> bool:
return self.get_support_system_mode() in {"tickets", "both"}
def is_support_contact_enabled(self) -> bool:
return self.get_support_system_mode() in {"contact", "both"}
# MiniApp tickets settings
def is_miniapp_tickets_enabled(self) -> bool:
"""Check if tickets are enabled in miniapp."""
return bool(self.MINIAPP_TICKETS_ENABLED)
def get_miniapp_support_type(self) -> str:
"""Get miniapp support type: tickets, profile, or url."""
support_type = (self.MINIAPP_SUPPORT_TYPE or "tickets").strip().lower()
return support_type if support_type in {"tickets", "profile", "url"} else "tickets"
def get_miniapp_support_url(self) -> str:
"""Get custom support URL for miniapp (when type is 'url')."""
return (self.MINIAPP_SUPPORT_URL or "").strip()
def get_bot_run_mode(self) -> str:
mode = (self.BOT_RUN_MODE or "polling").strip().lower()
if mode not in {"polling", "webhook", "both"}:
return "polling"
return mode
def get_telegram_webhook_path(self) -> str:
raw_path = (self.WEBHOOK_PATH or "/webhook").strip()
if not raw_path:
raw_path = "/webhook"
if not raw_path.startswith("/"):
raw_path = "/" + raw_path
return raw_path
def get_webhook_queue_maxsize(self) -> int:
try:
size = int(self.WEBHOOK_MAX_QUEUE_SIZE)
except (TypeError, ValueError):
size = 1024
return max(1, size)
def get_webhook_worker_count(self) -> int:
try:
workers = int(self.WEBHOOK_WORKERS)
except (TypeError, ValueError):
workers = 1
return max(1, workers)
def get_webhook_enqueue_timeout(self) -> float:
try:
timeout = float(self.WEBHOOK_ENQUEUE_TIMEOUT)
except (TypeError, ValueError):
timeout = 0.0
return max(0.0, timeout)
def get_webhook_shutdown_timeout(self) -> float:
try:
timeout = float(self.WEBHOOK_WORKER_SHUTDOWN_TIMEOUT)
except (TypeError, ValueError):
timeout = 30.0
return max(1.0, timeout)
def get_telegram_webhook_url(self) -> Optional[str]:
base_url = (self.WEBHOOK_URL or "").strip()
if not base_url:
return None
path = self.get_telegram_webhook_path()
return f"{base_url.rstrip('/')}{path}"
def get_miniapp_static_path(self) -> Path:
raw_path = (self.MINIAPP_STATIC_PATH or "miniapp").strip()
if not raw_path:
raw_path = "miniapp"
return Path(raw_path)
# Cabinet methods
def is_cabinet_enabled(self) -> bool:
return bool(self.CABINET_ENABLED)
def get_cabinet_jwt_secret(self) -> str:
if self.CABINET_JWT_SECRET:
return self.CABINET_JWT_SECRET
return self.BOT_TOKEN
def get_cabinet_access_token_expire_minutes(self) -> int:
return max(1, self.CABINET_ACCESS_TOKEN_EXPIRE_MINUTES)
def get_cabinet_refresh_token_expire_days(self) -> int:
return max(1, self.CABINET_REFRESH_TOKEN_EXPIRE_DAYS)
def get_cabinet_allowed_origins(self) -> List[str]:
if not self.CABINET_ALLOWED_ORIGINS:
return []
return [o.strip() for o in self.CABINET_ALLOWED_ORIGINS.split(",") if o.strip()]
def is_cabinet_email_verification_enabled(self) -> bool:
return bool(self.CABINET_EMAIL_VERIFICATION_ENABLED)
def get_cabinet_email_verification_expire_hours(self) -> int:
return max(1, self.CABINET_EMAIL_VERIFICATION_EXPIRE_HOURS)
def get_cabinet_password_reset_expire_hours(self) -> int:
return max(1, self.CABINET_PASSWORD_RESET_EXPIRE_HOURS)
def is_smtp_configured(self) -> bool:
return bool(self.SMTP_HOST and self.SMTP_USER and self.SMTP_PASSWORD)
def get_smtp_from_email(self) -> Optional[str]:
if self.SMTP_FROM_EMAIL:
return self.SMTP_FROM_EMAIL
return self.SMTP_USER
# Ban System helpers
def is_ban_system_enabled(self) -> bool:
return bool(self.BAN_SYSTEM_ENABLED)
def is_ban_system_configured(self) -> bool:
return bool(self.BAN_SYSTEM_API_URL and self.BAN_SYSTEM_API_TOKEN)
def get_ban_system_api_url(self) -> Optional[str]:
if self.BAN_SYSTEM_API_URL:
return self.BAN_SYSTEM_API_URL.rstrip('/')
return None
def get_ban_system_api_token(self) -> Optional[str]:
return self.BAN_SYSTEM_API_TOKEN
def get_ban_system_request_timeout(self) -> int:
return max(1, self.BAN_SYSTEM_REQUEST_TIMEOUT)
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"extra": "ignore"
}
@field_validator("TIMEZONE")
@classmethod
def validate_timezone(cls, value: str) -> str:
try:
ZoneInfo(value)
except Exception as exc: # pragma: no cover - defensive validation
raise ValueError(
f"Некорректный идентификатор часового пояса: {value}"
) from exc
return value
settings = Settings()
ENV_OVERRIDE_KEYS = set(settings.model_fields_set)
_PERIOD_PRICE_FIELDS: Dict[int, str] = {
14: "PRICE_14_DAYS",
30: "PRICE_30_DAYS",
60: "PRICE_60_DAYS",
90: "PRICE_90_DAYS",
180: "PRICE_180_DAYS",
360: "PRICE_360_DAYS",
}
# Хранилище периодов/цен из БД (приоритет над .env)
_DB_PERIOD_PRICES: Optional[Dict[int, int]] = None
def set_period_prices_from_db(period_prices: Dict[int, int]) -> None:
"""
Устанавливает периоды/цены из БД.
Вызывается после синхронизации тарифов при запуске бота.
"""
global _DB_PERIOD_PRICES
_DB_PERIOD_PRICES = period_prices.copy() if period_prices else None
refresh_period_prices()
def get_db_period_prices() -> Optional[Dict[int, int]]:
"""Возвращает периоды/цены из БД если они загружены."""
return _DB_PERIOD_PRICES
def refresh_period_prices() -> None:
"""
Rebuild cached period price mapping.
Приоритет: БД > .env
"""
PERIOD_PRICES.clear()
if _DB_PERIOD_PRICES:
# Используем цены из БД
PERIOD_PRICES.update(_DB_PERIOD_PRICES)
else:
# Fallback на .env
PERIOD_PRICES.update(
{
days: getattr(settings, field_name, 0)
for days, field_name in _PERIOD_PRICE_FIELDS.items()
}
)
PERIOD_PRICES: Dict[int, int] = {}
refresh_period_prices()
def get_traffic_prices() -> Dict[int, int]:
packages = settings.get_traffic_packages()
return {package["gb"]: package["price"] for package in packages}
TRAFFIC_PRICES = get_traffic_prices()
def refresh_traffic_prices():
global TRAFFIC_PRICES
TRAFFIC_PRICES = get_traffic_prices()
refresh_traffic_prices()
settings._original_database_url = settings.DATABASE_URL
settings.DATABASE_URL = settings.get_database_url()