Add dynamic Telegram Stars rate with auto refresh

This commit is contained in:
Egor
2025-11-01 00:25:55 +03:00
parent 49bcd18a9a
commit 61a93f86b3
8 changed files with 550 additions and 0 deletions

View File

@@ -181,6 +181,7 @@ MIN_BALANCE_FOR_AUTOPAY_KOPEKS=10000
# Telegram Stars (работает автоматически)
TELEGRAM_STARS_ENABLED=true
TELEGRAM_STARS_RATE_RUB=1.3
TELEGRAM_STARS_CUSTOM_RATE_ENABLED=false
# Tribute (https://tribute.app)
TRIBUTE_ENABLED=false

View File

@@ -595,6 +595,7 @@ MIN_BALANCE_FOR_AUTOPAY_KOPEKS=10000
# Telegram Stars
TELEGRAM_STARS_ENABLED=true
TELEGRAM_STARS_RATE_RUB=1.3
TELEGRAM_STARS_CUSTOM_RATE_ENABLED=false
# Tribute
TRIBUTE_ENABLED=false

View File

@@ -164,6 +164,7 @@ class Settings(BaseSettings):
TELEGRAM_STARS_ENABLED: bool = True
TELEGRAM_STARS_RATE_RUB: float = 1.3
TELEGRAM_STARS_CUSTOM_RATE_ENABLED: bool = False
TRIBUTE_ENABLED: bool = False
TRIBUTE_API_KEY: Optional[str] = None
@@ -1121,6 +1122,23 @@ class Settings(BaseSettings):
)
def get_stars_rate(self) -> float:
"""Возвращает актуальный курс Telegram Stars в рублях."""
if self.TELEGRAM_STARS_CUSTOM_RATE_ENABLED:
return self.TELEGRAM_STARS_RATE_RUB
try:
from app.services.telegram_stars_rate_service import ( # pylint: disable=import-outside-toplevel
telegram_stars_rate_service,
)
telegram_stars_rate_service.ensure_refresh()
cached_rate = telegram_stars_rate_service.get_cached_rate()
if cached_rate:
return cached_rate
except Exception as error: # pragma: no cover - защитный блок от циклических импортов
logger.debug("Не удалось получить актуальный курс Stars: %s", error)
return self.TELEGRAM_STARS_RATE_RUB
def stars_to_rubles(self, stars: int) -> float:

View File

@@ -269,6 +269,7 @@ class BotConfigurationService:
"VERSION_CHECK_REPO": "VERSION",
"VERSION_CHECK_INTERVAL_HOURS": "VERSION",
"TELEGRAM_STARS_RATE_RUB": "TELEGRAM",
"TELEGRAM_STARS_CUSTOM_RATE_ENABLED": "TELEGRAM",
"REMNAWAVE_USER_DESCRIPTION_TEMPLATE": "REMNAWAVE",
"REMNAWAVE_USER_USERNAME_TEMPLATE": "REMNAWAVE",
"REMNAWAVE_AUTO_SYNC_ENABLED": "REMNAWAVE",

View File

@@ -0,0 +1,387 @@
from __future__ import annotations
import asyncio
import json
import logging
import math
import re
import time
from dataclasses import dataclass
from typing import Any, Optional, Sequence, Tuple
import aiohttp
from app.config import settings
logger = logging.getLogger(__name__)
_RATE_KEYS: Tuple[str, ...] = (
"rub_per_star",
"rubperstar",
"price_per_star",
"per_star_price",
"perstarprice",
"priceperstar",
"per_star",
"perstar",
"rate",
"exchange_rate",
"exchangerate",
)
_STAR_KEYS: Tuple[str, ...] = (
"stars",
"star",
"star_count",
"stars_count",
"starcount",
"starscount",
"starQuantity",
"starquantity",
)
_PRICE_KEYS: Tuple[str, ...] = (
"price",
"amount",
"total",
"total_price",
"totalPrice",
"total_amount",
"totalAmount",
"value",
)
@dataclass(frozen=True, slots=True)
class _RequestSpec:
method: str
url: str
params: Optional[dict[str, Any]] = None
json_payload: Optional[dict[str, Any]] = None
class TelegramStarsRateService:
"""Получает и кэширует актуальный курс Telegram Stars."""
_REQUESTS: Sequence[_RequestSpec] = (
_RequestSpec(
method="POST",
url="https://pay.telegram.org/api/index",
json_payload={
"method": "getStarsExchangeRates",
"params": {"currency": "RUB"},
},
),
_RequestSpec(
method="GET",
url="https://pay.telegram.org/api/index",
params={"act": "pack", "type": "stars"},
),
)
_REQUEST_HEADERS = {
"User-Agent": "Mozilla/5.0 (compatible; RemnawaveBot/1.0)",
"Accept": "application/json, text/plain, */*",
"Referer": "https://pay.telegram.org/",
"Origin": "https://pay.telegram.org",
}
_REFRESH_INTERVAL_SECONDS = 15 * 60
_MIN_RETRY_INTERVAL_SECONDS = 60
_MIN_REASONABLE_RATE = 0.01
_MAX_REASONABLE_RATE = 100.0
def __init__(self) -> None:
self._rate: Optional[float] = float(settings.TELEGRAM_STARS_RATE_RUB or 0)
self._last_update: float = 0.0
self._last_attempt: float = 0.0
self._lock = asyncio.Lock()
self._background_task: Optional[asyncio.Task[Optional[float]]] = None
def get_cached_rate(self) -> Optional[float]:
"""Возвращает закэшированный курс Stars."""
if self._rate and self._rate >= self._MIN_REASONABLE_RATE:
return self._rate
return None
def ensure_refresh(self, force: bool = False) -> None:
"""Гарантирует запуск обновления курса в фоне при необходимости."""
if settings.TELEGRAM_STARS_CUSTOM_RATE_ENABLED:
return
if not force and not self._is_refresh_needed():
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
try:
asyncio.run(self.refresh_rate(force=force))
except RuntimeError:
logger.debug("Не удалось синхронно обновить курс Stars")
return
if self._background_task and not self._background_task.done():
return
self._background_task = loop.create_task(
self.refresh_rate(force=force),
name="telegram-stars-rate-refresh",
)
async def refresh_rate(self, force: bool = False) -> Optional[float]:
"""Асинхронно обновляет курс Stars."""
if settings.TELEGRAM_STARS_CUSTOM_RATE_ENABLED:
return float(settings.TELEGRAM_STARS_RATE_RUB)
if not force and not self._is_refresh_needed():
return self.get_cached_rate()
async with self._lock:
if not force and not self._is_refresh_needed():
return self.get_cached_rate()
self._last_attempt = time.monotonic()
try:
rate = await self._fetch_rate()
except Exception as error:
logger.warning("Не удалось получить курс Telegram Stars: %s", error)
return self.get_cached_rate()
if rate is None:
logger.debug("API Telegram Stars не вернуло валидный курс")
return self.get_cached_rate()
self._rate = rate
self._last_update = time.monotonic()
settings.TELEGRAM_STARS_RATE_RUB = rate
logger.info("Актуальный курс Telegram Stars обновлён: %.4f ₽/⭐", rate)
return rate
def _is_refresh_needed(self) -> bool:
now = time.monotonic()
if self._rate is None or self._rate < self._MIN_REASONABLE_RATE:
return now - self._last_attempt >= self._MIN_RETRY_INTERVAL_SECONDS
if now - self._last_update >= self._REFRESH_INTERVAL_SECONDS:
return now - self._last_attempt >= self._MIN_RETRY_INTERVAL_SECONDS
return False
async def _fetch_rate(self) -> Optional[float]:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
for request in self._REQUESTS:
try:
response = await session.request(
method=request.method,
url=request.url,
headers=self._REQUEST_HEADERS,
params=request.params,
json=request.json_payload,
)
except aiohttp.ClientError as error:
logger.debug(
"Ошибка запроса курса Stars (%s %s): %s",
request.method,
request.url,
error,
)
continue
if response.status >= 400:
body = await response.text()
logger.debug(
"API Telegram Stars ответило %s: %s",
response.status,
_truncate(body, 200),
)
continue
data = await self._read_json(response)
if data is None:
continue
rate = self._extract_rate(data)
if rate is not None:
return rate
return None
async def _read_json(self, response: aiohttp.ClientResponse) -> Optional[Any]:
try:
return await response.json(content_type=None)
except (aiohttp.ContentTypeError, json.JSONDecodeError, ValueError):
text = await response.text()
try:
return json.loads(text)
except json.JSONDecodeError:
logger.debug("Не удалось распарсить ответ Telegram Stars как JSON")
return None
@classmethod
def _extract_rate(cls, data: Any) -> Optional[float]:
candidates: list[Tuple[float, float]] = []
cls._collect_rate_candidates(data, candidates)
if not candidates:
return None
best_rate, _ = max(
candidates,
key=lambda item: (item[1], -item[0]),
)
return best_rate
@classmethod
def _collect_rate_candidates(
cls,
payload: Any,
result: list[Tuple[float, float]],
) -> None:
if isinstance(payload, dict):
direct_rate = cls._parse_direct_rate(payload)
if direct_rate is not None:
result.append((direct_rate, math.inf))
pack_rate = cls._parse_pack_rate(payload)
if pack_rate is not None:
result.append(pack_rate)
for value in payload.values():
cls._collect_rate_candidates(value, result)
elif isinstance(payload, list):
for item in payload:
cls._collect_rate_candidates(item, result)
@classmethod
def _parse_direct_rate(cls, data: dict[str, Any]) -> Optional[float]:
for key, value in data.items():
normalized_key = key.lower()
if normalized_key in _RATE_KEYS:
if isinstance(value, dict):
price = cls._parse_price_value(value)
normalized_rate = cls._normalize_rate(price)
if normalized_rate is not None:
return normalized_rate
continue
numeric = cls._coerce_number(value)
normalized_rate = cls._normalize_rate(numeric)
if normalized_rate is not None:
return normalized_rate
return None
@classmethod
def _parse_pack_rate(cls, data: dict[str, Any]) -> Optional[Tuple[float, float]]:
stars = cls._parse_stars_value(data)
if stars is None or stars <= 0:
return None
price = cls._parse_price_value(data)
if price is None or price <= 0:
return None
rate = cls._normalize_rate(price / stars)
if rate is None:
return None
return rate, float(stars)
@classmethod
def _parse_stars_value(cls, data: dict[str, Any]) -> Optional[float]:
for key in data.keys():
normalized_key = key.lower()
if normalized_key in _STAR_KEYS:
candidate = cls._coerce_number(data[key])
if candidate and candidate > 0:
return candidate
return None
@classmethod
def _parse_price_value(cls, value: Any) -> Optional[float]:
if isinstance(value, dict):
currency = value.get("currency") or value.get("code")
if currency and str(currency).upper() not in {"RUB", "RUR"}:
return None
for key in _PRICE_KEYS:
if key in value:
candidate = cls._parse_price_value(value[key])
if candidate is not None:
return candidate
return None
if isinstance(value, list):
for item in value:
candidate = cls._parse_price_value(item)
if candidate is not None:
return candidate
return None
return cls._coerce_number(value)
@classmethod
def _coerce_number(cls, value: Any) -> Optional[float]:
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
cleaned = value.strip().replace("", "").replace(" ", "")
if not cleaned:
return None
match = re.search(r"-?\d+[\.,]?\d*", cleaned)
if not match:
return None
normalized = match.group(0).replace(",", ".")
try:
return float(normalized)
except ValueError:
return None
return None
@classmethod
def _normalize_rate(cls, value: Optional[float]) -> Optional[float]:
if value is None or value <= 0:
return None
rate = float(value)
attempts = 0
while rate > cls._MAX_REASONABLE_RATE and attempts < 3:
rate /= 100
attempts += 1
if cls._MIN_REASONABLE_RATE <= rate <= cls._MAX_REASONABLE_RATE:
return round(rate, 4)
return None
def _truncate(value: str, max_len: int) -> str:
value = value.strip()
if len(value) <= max_len:
return value
return value[: max_len - 1] + ""
telegram_stars_rate_service = TelegramStarsRateService()

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
@@ -30,6 +32,9 @@ from .routes import (
)
logger = logging.getLogger(__name__)
OPENAPI_TAGS = [
{
"name": "health",
@@ -153,4 +158,18 @@ def create_web_api_app() -> FastAPI:
app.include_router(polls.router, prefix="/polls", tags=["polls"])
app.include_router(logs.router, prefix="/logs", tags=["logs"])
@app.on_event("startup")
async def refresh_stars_rate() -> None: # pragma: no cover - инфраструктурный хук
if settings.TELEGRAM_STARS_CUSTOM_RATE_ENABLED:
return
try:
from app.services.telegram_stars_rate_service import ( # pylint: disable=import-outside-toplevel
telegram_stars_rate_service,
)
await telegram_stars_rate_service.refresh_rate()
except Exception as error:
logger.debug("Не удалось обновить курс Stars при старте web api: %s", error)
return app

23
main.py
View File

@@ -35,6 +35,7 @@ from app.localization.loader import ensure_locale_templates
from app.services.system_settings_service import bot_configuration_service
from app.services.external_admin_service import ensure_external_admin_token
from app.services.broadcast_service import broadcast_service
from app.services.telegram_stars_rate_service import telegram_stars_rate_service
from app.utils.startup_timeline import StartupTimeline
@@ -141,6 +142,28 @@ async def main():
stage.warning(f"Не удалось загрузить конфигурацию: {error}")
logger.error(f"Не удалось загрузить конфигурацию: {error}")
async with timeline.stage(
"Актуализация курса Telegram Stars",
"",
success_message="Курс Stars актуален",
) as stage:
try:
rate = await telegram_stars_rate_service.refresh_rate(force=True)
if settings.TELEGRAM_STARS_CUSTOM_RATE_ENABLED:
stage.log(
"Используется пользовательский курс Stars: "
f"{settings.TELEGRAM_STARS_RATE_RUB:.2f} ₽/⭐"
)
elif rate:
stage.log(f"Текущий курс Stars: {rate:.2f} ₽/⭐")
else:
stage.warning(
"Не удалось получить курс из Telegram, используется значение из настроек"
)
except Exception as error:
stage.warning(f"Ошибка обновления курса Stars: {error}")
logger.warning("Ошибка обновления курса Telegram Stars: %s", error)
bot = None
dp = None
async with timeline.stage("Настройка бота", "🤖", success_message="Бот настроен") as stage:

View File

@@ -0,0 +1,100 @@
from pathlib import Path
from unittest.mock import AsyncMock
import sys
import pytest
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
from app.config import settings
from app.services import telegram_stars_rate_service as rate_module
from app.services.telegram_stars_rate_service import TelegramStarsRateService
@pytest.mark.parametrize(
"payload,expected",
[
(
{"rate": 1.79},
pytest.approx(1.79),
),
(
{"pricePerStar": "1.85"},
pytest.approx(1.85),
),
(
{
"packs": [
{
"stars": 250,
"price": {"currency": "RUB", "amount": 44900},
}
]
},
pytest.approx(1.796, rel=1e-3),
),
(
{
"data": {
"options": [
{
"star_count": "500",
"total_price": {"value": "895", "currency": "RUB"},
}
]
}
},
pytest.approx(1.79, rel=1e-3),
),
],
)
def test_extract_rate(payload, expected):
assert TelegramStarsRateService._extract_rate(payload) == expected
@pytest.mark.asyncio
async def test_refresh_rate_updates_settings(monkeypatch):
service = TelegramStarsRateService()
monkeypatch.setattr(settings, "TELEGRAM_STARS_CUSTOM_RATE_ENABLED", False, raising=False)
monkeypatch.setattr(settings, "TELEGRAM_STARS_RATE_RUB", 1.3, raising=False)
monkeypatch.setattr(service, "_fetch_rate", AsyncMock(return_value=1.91))
rate = await service.refresh_rate(force=True)
assert rate == pytest.approx(1.91)
assert settings.TELEGRAM_STARS_RATE_RUB == pytest.approx(1.91)
@pytest.mark.asyncio
async def test_refresh_rate_respects_custom_setting(monkeypatch):
service = TelegramStarsRateService()
monkeypatch.setattr(settings, "TELEGRAM_STARS_CUSTOM_RATE_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "TELEGRAM_STARS_RATE_RUB", 2.05, raising=False)
fetch_mock = AsyncMock(return_value=1.5)
monkeypatch.setattr(service, "_fetch_rate", fetch_mock)
rate = await service.refresh_rate(force=True)
assert rate == pytest.approx(2.05)
fetch_mock.assert_not_awaited()
def test_settings_get_stars_rate_uses_dynamic(monkeypatch):
custom_service = TelegramStarsRateService()
monkeypatch.setattr(rate_module, "telegram_stars_rate_service", custom_service, raising=False)
monkeypatch.setattr(settings, "TELEGRAM_STARS_CUSTOM_RATE_ENABLED", False, raising=False)
monkeypatch.setattr(settings, "TELEGRAM_STARS_RATE_RUB", 1.3, raising=False)
custom_service._rate = 2.34
assert settings.get_stars_rate() == pytest.approx(2.34)
def test_settings_get_stars_rate_uses_custom(monkeypatch):
monkeypatch.setattr(settings, "TELEGRAM_STARS_CUSTOM_RATE_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "TELEGRAM_STARS_RATE_RUB", 3.21, raising=False)
assert settings.get_stars_rate() == pytest.approx(3.21)