From 84cfcf45fadda00fd32dde7c12e5e975c552e47b Mon Sep 17 00:00:00 2001 From: Egor Date: Sun, 26 Oct 2025 06:04:16 +0300 Subject: [PATCH] Improve MulenPay hook resilience --- app/services/mulenpay_service.py | 136 ++++++++++++++--- .../services/test_mulenpay_service_adapter.py | 140 +++++++++++++++++- 2 files changed, 252 insertions(+), 24 deletions(-) diff --git a/app/services/mulenpay_service.py b/app/services/mulenpay_service.py index afcb44d9..6cfbebf1 100644 --- a/app/services/mulenpay_service.py +++ b/app/services/mulenpay_service.py @@ -1,6 +1,8 @@ +import asyncio import hashlib +import json import logging -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Tuple import aiohttp @@ -17,6 +19,10 @@ class MulenPayService: self.shop_id = settings.MULENPAY_SHOP_ID self.secret_key = settings.MULENPAY_SECRET_KEY self.base_url = settings.MULENPAY_BASE_URL.rstrip("/") + self._timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=25) + self._max_retries = 3 + self._retry_delay = 0.5 + self._retryable_statuses = {500, 502, 503, 504} @property def is_configured(self) -> bool: @@ -45,31 +51,115 @@ class MulenPayService: "Content-Type": "application/json", } - try: - timeout = aiohttp.ClientTimeout(total=30) - async with aiohttp.ClientSession(timeout=timeout) as session: - async with session.request( + last_error: Optional[BaseException] = None + + for attempt in range(1, self._max_retries + 1): + try: + async with aiohttp.ClientSession(timeout=self._timeout) as session: + async with session.request( + method, + url, + headers=headers, + json=json_data, + params=params, + ) as response: + data, raw_text = await self._deserialize_response(response) + + if response.status >= 400: + logger.error( + "MulenPay API error %s %s: %s", + response.status, + endpoint, + raw_text, + ) + if ( + response.status in self._retryable_statuses + and attempt < self._max_retries + ): + await self._sleep_with_backoff(attempt) + continue + return None + + if data is None: + if raw_text: + logger.warning( + "MulenPay returned unexpected payload for %s: %s", + endpoint, + raw_text, + ) + return None + + return data + except asyncio.CancelledError: + logger.debug("MulenPay request cancelled: %s %s", method, endpoint) + raise + except asyncio.TimeoutError as error: + last_error = error + logger.warning( + "MulenPay request timeout (%s %s) attempt %s/%s", method, - url, - headers=headers, - json=json_data, - params=params, - ) as response: - data = await response.json(content_type=None) + endpoint, + attempt, + self._max_retries, + ) + except aiohttp.ClientError as error: + last_error = error + logger.warning( + "MulenPay client error (%s %s) attempt %s/%s: %s", + method, + endpoint, + attempt, + self._max_retries, + error, + ) + except Exception as error: # pragma: no cover - safety + logger.error("Unexpected MulenPay error: %s", error, exc_info=True) + return None - if response.status >= 400: - logger.error( - "MulenPay API error %s %s: %s", response.status, endpoint, data - ) - return None + if attempt < self._max_retries: + await self._sleep_with_backoff(attempt) - return data - except aiohttp.ClientError as error: - logger.error("MulenPay API request error: %s", error) - return None - except Exception as error: # pragma: no cover - safety - logger.error("Unexpected MulenPay error: %s", error, exc_info=True) - return None + if isinstance(last_error, asyncio.TimeoutError): + logger.error( + "MulenPay request timed out after %s attempts: %s %s", + self._max_retries, + method, + endpoint, + ) + elif last_error is not None: + logger.error( + "MulenPay request failed after %s attempts (%s %s): %s", + self._max_retries, + method, + endpoint, + last_error, + ) + + return None + + async def _sleep_with_backoff(self, attempt: int) -> None: + await asyncio.sleep(self._retry_delay * attempt) + + async def _deserialize_response( + self, response: aiohttp.ClientResponse + ) -> Tuple[Optional[Dict[str, Any]], str]: + raw_text = await response.text() + if not raw_text: + return None, "" + + content_type = response.headers.get("Content-Type", "") + if "json" in content_type.lower() or not content_type: + try: + return json.loads(raw_text), raw_text + except json.JSONDecodeError as error: + logger.error( + "Failed to decode MulenPay JSON response %s: %s", + response.url, + error, + ) + return None, raw_text + + return None, raw_text @staticmethod def _format_amount(amount_kopeks: int) -> str: diff --git a/tests/services/test_mulenpay_service_adapter.py b/tests/services/test_mulenpay_service_adapter.py index da443375..fc2d64a5 100644 --- a/tests/services/test_mulenpay_service_adapter.py +++ b/tests/services/test_mulenpay_service_adapter.py @@ -2,8 +2,10 @@ from __future__ import annotations +import asyncio +import json from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Sequence import sys import pytest @@ -16,6 +18,57 @@ from app.config import settings # noqa: E402 from app.services.mulenpay_service import MulenPayService # noqa: E402 +class _DummyResponse: + def __init__( + self, + *, + status: int, + body: str = "{}", + headers: Optional[Dict[str, str]] = None, + url: str = "https://mulenpay.test/endpoint", + ) -> None: + self.status = status + self._body = body + self.headers = headers or {"Content-Type": "application/json"} + self.url = url + + async def __aenter__(self) -> "_DummyResponse": + return self + + async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface + return False + + async def text(self) -> str: + return self._body + + +class _DummySession: + def __init__(self, result: Any) -> None: + self._result = result + + async def __aenter__(self) -> "_DummySession": + return self + + async def __aexit__(self, exc_type, exc, tb) -> bool: # pragma: no cover - interface + return False + + def request(self, *args: Any, **kwargs: Any) -> Any: + if isinstance(self._result, BaseException): + raise self._result + return self._result + + +def _session_factory(responses: Sequence[Any]) -> Any: + call_state = {"index": 0} + + def _factory(*_args: Any, **_kwargs: Any) -> _DummySession: + index = min(call_state["index"], len(responses) - 1) + call_state["index"] += 1 + return _DummySession(responses[index]) + + return _factory + + @pytest.fixture def anyio_backend() -> str: return "asyncio" @@ -105,3 +158,88 @@ async def test_get_payment(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(service, "_request", fake_request, raising=False) result = await service.get_payment(123) assert result == {"id": 123, "status": "paid"} + + +@pytest.mark.anyio("asyncio") +async def test_request_success(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + + response_payload = {"ok": True} + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([ + _DummyResponse(status=200, body=json.dumps(response_payload)), + ]), + ) + + result = await service._request("GET", "/ping") + assert result == response_payload + + +@pytest.mark.anyio("asyncio") +async def test_request_retries_on_server_error(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + service._max_retries = 2 + + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr( + "app.services.mulenpay_service.asyncio.sleep", + fake_sleep, + ) + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory( + [ + _DummyResponse(status=502, body="{\"error\": \"bad gateway\"}"), + _DummyResponse(status=200, body="{\"ok\": true}"), + ] + ), + ) + + result = await service._request("GET", "/retry") + assert result == {"ok": True} + assert sleep_calls == [service._retry_delay] + + +@pytest.mark.anyio("asyncio") +async def test_request_returns_none_after_timeouts(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + service._max_retries = 2 + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr( + "app.services.mulenpay_service.asyncio.sleep", + fake_sleep, + ) + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([asyncio.TimeoutError()]), + ) + + result = await service._request("GET", "/timeout") + assert result is None + + +@pytest.mark.anyio("asyncio") +async def test_request_reraises_cancelled(monkeypatch: pytest.MonkeyPatch) -> None: + _enable_service(monkeypatch) + service = MulenPayService() + + monkeypatch.setattr( + "app.services.mulenpay_service.aiohttp.ClientSession", + _session_factory([asyncio.CancelledError()]), + ) + + with pytest.raises(asyncio.CancelledError): + await service._request("GET", "/cancel")