Improve YooKassa webhook recovery

This commit is contained in:
Egor
2025-10-26 06:52:18 +03:00
parent 6651e59f63
commit b780fe2278
5 changed files with 595 additions and 26 deletions

View File

@@ -1,6 +1,8 @@
import asyncio
import hashlib
import json
import logging
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Tuple
import aiohttp
@@ -17,6 +19,10 @@ class MulenPayService:
self.shop_id = settings.MULENPAY_SHOP_ID
self.secret_key = settings.MULENPAY_SECRET_KEY
self.base_url = settings.MULENPAY_BASE_URL.rstrip("/")
self._timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=25)
self._max_retries = 3
self._retry_delay = 0.5
self._retryable_statuses = {500, 502, 503, 504}
@property
def is_configured(self) -> bool:
@@ -45,31 +51,115 @@ class MulenPayService:
"Content-Type": "application/json",
}
try:
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(
last_error: Optional[BaseException] = None
for attempt in range(1, self._max_retries + 1):
try:
async with aiohttp.ClientSession(timeout=self._timeout) as session:
async with session.request(
method,
url,
headers=headers,
json=json_data,
params=params,
) as response:
data, raw_text = await self._deserialize_response(response)
if response.status >= 400:
logger.error(
"MulenPay API error %s %s: %s",
response.status,
endpoint,
raw_text,
)
if (
response.status in self._retryable_statuses
and attempt < self._max_retries
):
await self._sleep_with_backoff(attempt)
continue
return None
if data is None:
if raw_text:
logger.warning(
"MulenPay returned unexpected payload for %s: %s",
endpoint,
raw_text,
)
return None
return data
except asyncio.CancelledError:
logger.debug("MulenPay request cancelled: %s %s", method, endpoint)
raise
except asyncio.TimeoutError as error:
last_error = error
logger.warning(
"MulenPay request timeout (%s %s) attempt %s/%s",
method,
url,
headers=headers,
json=json_data,
params=params,
) as response:
data = await response.json(content_type=None)
endpoint,
attempt,
self._max_retries,
)
except aiohttp.ClientError as error:
last_error = error
logger.warning(
"MulenPay client error (%s %s) attempt %s/%s: %s",
method,
endpoint,
attempt,
self._max_retries,
error,
)
except Exception as error: # pragma: no cover - safety
logger.error("Unexpected MulenPay error: %s", error, exc_info=True)
return None
if response.status >= 400:
logger.error(
"MulenPay API error %s %s: %s", response.status, endpoint, data
)
return None
if attempt < self._max_retries:
await self._sleep_with_backoff(attempt)
return data
except aiohttp.ClientError as error:
logger.error("MulenPay API request error: %s", error)
return None
except Exception as error: # pragma: no cover - safety
logger.error("Unexpected MulenPay error: %s", error, exc_info=True)
return None
if isinstance(last_error, asyncio.TimeoutError):
logger.error(
"MulenPay request timed out after %s attempts: %s %s",
self._max_retries,
method,
endpoint,
)
elif last_error is not None:
logger.error(
"MulenPay request failed after %s attempts (%s %s): %s",
self._max_retries,
method,
endpoint,
last_error,
)
return None
async def _sleep_with_backoff(self, attempt: int) -> None:
await asyncio.sleep(self._retry_delay * attempt)
async def _deserialize_response(
self, response: aiohttp.ClientResponse
) -> Tuple[Optional[Dict[str, Any]], str]:
raw_text = await response.text()
if not raw_text:
return None, ""
content_type = response.headers.get("Content-Type", "")
if "json" in content_type.lower() or not content_type:
try:
return json.loads(raw_text), raw_text
except json.JSONDecodeError as error:
logger.error(
"Failed to decode MulenPay JSON response %s: %s",
response.url,
error,
)
return None, raw_text
return None, raw_text
@staticmethod
def _format_amount(amount_kopeks: int) -> str:

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
import logging
from datetime import datetime
from decimal import Decimal, InvalidOperation
from importlib import import_module
from typing import Any, Dict, Optional, TYPE_CHECKING
@@ -545,14 +546,40 @@ class YooKassaPaymentMixin:
logger.warning(
"Локальный платеж для YooKassa id %s не найден", yookassa_payment_id
)
return False
payment = await self._restore_missing_yookassa_payment(db, event_object)
if not payment:
logger.error(
"Не удалось восстановить локальную запись платежа YooKassa %s",
yookassa_payment_id,
)
return False
payment.status = event_object.get("status", payment.status)
payment.confirmation_url = event_object.get("confirmation_url")
payment.confirmation_url = self._extract_confirmation_url(event_object)
payment.payment_method_type = (
(event_object.get("payment_method") or {}).get("type")
or payment.payment_method_type
)
payment.refundable = event_object.get("refundable", getattr(payment, "refundable", False))
current_paid = bool(getattr(payment, "is_paid", getattr(payment, "paid", False)))
payment.is_paid = bool(event_object.get("paid", current_paid))
captured_at_raw = event_object.get("captured_at")
if captured_at_raw:
try:
payment.captured_at = datetime.fromisoformat(
captured_at_raw.replace("Z", "+00:00")
).replace(tzinfo=None)
except Exception as error:
logger.debug(
"Не удалось распарсить captured_at=%s: %s",
captured_at_raw,
error,
)
await db.commit()
await db.refresh(payment)
@@ -565,3 +592,152 @@ class YooKassaPaymentMixin:
payment.status,
)
return True
async def _restore_missing_yookassa_payment(
self,
db: AsyncSession,
event_object: Dict[str, Any],
) -> Optional["YooKassaPayment"]:
"""Создает локальную запись платежа на основе данных webhook, если она отсутствует."""
yookassa_payment_id = event_object.get("id")
if not yookassa_payment_id:
return None
metadata = self._normalise_yookassa_metadata(event_object.get("metadata"))
user_id_raw = metadata.get("user_id") or metadata.get("userId")
if user_id_raw is None:
logger.error(
"Webhook YooKassa %s не содержит user_id в metadata. Невозможно восстановить платеж.",
yookassa_payment_id,
)
return None
try:
user_id = int(user_id_raw)
except (TypeError, ValueError):
logger.error(
"Webhook YooKassa %s содержит некорректный user_id=%s",
yookassa_payment_id,
user_id_raw,
)
return None
amount_info = event_object.get("amount") or {}
amount_value = amount_info.get("value")
currency = (amount_info.get("currency") or "RUB").upper()
if amount_value is None:
logger.error(
"Webhook YooKassa %s не содержит сумму платежа",
yookassa_payment_id,
)
return None
try:
amount_kopeks = int((Decimal(str(amount_value)) * 100).quantize(Decimal("1")))
except (InvalidOperation, ValueError) as error:
logger.error(
"Некорректная сумма в webhook YooKassa %s: %s (%s)",
yookassa_payment_id,
amount_value,
error,
)
return None
description = event_object.get("description") or metadata.get("description") or "YooKassa платеж"
payment_method_type = (event_object.get("payment_method") or {}).get("type")
yookassa_created_at = None
created_at_raw = event_object.get("created_at")
if created_at_raw:
try:
yookassa_created_at = datetime.fromisoformat(
created_at_raw.replace("Z", "+00:00")
).replace(tzinfo=None)
except Exception as error: # pragma: no cover - диагностический лог
logger.debug(
"Не удалось распарсить created_at=%s для YooKassa %s: %s",
created_at_raw,
yookassa_payment_id,
error,
)
payment_module = import_module("app.services.payment_service")
local_payment = await payment_module.create_yookassa_payment(
db=db,
user_id=user_id,
yookassa_payment_id=yookassa_payment_id,
amount_kopeks=amount_kopeks,
currency=currency,
description=description,
status=event_object.get("status", "pending"),
confirmation_url=self._extract_confirmation_url(event_object),
metadata_json=metadata,
payment_method_type=payment_method_type,
yookassa_created_at=yookassa_created_at,
test_mode=bool(event_object.get("test") or event_object.get("test_mode")),
)
if not local_payment:
return None
await payment_module.update_yookassa_payment_status(
db=db,
yookassa_payment_id=yookassa_payment_id,
status=event_object.get("status", local_payment.status),
is_paid=bool(event_object.get("paid")),
is_captured=event_object.get("status") == "succeeded",
captured_at=self._parse_datetime(event_object.get("captured_at")),
payment_method_type=payment_method_type,
)
return await payment_module.get_yookassa_payment_by_id(db, yookassa_payment_id)
@staticmethod
def _normalise_yookassa_metadata(metadata: Any) -> Dict[str, Any]:
if isinstance(metadata, dict):
return metadata
if isinstance(metadata, list):
normalised: Dict[str, Any] = {}
for item in metadata:
key = item.get("key") if isinstance(item, dict) else None
if key:
normalised[key] = item.get("value")
return normalised
if isinstance(metadata, str):
try:
import json
parsed = json.loads(metadata)
if isinstance(parsed, dict):
return parsed
except json.JSONDecodeError:
logger.debug("Не удалось распарсить metadata webhook YooKassa: %s", metadata)
return {}
@staticmethod
def _extract_confirmation_url(event_object: Dict[str, Any]) -> Optional[str]:
if "confirmation_url" in event_object:
return event_object.get("confirmation_url")
confirmation = event_object.get("confirmation")
if isinstance(confirmation, dict):
return confirmation.get("confirmation_url") or confirmation.get("return_url")
return None
@staticmethod
def _parse_datetime(raw_value: Optional[str]) -> Optional[datetime]:
if not raw_value:
return None
try:
return datetime.fromisoformat(raw_value.replace("Z", "+00:00")).replace(tzinfo=None)
except Exception:
return None

View File

@@ -40,6 +40,11 @@ async def create_yookassa_payment(*args, **kwargs):
return await yk_crud.create_yookassa_payment(*args, **kwargs)
async def update_yookassa_payment_status(*args, **kwargs):
yk_crud = import_module("app.database.crud.yookassa")
return await yk_crud.update_yookassa_payment_status(*args, **kwargs)
async def link_yookassa_payment_to_transaction(*args, **kwargs):
yk_crud = import_module("app.database.crud.yookassa")
return await yk_crud.link_yookassa_payment_to_transaction(*args, **kwargs)

View File

@@ -2,8 +2,10 @@
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Sequence
import sys
import pytest
@@ -16,6 +18,57 @@ from app.config import settings # noqa: E402
from app.services.mulenpay_service import MulenPayService # noqa: E402
class _DummyResponse:
def __init__(
self,
*,
status: int,
body: str = "{}",
headers: Optional[Dict[str, str]] = None,
url: str = "https://mulenpay.test/endpoint",
) -> None:
self.status = status
self._body = body
self.headers = headers or {"Content-Type": "application/json"}
self.url = url
async def __aenter__(self) -> "_DummyResponse":
return self
async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface
return False
async def text(self) -> str:
return self._body
class _DummySession:
def __init__(self, result: Any) -> None:
self._result = result
async def __aenter__(self) -> "_DummySession":
return self
async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface
return False
def request(self, *args: Any, **kwargs: Any) -> Any:
if isinstance(self._result, BaseException):
raise self._result
return self._result
def _session_factory(responses: Sequence[Any]) -> Any:
call_state = {"index": 0}
def _factory(*_args: Any, **_kwargs: Any) -> _DummySession:
index = min(call_state["index"], len(responses) - 1)
call_state["index"] += 1
return _DummySession(responses[index])
return _factory
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
@@ -105,3 +158,88 @@ async def test_get_payment(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(service, "_request", fake_request, raising=False)
result = await service.get_payment(123)
assert result == {"id": 123, "status": "paid"}
@pytest.mark.anyio("asyncio")
async def test_request_success(monkeypatch: pytest.MonkeyPatch) -> None:
_enable_service(monkeypatch)
service = MulenPayService()
response_payload = {"ok": True}
monkeypatch.setattr(
"app.services.mulenpay_service.aiohttp.ClientSession",
_session_factory([
_DummyResponse(status=200, body=json.dumps(response_payload)),
]),
)
result = await service._request("GET", "/ping")
assert result == response_payload
@pytest.mark.anyio("asyncio")
async def test_request_retries_on_server_error(monkeypatch: pytest.MonkeyPatch) -> None:
_enable_service(monkeypatch)
service = MulenPayService()
service._max_retries = 2
sleep_calls: list[float] = []
async def fake_sleep(delay: float) -> None:
sleep_calls.append(delay)
monkeypatch.setattr(
"app.services.mulenpay_service.asyncio.sleep",
fake_sleep,
)
monkeypatch.setattr(
"app.services.mulenpay_service.aiohttp.ClientSession",
_session_factory(
[
_DummyResponse(status=502, body="{\"error\": \"bad gateway\"}"),
_DummyResponse(status=200, body="{\"ok\": true}"),
]
),
)
result = await service._request("GET", "/retry")
assert result == {"ok": True}
assert sleep_calls == [service._retry_delay]
@pytest.mark.anyio("asyncio")
async def test_request_returns_none_after_timeouts(monkeypatch: pytest.MonkeyPatch) -> None:
_enable_service(monkeypatch)
service = MulenPayService()
service._max_retries = 2
async def fake_sleep(_delay: float) -> None:
return None
monkeypatch.setattr(
"app.services.mulenpay_service.asyncio.sleep",
fake_sleep,
)
monkeypatch.setattr(
"app.services.mulenpay_service.aiohttp.ClientSession",
_session_factory([asyncio.TimeoutError()]),
)
result = await service._request("GET", "/timeout")
assert result is None
@pytest.mark.anyio("asyncio")
async def test_request_reraises_cancelled(monkeypatch: pytest.MonkeyPatch) -> None:
_enable_service(monkeypatch)
service = MulenPayService()
monkeypatch.setattr(
"app.services.mulenpay_service.aiohttp.ClientSession",
_session_factory([asyncio.CancelledError()]),
)
with pytest.raises(asyncio.CancelledError):
await service._request("GET", "/cancel")

View File

@@ -496,6 +496,166 @@ async def test_process_yookassa_webhook_success(monkeypatch: pytest.MonkeyPatch)
assert admin_calls
@pytest.mark.anyio("asyncio")
async def test_process_yookassa_webhook_restores_missing_payment(
monkeypatch: pytest.MonkeyPatch,
) -> None:
bot = DummyBot()
service = _make_service(bot)
fake_session = FakeSession()
restored_payment = SimpleNamespace(
yookassa_payment_id="yk_456",
user_id=21,
amount_kopeks=0,
status="pending",
is_paid=False,
transaction_id=None,
description="",
payment_method_type=None,
confirmation_url=None,
metadata_json=None,
test_mode=False,
refundable=False,
)
get_calls = {"count": 0}
async def fake_get_payment(db, payment_id):
get_calls["count"] += 1
if get_calls["count"] == 1:
return None
return restored_payment
async def fake_create_payment(**kwargs: Any):
restored_payment.user_id = kwargs["user_id"]
restored_payment.amount_kopeks = kwargs["amount_kopeks"]
restored_payment.status = kwargs["status"]
restored_payment.description = kwargs["description"]
restored_payment.payment_method_type = kwargs["payment_method_type"]
restored_payment.confirmation_url = kwargs["confirmation_url"]
restored_payment.metadata_json = kwargs["metadata_json"]
restored_payment.test_mode = kwargs["test_mode"]
restored_payment.yookassa_payment_id = kwargs["yookassa_payment_id"]
restored_payment.yookassa_created_at = kwargs["yookassa_created_at"]
return restored_payment
async def fake_update_status(
db,
yookassa_payment_id,
status,
is_paid,
is_captured,
captured_at,
payment_method_type,
):
restored_payment.status = status
restored_payment.is_paid = is_paid
restored_payment.is_captured = is_captured
restored_payment.captured_at = captured_at
restored_payment.payment_method_type = payment_method_type
return restored_payment
async def fake_link(db, yookassa_payment_id, transaction_id):
restored_payment.transaction_id = transaction_id
monkeypatch.setattr(payment_service_module, "get_yookassa_payment_by_id", fake_get_payment)
monkeypatch.setattr(payment_service_module, "create_yookassa_payment", fake_create_payment)
monkeypatch.setattr(payment_service_module, "update_yookassa_payment_status", fake_update_status)
monkeypatch.setattr(payment_service_module, "link_yookassa_payment_to_transaction", fake_link)
transactions: list[Dict[str, Any]] = []
async def fake_create_transaction(db, **kwargs):
transactions.append(kwargs)
return SimpleNamespace(id=555, **kwargs)
monkeypatch.setattr(payment_service_module, "create_transaction", fake_create_transaction)
user = SimpleNamespace(
id=21,
telegram_id=2100,
balance_kopeks=0,
has_made_first_topup=False,
promo_group=None,
subscription=None,
referred_by_id=None,
referrer=None,
)
async def fake_get_user(db, user_id):
return user
monkeypatch.setattr(payment_service_module, "get_user_by_id", fake_get_user)
monkeypatch.setattr(type(settings), "format_price", lambda self, amount: f"{amount / 100:.2f}", raising=False)
referral_mock = SimpleNamespace(process_referral_topup=AsyncMock())
monkeypatch.setitem(sys.modules, "app.services.referral_service", referral_mock)
admin_calls: list[Any] = []
class DummyAdminService:
def __init__(self, bot):
self.bot = bot
async def send_balance_topup_notification(self, *args, **kwargs):
admin_calls.append((args, kwargs))
monkeypatch.setitem(sys.modules, "app.services.admin_notification_service", SimpleNamespace(AdminNotificationService=lambda bot: DummyAdminService(bot)))
service.build_topup_success_keyboard = AsyncMock(return_value=None)
payload = {
"object": {
"id": "yk_456",
"status": "succeeded",
"paid": True,
"amount": {"value": "150.00", "currency": "RUB"},
"metadata": {"user_id": "21", "payment_purpose": "balance_topup"},
"description": "Пополнение",
"payment_method": {"type": "bank_card"},
"created_at": "2024-01-02T12:00:00Z",
"captured_at": "2024-01-02T12:05:00Z",
"confirmation": {"confirmation_url": "https://pay.example"},
}
}
result = await service.process_yookassa_webhook(fake_session, payload)
assert result is True
assert get_calls["count"] >= 2 # повторный запрос после восстановления
assert restored_payment.amount_kopeks == 15000
assert restored_payment.is_paid is True
assert transactions and transactions[0]["amount_kopeks"] == 15000
assert restored_payment.transaction_id == 555
assert user.balance_kopeks == 15000
assert bot.sent_messages
assert admin_calls
@pytest.mark.anyio("asyncio")
async def test_process_yookassa_webhook_missing_metadata(monkeypatch: pytest.MonkeyPatch) -> None:
service = _make_service(DummyBot())
db = FakeSession()
async def fake_get_payment(db_session, payment_id):
return None
create_mock = AsyncMock()
update_mock = AsyncMock()
monkeypatch.setattr(payment_service_module, "get_yookassa_payment_by_id", fake_get_payment)
monkeypatch.setattr(payment_service_module, "create_yookassa_payment", create_mock)
monkeypatch.setattr(payment_service_module, "update_yookassa_payment_status", update_mock)
payload = {"object": {"id": "yk_missing", "status": "succeeded", "paid": True}}
result = await service.process_yookassa_webhook(db, payload)
assert result is False
create_mock.assert_not_awaited()
update_mock.assert_not_awaited()
@pytest.mark.anyio("asyncio")
async def test_process_yookassa_webhook_missing_id(monkeypatch: pytest.MonkeyPatch) -> None:
bot = DummyBot()