diff --git a/app/services/mulenpay_service.py b/app/services/mulenpay_service.py index afcb44d9..6cfbebf1 100644 --- a/app/services/mulenpay_service.py +++ b/app/services/mulenpay_service.py @@ -1,6 +1,8 @@ +import asyncio import hashlib +import json import logging -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Tuple import aiohttp @@ -17,6 +19,10 @@ class MulenPayService: self.shop_id = settings.MULENPAY_SHOP_ID self.secret_key = settings.MULENPAY_SECRET_KEY self.base_url = settings.MULENPAY_BASE_URL.rstrip("/") + self._timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=25) + self._max_retries = 3 + self._retry_delay = 0.5 + self._retryable_statuses = {500, 502, 503, 504} @property def is_configured(self) -> bool: @@ -45,31 +51,115 @@ class MulenPayService: "Content-Type": "application/json", } - try: - timeout = aiohttp.ClientTimeout(total=30) - async with aiohttp.ClientSession(timeout=timeout) as session: - async with session.request( + last_error: Optional[BaseException] = None + + for attempt in range(1, self._max_retries + 1): + try: + async with aiohttp.ClientSession(timeout=self._timeout) as session: + async with session.request( + method, + url, + headers=headers, + json=json_data, + params=params, + ) as response: + data, raw_text = await self._deserialize_response(response) + + if response.status >= 400: + logger.error( + "MulenPay API error %s %s: %s", + response.status, + endpoint, + raw_text, + ) + if ( + response.status in self._retryable_statuses + and attempt < self._max_retries + ): + await self._sleep_with_backoff(attempt) + continue + return None + + if data is None: + if raw_text: + logger.warning( + "MulenPay returned unexpected payload for %s: %s", + endpoint, + raw_text, + ) + return None + + return data + except asyncio.CancelledError: + logger.debug("MulenPay request cancelled: %s %s", method, endpoint) + raise + except asyncio.TimeoutError as error: + last_error = error + logger.warning( + "MulenPay request timeout (%s %s) attempt %s/%s", method, - url, - headers=headers, - json=json_data, - params=params, - ) as response: - data = await response.json(content_type=None) + endpoint, + attempt, + self._max_retries, + ) + except aiohttp.ClientError as error: + last_error = error + logger.warning( + "MulenPay client error (%s %s) attempt %s/%s: %s", + method, + endpoint, + attempt, + self._max_retries, + error, + ) + except Exception as error: # pragma: no cover - safety + logger.error("Unexpected MulenPay error: %s", error, exc_info=True) + return None - if response.status >= 400: - logger.error( - "MulenPay API error %s %s: %s", response.status, endpoint, data - ) - return None + if attempt < self._max_retries: + await self._sleep_with_backoff(attempt) - return data - except aiohttp.ClientError as error: - logger.error("MulenPay API request error: %s", error) - return None - except Exception as error: # pragma: no cover - safety - logger.error("Unexpected MulenPay error: %s", error, exc_info=True) - return None + if isinstance(last_error, asyncio.TimeoutError): + logger.error( + "MulenPay request timed out after %s attempts: %s %s", + self._max_retries, + method, + endpoint, + ) + elif last_error is not None: + logger.error( + "MulenPay request failed after %s attempts (%s %s): %s", + self._max_retries, + method, + endpoint, + last_error, + ) + + return None + + async def _sleep_with_backoff(self, attempt: int) -> None: + await asyncio.sleep(self._retry_delay * attempt) + + async def _deserialize_response( + self, response: aiohttp.ClientResponse + ) -> Tuple[Optional[Dict[str, Any]], str]: + raw_text = await response.text() + if not raw_text: + return None, "" + + content_type = response.headers.get("Content-Type", "") + if "json" in content_type.lower() or not content_type: + try: + return json.loads(raw_text), raw_text + except json.JSONDecodeError as error: + logger.error( + "Failed to decode MulenPay JSON response %s: %s", + response.url, + error, + ) + return None, raw_text + + return None, raw_text @staticmethod def _format_amount(amount_kopeks: int) -> str: diff --git a/app/services/payment/yookassa.py b/app/services/payment/yookassa.py index a04ee5dc..e9489bc3 100644 --- a/app/services/payment/yookassa.py +++ b/app/services/payment/yookassa.py @@ -8,6 +8,7 @@ from __future__ import annotations import logging from datetime import datetime +from decimal import Decimal, InvalidOperation from importlib import import_module from typing import Any, Dict, Optional, TYPE_CHECKING @@ -545,14 +546,40 @@ class YooKassaPaymentMixin: logger.warning( "Локальный платеж для YooKassa id %s не найден", yookassa_payment_id ) - return False + payment = await self._restore_missing_yookassa_payment(db, event_object) + + if not payment: + logger.error( + "Не удалось восстановить локальную запись платежа YooKassa %s", + yookassa_payment_id, + ) + return False payment.status = event_object.get("status", payment.status) - payment.confirmation_url = event_object.get("confirmation_url") + payment.confirmation_url = self._extract_confirmation_url(event_object) + + payment.payment_method_type = ( + (event_object.get("payment_method") or {}).get("type") + or payment.payment_method_type + ) + payment.refundable = event_object.get("refundable", getattr(payment, "refundable", False)) current_paid = bool(getattr(payment, "is_paid", getattr(payment, "paid", False))) payment.is_paid = bool(event_object.get("paid", current_paid)) + captured_at_raw = event_object.get("captured_at") + if captured_at_raw: + try: + payment.captured_at = datetime.fromisoformat( + captured_at_raw.replace("Z", "+00:00") + ).replace(tzinfo=None) + except Exception as error: + logger.debug( + "Не удалось распарсить captured_at=%s: %s", + captured_at_raw, + error, + ) + await db.commit() await db.refresh(payment) @@ -565,3 +592,152 @@ class YooKassaPaymentMixin: payment.status, ) return True + + async def _restore_missing_yookassa_payment( + self, + db: AsyncSession, + event_object: Dict[str, Any], + ) -> Optional["YooKassaPayment"]: + """Создает локальную запись платежа на основе данных webhook, если она отсутствует.""" + + yookassa_payment_id = event_object.get("id") + if not yookassa_payment_id: + return None + + metadata = self._normalise_yookassa_metadata(event_object.get("metadata")) + user_id_raw = metadata.get("user_id") or metadata.get("userId") + + if user_id_raw is None: + logger.error( + "Webhook YooKassa %s не содержит user_id в metadata. Невозможно восстановить платеж.", + yookassa_payment_id, + ) + return None + + try: + user_id = int(user_id_raw) + except (TypeError, ValueError): + logger.error( + "Webhook YooKassa %s содержит некорректный user_id=%s", + yookassa_payment_id, + user_id_raw, + ) + return None + + amount_info = event_object.get("amount") or {} + amount_value = amount_info.get("value") + currency = (amount_info.get("currency") or "RUB").upper() + + if amount_value is None: + logger.error( + "Webhook YooKassa %s не содержит сумму платежа", + yookassa_payment_id, + ) + return None + + try: + amount_kopeks = int((Decimal(str(amount_value)) * 100).quantize(Decimal("1"))) + except (InvalidOperation, ValueError) as error: + logger.error( + "Некорректная сумма в webhook YooKassa %s: %s (%s)", + yookassa_payment_id, + amount_value, + error, + ) + return None + + description = event_object.get("description") or metadata.get("description") or "YooKassa платеж" + payment_method_type = (event_object.get("payment_method") or {}).get("type") + + yookassa_created_at = None + created_at_raw = event_object.get("created_at") + if created_at_raw: + try: + yookassa_created_at = datetime.fromisoformat( + created_at_raw.replace("Z", "+00:00") + ).replace(tzinfo=None) + except Exception as error: # pragma: no cover - диагностический лог + logger.debug( + "Не удалось распарсить created_at=%s для YooKassa %s: %s", + created_at_raw, + yookassa_payment_id, + error, + ) + + payment_module = import_module("app.services.payment_service") + + local_payment = await payment_module.create_yookassa_payment( + db=db, + user_id=user_id, + yookassa_payment_id=yookassa_payment_id, + amount_kopeks=amount_kopeks, + currency=currency, + description=description, + status=event_object.get("status", "pending"), + confirmation_url=self._extract_confirmation_url(event_object), + metadata_json=metadata, + payment_method_type=payment_method_type, + yookassa_created_at=yookassa_created_at, + test_mode=bool(event_object.get("test") or event_object.get("test_mode")), + ) + + if not local_payment: + return None + + await payment_module.update_yookassa_payment_status( + db=db, + yookassa_payment_id=yookassa_payment_id, + status=event_object.get("status", local_payment.status), + is_paid=bool(event_object.get("paid")), + is_captured=event_object.get("status") == "succeeded", + captured_at=self._parse_datetime(event_object.get("captured_at")), + payment_method_type=payment_method_type, + ) + + return await payment_module.get_yookassa_payment_by_id(db, yookassa_payment_id) + + @staticmethod + def _normalise_yookassa_metadata(metadata: Any) -> Dict[str, Any]: + if isinstance(metadata, dict): + return metadata + + if isinstance(metadata, list): + normalised: Dict[str, Any] = {} + for item in metadata: + key = item.get("key") if isinstance(item, dict) else None + if key: + normalised[key] = item.get("value") + return normalised + + if isinstance(metadata, str): + try: + import json + + parsed = json.loads(metadata) + if isinstance(parsed, dict): + return parsed + except json.JSONDecodeError: + logger.debug("Не удалось распарсить metadata webhook YooKassa: %s", metadata) + + return {} + + @staticmethod + def _extract_confirmation_url(event_object: Dict[str, Any]) -> Optional[str]: + if "confirmation_url" in event_object: + return event_object.get("confirmation_url") + + confirmation = event_object.get("confirmation") + if isinstance(confirmation, dict): + return confirmation.get("confirmation_url") or confirmation.get("return_url") + + return None + + @staticmethod + def _parse_datetime(raw_value: Optional[str]) -> Optional[datetime]: + if not raw_value: + return None + + try: + return datetime.fromisoformat(raw_value.replace("Z", "+00:00")).replace(tzinfo=None) + except Exception: + return None diff --git a/app/services/payment_service.py b/app/services/payment_service.py index 23ffb9c2..6acb83dd 100644 --- a/app/services/payment_service.py +++ b/app/services/payment_service.py @@ -40,6 +40,11 @@ async def create_yookassa_payment(*args, **kwargs): return await yk_crud.create_yookassa_payment(*args, **kwargs) +async def update_yookassa_payment_status(*args, **kwargs): + yk_crud = import_module("app.database.crud.yookassa") + return await yk_crud.update_yookassa_payment_status(*args, **kwargs) + + async def link_yookassa_payment_to_transaction(*args, **kwargs): yk_crud = import_module("app.database.crud.yookassa") return await yk_crud.link_yookassa_payment_to_transaction(*args, **kwargs) diff --git a/tests/services/test_mulenpay_service_adapter.py b/tests/services/test_mulenpay_service_adapter.py index da443375..fc2d64a5 100644 --- a/tests/services/test_mulenpay_service_adapter.py +++ b/tests/services/test_mulenpay_service_adapter.py @@ -2,8 +2,10 @@ from __future__ import annotations +import asyncio +import json from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Sequence import sys import pytest @@ -16,6 +18,57 @@ from app.config import settings # noqa: E402 from app.services.mulenpay_service import MulenPayService # noqa: E402 +class _DummyResponse: + def __init__( + self, + *, + status: int, + body: str = "{}", + headers: Optional[Dict[str, str]] = None, + url: str = "https://mulenpay.test/endpoint", + ) -> None: + self.status = status + self._body = body + self.headers = headers or {"Content-Type": "application/json"} + self.url = url + + async def __aenter__(self) -> "_DummyResponse": + return self + + async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface + return False + + async def text(self) -> str: + return self._body + + +class _DummySession: + def __init__(self, result: Any) -> None: + self._result = result + + async def __aenter__(self) -> "_DummySession": + return self + + async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface + return False + + def request(self, *args: Any, **kwargs: Any) -> Any: + if isinstance(self._result, BaseException): + raise self._result + return self._result + + +def _session_factory(responses: Sequence[Any]) -> Any: + call_state = {"index": 0} + + def _factory(*_args: Any, **_kwargs: Any) -> _DummySession: + index = min(call_state["index"], len(responses) - 1) + call_state["index"] += 1 + return _DummySession(responses[index]) + + return _factory + + @pytest.fixture def anyio_backend() -> str: return "asyncio" @@ -105,3 +158,88 @@ async def test_get_payment(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(service, "_request", fake_request, raising=False) result = await service.get_payment(123) assert result == {"id": 123, "status": "paid"} + + +@pytest.mark.anyio("asyncio") +async def test_request_success(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + + response_payload = {"ok": True} + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([ + _DummyResponse(status=200, body=json.dumps(response_payload)), + ]), + ) + + result = await service._request("GET", "/ping") + assert result == response_payload + + +@pytest.mark.anyio("asyncio") +async def test_request_retries_on_server_error(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + service._max_retries = 2 + + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr( + "app.services.mulenpay_service.asyncio.sleep", + fake_sleep, + ) + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory( + [ + _DummyResponse(status=502, body="{\"error\": \"bad gateway\"}"), + _DummyResponse(status=200, body="{\"ok\": true}"), + ] + ), + ) + + result = await service._request("GET", "/retry") + assert result == {"ok": True} + assert sleep_calls == [service._retry_delay] + + +@pytest.mark.anyio("asyncio") +async def test_request_returns_none_after_timeouts(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + service._max_retries = 2 + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr( + "app.services.mulenpay_service.asyncio.sleep", + fake_sleep, + ) + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([asyncio.TimeoutError()]), + ) + + result = await service._request("GET", "/timeout") + assert result is None + + +@pytest.mark.anyio("asyncio") +async def test_request_reraises_cancelled(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([asyncio.CancelledError()]), + ) + + with pytest.raises(asyncio.CancelledError): + await service._request("GET", "/cancel") diff --git a/tests/services/test_payment_service_webhooks.py b/tests/services/test_payment_service_webhooks.py index 7d95a0c3..8542fff3 100644 --- a/tests/services/test_payment_service_webhooks.py +++ b/tests/services/test_payment_service_webhooks.py @@ -496,6 +496,166 @@ async def test_process_yookassa_webhook_success(monkeypatch: pytest.MonkeyPatch) assert admin_calls +@pytest.mark.anyio("asyncio") +async def test_process_yookassa_webhook_restores_missing_payment( + monkeypatch: pytest.MonkeyPatch, +) -> None: + bot = DummyBot() + service = _make_service(bot) + fake_session = FakeSession() + + restored_payment = SimpleNamespace( + yookassa_payment_id="yk_456", + user_id=21, + amount_kopeks=0, + status="pending", + is_paid=False, + transaction_id=None, + description="", + payment_method_type=None, + confirmation_url=None, + metadata_json=None, + test_mode=False, + refundable=False, + ) + + get_calls = {"count": 0} + + async def fake_get_payment(db, payment_id): + get_calls["count"] += 1 + if get_calls["count"] == 1: + return None + return restored_payment + + async def fake_create_payment(**kwargs: Any): + restored_payment.user_id = kwargs["user_id"] + restored_payment.amount_kopeks = kwargs["amount_kopeks"] + restored_payment.status = kwargs["status"] + restored_payment.description = kwargs["description"] + restored_payment.payment_method_type = kwargs["payment_method_type"] + restored_payment.confirmation_url = kwargs["confirmation_url"] + restored_payment.metadata_json = kwargs["metadata_json"] + restored_payment.test_mode = kwargs["test_mode"] + restored_payment.yookassa_payment_id = kwargs["yookassa_payment_id"] + restored_payment.yookassa_created_at = kwargs["yookassa_created_at"] + return restored_payment + + async def fake_update_status( + db, + yookassa_payment_id, + status, + is_paid, + is_captured, + captured_at, + payment_method_type, + ): + restored_payment.status = status + restored_payment.is_paid = is_paid + restored_payment.is_captured = is_captured + restored_payment.captured_at = captured_at + restored_payment.payment_method_type = payment_method_type + return restored_payment + + async def fake_link(db, yookassa_payment_id, transaction_id): + restored_payment.transaction_id = transaction_id + + monkeypatch.setattr(payment_service_module, "get_yookassa_payment_by_id", fake_get_payment) + monkeypatch.setattr(payment_service_module, "create_yookassa_payment", fake_create_payment) + monkeypatch.setattr(payment_service_module, "update_yookassa_payment_status", fake_update_status) + monkeypatch.setattr(payment_service_module, "link_yookassa_payment_to_transaction", fake_link) + + transactions: list[Dict[str, Any]] = [] + + async def fake_create_transaction(db, **kwargs): + transactions.append(kwargs) + return SimpleNamespace(id=555, **kwargs) + + monkeypatch.setattr(payment_service_module, "create_transaction", fake_create_transaction) + + user = SimpleNamespace( + id=21, + telegram_id=2100, + balance_kopeks=0, + has_made_first_topup=False, + promo_group=None, + subscription=None, + referred_by_id=None, + referrer=None, + ) + + async def fake_get_user(db, user_id): + return user + + monkeypatch.setattr(payment_service_module, "get_user_by_id", fake_get_user) + monkeypatch.setattr(type(settings), "format_price", lambda self, amount: f"{amount / 100:.2f}₽", raising=False) + + referral_mock = SimpleNamespace(process_referral_topup=AsyncMock()) + monkeypatch.setitem(sys.modules, "app.services.referral_service", referral_mock) + + admin_calls: list[Any] = [] + + class DummyAdminService: + def __init__(self, bot): + self.bot = bot + + async def send_balance_topup_notification(self, *args, **kwargs): + admin_calls.append((args, kwargs)) + + monkeypatch.setitem(sys.modules, "app.services.admin_notification_service", SimpleNamespace(AdminNotificationService=lambda bot: DummyAdminService(bot))) + service.build_topup_success_keyboard = AsyncMock(return_value=None) + + payload = { + "object": { + "id": "yk_456", + "status": "succeeded", + "paid": True, + "amount": {"value": "150.00", "currency": "RUB"}, + "metadata": {"user_id": "21", "payment_purpose": "balance_topup"}, + "description": "Пополнение", + "payment_method": {"type": "bank_card"}, + "created_at": "2024-01-02T12:00:00Z", + "captured_at": "2024-01-02T12:05:00Z", + "confirmation": {"confirmation_url": "https://pay.example"}, + } + } + + result = await service.process_yookassa_webhook(fake_session, payload) + + assert result is True + assert get_calls["count"] >= 2 # повторный запрос после восстановления + assert restored_payment.amount_kopeks == 15000 + assert restored_payment.is_paid is True + assert transactions and transactions[0]["amount_kopeks"] == 15000 + assert restored_payment.transaction_id == 555 + assert user.balance_kopeks == 15000 + assert bot.sent_messages + assert admin_calls + + +@pytest.mark.anyio("asyncio") +async def test_process_yookassa_webhook_missing_metadata(monkeypatch: pytest.MonkeyPatch) -> None: + service = _make_service(DummyBot()) + db = FakeSession() + + async def fake_get_payment(db_session, payment_id): + return None + + create_mock = AsyncMock() + update_mock = AsyncMock() + + monkeypatch.setattr(payment_service_module, "get_yookassa_payment_by_id", fake_get_payment) + monkeypatch.setattr(payment_service_module, "create_yookassa_payment", create_mock) + monkeypatch.setattr(payment_service_module, "update_yookassa_payment_status", update_mock) + + payload = {"object": {"id": "yk_missing", "status": "succeeded", "paid": True}} + + result = await service.process_yookassa_webhook(db, payload) + + assert result is False + create_mock.assert_not_awaited() + update_mock.assert_not_awaited() + + @pytest.mark.anyio("asyncio") async def test_process_yookassa_webhook_missing_id(monkeypatch: pytest.MonkeyPatch) -> None: bot = DummyBot()