fix: allow trusted public proxies for YooKassa webhooks

This commit is contained in:
Egor
2025-11-07 10:43:19 +03:00
parent f7a4cedbbf
commit 45bd54e7b0
5 changed files with 728 additions and 39 deletions

View File

@@ -188,6 +188,7 @@ class Settings(BaseSettings):
YOOKASSA_WEBHOOK_HOST: str = "0.0.0.0"
YOOKASSA_WEBHOOK_PORT: int = 8082
YOOKASSA_WEBHOOK_SECRET: Optional[str] = None
YOOKASSA_TRUSTED_PROXY_NETWORKS: str = ""
YOOKASSA_MIN_AMOUNT_KOPEKS: int = 5000
YOOKASSA_MAX_AMOUNT_KOPEKS: int = 1000000
YOOKASSA_QUICK_AMOUNT_SELECTION_ENABLED: bool = False

View File

@@ -4,9 +4,16 @@ import json
import hashlib
import hmac
import base64
from typing import Optional, Dict, Any
from ipaddress import (
IPv4Address,
IPv4Network,
IPv6Address,
IPv6Network,
ip_address,
ip_network,
)
from typing import Iterable, Optional, Dict, Any, List, Union, Tuple
from aiohttp import web
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.services.payment_service import PaymentService
@@ -15,6 +22,146 @@ from app.database.database import get_db
logger = logging.getLogger(__name__)
IPAddress = Union[IPv4Address, IPv6Address]
IPNetwork = Union[IPv4Network, IPv6Network]
YOOKASSA_ALLOWED_IP_NETWORKS: tuple[IPNetwork, ...] = (
ip_network("185.71.76.0/27"),
ip_network("185.71.77.0/27"),
ip_network("77.75.153.0/25"),
ip_network("77.75.154.128/25"),
ip_network("77.75.156.11/32"),
ip_network("77.75.156.35/32"),
ip_network("2a02:5180::/32"),
)
def collect_yookassa_ip_candidates(*values: Optional[str]) -> List[str]:
candidates: List[str] = []
for value in values:
if not value:
continue
for part in value.split(","):
normalized = part.strip()
if normalized:
candidates.append(normalized)
return candidates
def _parse_candidate_ip(candidate: str) -> Optional[IPAddress]:
value = candidate.strip()
if not value:
return None
if value.startswith("[") and "]" in value:
value = value[1:value.index("]")]
if "%" in value:
value = value.split("%", 1)[0]
if value.count(":") == 1 and "." in value:
host, _, port = value.rpartition(":")
if port.isdigit():
value = host
try:
return ip_address(value)
except ValueError:
return None
def _should_trust_forwarded_headers(remote_ip: Optional[IPAddress]) -> bool:
if remote_ip is None:
return True
if _is_trusted_proxy_ip(remote_ip):
return True
return any(
getattr(remote_ip, attribute)
for attribute in ("is_private", "is_loopback", "is_link_local", "is_reserved")
)
_TRUSTED_PROXY_NETWORKS_CACHE: Tuple[str, Tuple[IPNetwork, ...]] = ("", ())
def _get_trusted_proxy_networks() -> Tuple[IPNetwork, ...]:
global _TRUSTED_PROXY_NETWORKS_CACHE
raw_value = getattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "") or ""
cached_raw, cached_networks = _TRUSTED_PROXY_NETWORKS_CACHE
if raw_value == cached_raw:
return cached_networks
networks: List[IPNetwork] = []
for part in raw_value.split(","):
candidate = part.strip()
if not candidate:
continue
try:
networks.append(ip_network(candidate, strict=False))
except ValueError:
logger.warning("Неверная сеть доверенного прокси YooKassa: %s", candidate)
cached_networks = tuple(networks)
_TRUSTED_PROXY_NETWORKS_CACHE = (raw_value, cached_networks)
return cached_networks
def _is_trusted_proxy_ip(ip_object: IPAddress) -> bool:
if any(
getattr(ip_object, attribute)
for attribute in ("is_private", "is_loopback", "is_link_local", "is_reserved")
):
return True
return any(ip_object in network for network in _get_trusted_proxy_networks())
def resolve_yookassa_ip(
candidates: Iterable[str],
*,
remote: Optional[str] = None,
) -> Optional[IPAddress]:
remote_ip = _parse_candidate_ip(remote) if remote else None
if (
remote_ip is not None
and remote_ip.is_global
and not _is_trusted_proxy_ip(remote_ip)
):
return remote_ip
candidate_list = list(candidates)
if _should_trust_forwarded_headers(remote_ip):
last_hop = remote_ip
for candidate in reversed(candidate_list):
ip_object = _parse_candidate_ip(candidate)
if ip_object is not None:
if last_hop is None or _is_trusted_proxy_ip(last_hop):
if _is_trusted_proxy_ip(ip_object):
last_hop = ip_object
continue
return ip_object
break
if last_hop is not None and not _is_trusted_proxy_ip(last_hop):
return last_hop
return remote_ip if remote_ip is not None else next(
(ip for ip in (_parse_candidate_ip(value) for value in candidate_list) if ip is not None),
None,
)
def is_yookassa_ip_allowed(ip_object: IPAddress) -> bool:
return any(ip_object in network for network in YOOKASSA_ALLOWED_IP_NETWORKS)
class YooKassaWebhookHandler:
@staticmethod
@@ -87,38 +234,66 @@ class YooKassaWebhookHandler:
self.payment_service = payment_service
async def handle_webhook(self, request: web.Request) -> web.Response:
try:
logger.info(f"📥 Получен YooKassa webhook: {request.method} {request.path}")
logger.info(f"📋 Headers: {dict(request.headers)}")
header_ip_candidates = collect_yookassa_ip_candidates(
request.headers.get("X-Forwarded-For"),
request.headers.get("X-Real-IP"),
)
client_ip = resolve_yookassa_ip(
header_ip_candidates,
remote=request.remote,
)
if client_ip is None:
logger.warning(
"🚫 Не удалось определить IP-адрес отправителя YooKassa webhook. Кандидаты: %s",
header_ip_candidates + ([request.remote] if request.remote else []),
)
return web.Response(status=403, text="Forbidden")
if not is_yookassa_ip_allowed(client_ip):
logger.warning(
"🚫 YooKassa webhook отклонён: IP %s не входит в доверенные диапазоны (%s)",
client_ip,
", ".join(str(network) for network in YOOKASSA_ALLOWED_IP_NETWORKS),
)
return web.Response(status=403, text="Forbidden")
logger.info("🌐 IP-адрес YooKassa подтверждён: %s", client_ip)
body = await request.text()
if not body:
logger.warning("⚠️ Получен пустой webhook от YooKassa")
return web.Response(status=400, text="Empty body")
logger.info(f"📄 Body: {body}")
signature = request.headers.get('Signature') or request.headers.get('X-YooKassa-Signature')
if settings.YOOKASSA_WEBHOOK_SECRET and signature:
if settings.YOOKASSA_WEBHOOK_SECRET:
if not signature:
logger.warning("⚠️ Webhook без подписи, но секрет настроен")
return web.Response(status=401, text="Missing signature")
logger.info(f"🔐 Получена подпись: {signature}")
if not YooKassaWebhookHandler.verify_webhook_signature(body, signature, settings.YOOKASSA_WEBHOOK_SECRET):
logger.warning("❌ Подпись не совпала, но продолжаем обработку (режим отладки)")
else:
logger.info("✅ Подпись webhook проверена успешно")
elif settings.YOOKASSA_WEBHOOK_SECRET and not signature:
logger.warning("⚠️ Webhook без подписи, но секрет настроен")
elif signature and not settings.YOOKASSA_WEBHOOK_SECRET:
if not YooKassaWebhookHandler.verify_webhook_signature(
body,
signature,
settings.YOOKASSA_WEBHOOK_SECRET,
):
logger.warning("❌ Неверная подпись YooKassa webhook")
return web.Response(status=401, text="Invalid signature")
elif signature:
logger.info(" Подпись получена, но проверка отключена (YOOKASSA_WEBHOOK_SECRET не настроен)")
else:
logger.info(" Проверка подписи отключена")
try:
webhook_data = json.loads(body)
except json.JSONDecodeError as e:

View File

@@ -15,7 +15,7 @@ from aiogram import Bot
from app.config import settings
from app.database.database import get_db
from app.external.tribute import TributeService as TributeAPI
from app.external.yookassa_webhook import YooKassaWebhookHandler
from app.external import yookassa_webhook as yookassa_webhook_module
from app.external.wata_webhook import WataWebhookHandler
from app.external.heleket_webhook import HeleketWebhookHandler
from app.external.pal24_client import Pal24APIError
@@ -322,7 +322,6 @@ def create_payment_router(bot: Bot, payment_service: PaymentService) -> APIRoute
routes_registered = True
if settings.is_yookassa_enabled():
yookassa_secret = settings.YOOKASSA_WEBHOOK_SECRET or ""
@router.options(settings.YOOKASSA_WEBHOOK_PATH)
async def yookassa_options() -> Response:
@@ -347,6 +346,36 @@ def create_payment_router(bot: Bot, payment_service: PaymentService) -> APIRoute
@router.post(settings.YOOKASSA_WEBHOOK_PATH)
async def yookassa_webhook(request: Request) -> JSONResponse:
header_ip_candidates = yookassa_webhook_module.collect_yookassa_ip_candidates(
request.headers.get("X-Forwarded-For"),
request.headers.get("X-Real-IP"),
)
remote_ip = request.client.host if request.client else None
client_ip = yookassa_webhook_module.resolve_yookassa_ip(
header_ip_candidates,
remote=remote_ip,
)
if client_ip is None:
return JSONResponse(
{
"status": "error",
"reason": "unknown_ip",
"candidates": header_ip_candidates + ([remote_ip] if remote_ip else []),
},
status_code=status.HTTP_403_FORBIDDEN,
)
if not yookassa_webhook_module.is_yookassa_ip_allowed(client_ip):
return JSONResponse(
{
"status": "error",
"reason": "forbidden_ip",
"ip": str(client_ip),
},
status_code=status.HTTP_403_FORBIDDEN,
)
body_bytes = await request.body()
if not body_bytes:
return JSONResponse({"status": "error", "reason": "empty_body"}, status_code=status.HTTP_400_BAD_REQUEST)
@@ -354,18 +383,27 @@ def create_payment_router(bot: Bot, payment_service: PaymentService) -> APIRoute
body = body_bytes.decode("utf-8")
signature = request.headers.get("Signature") or request.headers.get("X-YooKassa-Signature")
if yookassa_secret:
if settings.YOOKASSA_WEBHOOK_SECRET:
if not signature:
logger.warning("⚠️ YooKassa webhook без подписи при настроенном секрете")
return JSONResponse(
{"status": "error", "reason": "missing_signature"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
if not YooKassaWebhookHandler.verify_webhook_signature(body, signature, yookassa_secret):
if not yookassa_webhook_module.YooKassaWebhookHandler.verify_webhook_signature(
body,
signature,
settings.YOOKASSA_WEBHOOK_SECRET,
):
logger.warning("❌ Неверная подпись YooKassa webhook")
return JSONResponse(
{"status": "error", "reason": "invalid_signature"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
elif signature:
logger.info(" Получена подпись YooKassa, но проверка отключена (YOOKASSA_WEBHOOK_SECRET не настроен)")
try:
webhook_data = json.loads(body)

191
tests/external/test_yookassa_webhook.py vendored Normal file
View File

@@ -0,0 +1,191 @@
import base64
import hashlib
import hmac
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from app.config import settings
from app.external.yookassa_webhook import (
create_yookassa_webhook_app,
resolve_yookassa_ip,
)
ALLOWED_IP = "185.71.76.10"
class DummyDB:
async def close(self) -> None: # pragma: no cover - simple stub
pass
def _generate_signature(body: str, secret: str) -> str:
payment_id = "test-payment"
timestamp = "2024-01-01T00:00:00.000Z"
payload = f"{payment_id}.{timestamp}.{body}".encode("utf-8")
digest = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).digest()
return f"v1 {payment_id} {timestamp} {base64.b64encode(digest).decode('utf-8')}"
@pytest.fixture(autouse=True)
def configure_settings(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_SHOP_ID", "shop", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_SECRET_KEY", "key", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", "secret", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_PATH", "/yookassa-webhook", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "", raising=False)
def _build_headers(**overrides: str) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"X-Forwarded-For": ALLOWED_IP,
}
headers.update(overrides)
return headers
@pytest.mark.parametrize(
("remote", "expected"),
(
("185.71.76.10", "185.71.76.10"),
("8.8.8.8", "8.8.8.8"),
("10.0.0.5", "185.71.76.10"),
(None, "185.71.76.10"),
),
)
def test_resolve_yookassa_ip_trust_rules(remote: str | None, expected: str) -> None:
candidates = [ALLOWED_IP]
ip_object = resolve_yookassa_ip(candidates, remote=remote)
assert ip_object is not None
assert str(ip_object) == expected
def test_resolve_yookassa_ip_prefers_last_forwarded_candidate() -> None:
candidates = ["185.71.76.10", "8.8.8.8"]
ip_object = resolve_yookassa_ip(candidates, remote="10.0.0.5")
assert ip_object is not None
assert str(ip_object) == "8.8.8.8"
def test_resolve_yookassa_ip_accepts_allowed_last_forwarded_candidate() -> None:
candidates = ["8.8.8.8", ALLOWED_IP]
ip_object = resolve_yookassa_ip(candidates, remote="10.0.0.5")
assert ip_object is not None
assert str(ip_object) == ALLOWED_IP
def test_resolve_yookassa_ip_skips_trusted_proxy_hops(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "203.0.113.0/24", raising=False)
candidates = [ALLOWED_IP, "203.0.113.10"]
ip_object = resolve_yookassa_ip(candidates, remote="10.0.0.5")
assert ip_object is not None
assert str(ip_object) == ALLOWED_IP
def test_resolve_yookassa_ip_trusted_public_proxy(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "198.51.100.0/24", raising=False)
candidates = [ALLOWED_IP, "198.51.100.10"]
ip_object = resolve_yookassa_ip(candidates, remote="198.51.100.20")
assert ip_object is not None
assert str(ip_object) == ALLOWED_IP
def test_resolve_yookassa_ip_returns_none_when_no_candidates() -> None:
assert resolve_yookassa_ip([], remote=None) is None
async def _post_webhook(client: TestClient, payload: dict, **headers: str) -> web.Response:
body = json.dumps(payload, ensure_ascii=False)
return await client.post(
settings.YOOKASSA_WEBHOOK_PATH,
data=body.encode("utf-8"),
headers=_build_headers(**headers),
)
def _patch_get_db(monkeypatch: pytest.MonkeyPatch) -> None:
async def fake_get_db():
yield DummyDB()
monkeypatch.setattr("app.external.yookassa_webhook.get_db", fake_get_db)
@pytest.mark.asyncio
async def test_handle_webhook_missing_signature(monkeypatch: pytest.MonkeyPatch) -> None:
_patch_get_db(monkeypatch)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
app = create_yookassa_webhook_app(service)
async with TestClient(TestServer(app)) as client:
response = await _post_webhook(client, {"event": "payment.succeeded"})
status = response.status
body = await response.text()
assert status == 401
assert body == "Missing signature"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_webhook_invalid_signature(monkeypatch: pytest.MonkeyPatch) -> None:
_patch_get_db(monkeypatch)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
app = create_yookassa_webhook_app(service)
async with TestClient(TestServer(app)) as client:
response = await _post_webhook(
client,
{"event": "payment.succeeded"},
Signature="v1 test-payment 2024-01-01T00:00:00.000Z invalid",
)
status = response.status
body = await response.text()
assert status == 401
assert body == "Invalid signature"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_webhook_valid_signature(monkeypatch: pytest.MonkeyPatch) -> None:
_patch_get_db(monkeypatch)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
app = create_yookassa_webhook_app(service)
async with TestClient(TestServer(app)) as client:
payload = {"event": "payment.succeeded"}
body = json.dumps(payload, ensure_ascii=False)
signature = _generate_signature(body, settings.YOOKASSA_WEBHOOK_SECRET)
response = await client.post(
settings.YOOKASSA_WEBHOOK_PATH,
data=body.encode("utf-8"),
headers=_build_headers(Signature=signature),
)
status = response.status
text = await response.text()
assert status == 200
assert text == "OK"
process_mock.assert_awaited_once()

View File

@@ -1,3 +1,6 @@
import base64
import hashlib
import hmac
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock
@@ -13,6 +16,14 @@ class DummyBot:
pass
def _generate_yookassa_signature(body: str, secret: str) -> str:
payment_id = "test-payment"
timestamp = "2024-01-01T00:00:00.000Z"
payload = f"{payment_id}.{timestamp}.{body}".encode("utf-8")
digest = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).digest()
return f"v1 {payment_id} {timestamp} {base64.b64encode(digest).decode('utf-8')}"
@pytest.fixture(autouse=True)
def reset_settings(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "TRIBUTE_ENABLED", False, raising=False)
@@ -28,6 +39,7 @@ def reset_settings(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", None, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_SHOP_ID", "shop", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_SECRET_KEY", "key", raising=False)
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "", raising=False)
monkeypatch.setattr(settings, "WEBHOOK_URL", "http://test", raising=False)
@@ -38,7 +50,12 @@ def _get_route(router, path: str, method: str = "POST"):
raise AssertionError(f"Route {path} with method {method} not found")
def _build_request(path: str, body: bytes, headers: dict[str, str]) -> Request:
def _build_request(
path: str,
body: bytes,
headers: dict[str, str],
client_ip: str | None = "185.71.76.1",
) -> Request:
scope = {
"type": "http",
"asgi": {"version": "3.0"},
@@ -47,6 +64,9 @@ def _build_request(path: str, body: bytes, headers: dict[str, str]) -> Request:
"headers": [(k.lower().encode("latin-1"), v.encode("latin-1")) for k, v in headers.items()],
}
if client_ip is not None:
scope["client"] = (client_ip, 12345)
async def receive() -> dict:
return {"type": "http.request", "body": body, "more_body": False}
@@ -92,38 +112,242 @@ async def test_tribute_webhook_success(monkeypatch: pytest.MonkeyPatch) -> None:
@pytest.mark.anyio
async def test_yookassa_invalid_signature(monkeypatch: pytest.MonkeyPatch) -> None:
async def test_yookassa_unknown_ip(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", "secret", raising=False)
class StubHandler:
@staticmethod
def verify_webhook_signature(body: str, signature: str, secret: str) -> bool: # noqa: D401
return False
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
monkeypatch.setattr("app.webserver.payments.YooKassaWebhookHandler", StubHandler)
router = create_payment_router(DummyBot(), SimpleNamespace())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"Signature": "bad"},
headers={},
client_ip=None,
)
response = await route.endpoint(request)
assert response.status_code == 401
assert response.status_code == 403
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "unknown_ip"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_missing_signature(monkeypatch: pytest.MonkeyPatch) -> None:
async def test_yookassa_forbidden_ip(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={},
client_ip="8.8.8.8",
)
response = await route.endpoint(request)
assert response.status_code == 403
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "forbidden_ip"
assert payload["ip"] == "8.8.8.8"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_forbidden_ip_ignores_spoofed_header(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"X-Forwarded-For": "185.71.76.10"},
client_ip="8.8.8.8",
)
response = await route.endpoint(request)
assert response.status_code == 403
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "forbidden_ip"
assert payload["ip"] == "8.8.8.8"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_forbidden_ip_ignores_spoofed_forwarded_chain(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"X-Forwarded-For": "185.71.76.10, 8.8.8.8"},
client_ip="10.0.0.5",
)
response = await route.endpoint(request)
assert response.status_code == 403
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "forbidden_ip"
assert payload["ip"] == "8.8.8.8"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_allowed_ip(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
async def fake_get_db():
yield SimpleNamespace()
monkeypatch.setattr("app.webserver.payments.get_db", fake_get_db)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={},
client_ip="185.71.76.10",
)
response = await route.endpoint(request)
assert response.status_code == 200
payload = json.loads(response.body.decode("utf-8"))
assert payload["status"] == "ok"
process_mock.assert_awaited_once()
@pytest.mark.anyio
async def test_yookassa_allowed_via_forwarded_header_when_proxy(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
async def fake_get_db():
yield SimpleNamespace()
monkeypatch.setattr("app.webserver.payments.get_db", fake_get_db)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"X-Forwarded-For": "185.71.76.10"},
client_ip="10.0.0.5",
)
response = await route.endpoint(request)
assert response.status_code == 200
payload = json.loads(response.body.decode("utf-8"))
assert payload["status"] == "ok"
process_mock.assert_awaited_once()
@pytest.mark.anyio
async def test_yookassa_allowed_via_trusted_forwarded_chain(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "203.0.113.0/24", raising=False)
async def fake_get_db():
yield SimpleNamespace()
monkeypatch.setattr("app.webserver.payments.get_db", fake_get_db)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"X-Forwarded-For": "185.71.76.10, 203.0.113.10"},
client_ip="10.0.0.5",
)
response = await route.endpoint(request)
assert response.status_code == 200
payload = json.loads(response.body.decode("utf-8"))
assert payload["status"] == "ok"
process_mock.assert_awaited_once()
@pytest.mark.anyio
async def test_yookassa_allowed_via_trusted_public_proxy(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_TRUSTED_PROXY_NETWORKS", "198.51.100.0/24", raising=False)
async def fake_get_db():
yield SimpleNamespace()
monkeypatch.setattr("app.webserver.payments.get_db", fake_get_db)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"X-Forwarded-For": "185.71.76.10, 198.51.100.10"},
client_ip="198.51.100.20",
)
response = await route.endpoint(request)
assert response.status_code == 200
payload = json.loads(response.body.decode("utf-8"))
assert payload["status"] == "ok"
process_mock.assert_awaited_once()
@pytest.mark.anyio
async def test_yookassa_missing_signature_when_secret_configured(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", "secret", raising=False)
router = create_payment_router(DummyBot(), SimpleNamespace())
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
@@ -138,6 +362,66 @@ async def test_yookassa_missing_signature(monkeypatch: pytest.MonkeyPatch) -> No
assert response.status_code == 401
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "missing_signature"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_invalid_signature(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", "secret", raising=False)
service = SimpleNamespace(process_yookassa_webhook=AsyncMock())
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=json.dumps({"event": "payment.succeeded"}).encode("utf-8"),
headers={"Signature": "v1 test invalid"},
)
response = await route.endpoint(request)
assert response.status_code == 401
payload = json.loads(response.body.decode("utf-8"))
assert payload["reason"] == "invalid_signature"
service.process_yookassa_webhook.assert_not_awaited()
@pytest.mark.anyio
async def test_yookassa_valid_signature(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(settings, "YOOKASSA_ENABLED", True, raising=False)
monkeypatch.setattr(settings, "YOOKASSA_WEBHOOK_SECRET", "secret", raising=False)
async def fake_get_db():
yield SimpleNamespace()
monkeypatch.setattr("app.webserver.payments.get_db", fake_get_db)
process_mock = AsyncMock(return_value=True)
service = SimpleNamespace(process_yookassa_webhook=process_mock)
router = create_payment_router(DummyBot(), service)
assert router is not None
route = _get_route(router, settings.YOOKASSA_WEBHOOK_PATH)
payload = {"event": "payment.succeeded"}
body = json.dumps(payload).encode("utf-8")
signature = _generate_yookassa_signature(body.decode("utf-8"), settings.YOOKASSA_WEBHOOK_SECRET)
request = _build_request(
settings.YOOKASSA_WEBHOOK_PATH,
body=body,
headers={"Signature": signature},
)
response = await route.endpoint(request)
assert response.status_code == 200
payload = json.loads(response.body.decode("utf-8"))
assert payload["status"] == "ok"
process_mock.assert_awaited_once()
@pytest.mark.anyio