From 9cabbf72d2d34f59db75463cdef92f6dbdb25268 Mon Sep 17 00:00:00 2001 From: Egor Date: Sat, 18 Oct 2025 01:20:51 +0300 Subject: [PATCH] Add WATA webhook support with signature verification --- README.md | 2 + app/config.py | 5 + app/external/wata_webhook.py | 255 ++++++++++++++++++++ app/services/payment/wata.py | 93 +++++++ main.py | 45 +++- tests/external/test_wata_webhook.py | 90 +++++++ tests/services/test_payment_service_wata.py | 211 ++++++++++++++++ 7 files changed, 699 insertions(+), 2 deletions(-) create mode 100644 app/external/wata_webhook.py create mode 100644 tests/external/test_wata_webhook.py diff --git a/README.md b/README.md index be1dcb9c..4e685ff2 100644 --- a/README.md +++ b/README.md @@ -625,6 +625,8 @@ WATA_ENABLED=false WATA_TOKEN= WATA_TERMINAL_ID= WATA_WEBHOOK_PATH=/wata-webhook +WATA_WEBHOOK_HOST=0.0.0.0 +WATA_WEBHOOK_PORT=8085 # ===== ИНТЕРФЕЙС И UX ===== ENABLE_LOGO_MODE=true diff --git a/app/config.py b/app/config.py index 036467d8..3518e15f 100644 --- a/app/config.py +++ b/app/config.py @@ -245,6 +245,11 @@ class Settings(BaseSettings): WATA_MIN_AMOUNT_KOPEKS: int = 10000 WATA_MAX_AMOUNT_KOPEKS: int = 100000000 WATA_REQUEST_TIMEOUT: int = 30 + WATA_WEBHOOK_PATH: str = "/wata-webhook" + WATA_WEBHOOK_HOST: str = "0.0.0.0" + WATA_WEBHOOK_PORT: int = 8085 + WATA_PUBLIC_KEY_URL: Optional[str] = None + WATA_PUBLIC_KEY_CACHE_SECONDS: int = 3600 MAIN_MENU_MODE: str = "default" CONNECT_BUTTON_MODE: str = "guide" diff --git a/app/external/wata_webhook.py b/app/external/wata_webhook.py new file mode 100644 index 00000000..075d69fd --- /dev/null +++ b/app/external/wata_webhook.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +import asyncio +import base64 +import json +import logging +from datetime import datetime, timedelta +from typing import Any, Dict, Optional + +import aiohttp +from aiohttp import web +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + +from app.config import settings +from app.database.database import get_db +from app.services.payment_service import PaymentService + +logger = logging.getLogger(__name__) + + +class WataPublicKeyProvider: + """Loads and caches the WATA public key used for webhook signature validation.""" + + def __init__(self, *, cache_seconds: Optional[int] = None) -> None: + self._cache_seconds = cache_seconds or int(settings.WATA_PUBLIC_KEY_CACHE_SECONDS) + self._cached_key: Optional[str] = None + self._expires_at: Optional[datetime] = None + self._lock = asyncio.Lock() + + async def get_public_key(self) -> Optional[str]: + """Returns a cached public key or fetches a new one from WATA.""" + + now = datetime.utcnow() + if self._cached_key and self._expires_at and now < self._expires_at: + return self._cached_key + + async with self._lock: + now = datetime.utcnow() + if self._cached_key and self._expires_at and now < self._expires_at: + return self._cached_key + + key = await self._fetch_public_key() + if key: + self._cached_key = key + if self._cache_seconds > 0: + self._expires_at = datetime.utcnow() + timedelta(seconds=self._cache_seconds) + else: + self._expires_at = None + logger.debug("Получен и закеширован публичный ключ WATA") + return self._cached_key + + if self._cached_key: + logger.warning("Используем ранее закешированный публичный ключ WATA") + return self._cached_key + + logger.error("Публичный ключ WATA недоступен") + return None + + async def _fetch_public_key(self) -> Optional[str]: + url = settings.WATA_PUBLIC_KEY_URL or f"{settings.WATA_BASE_URL.rstrip('/')}/public-key" + timeout = aiohttp.ClientTimeout(total=settings.WATA_REQUEST_TIMEOUT) + + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url) as response: + text = await response.text() + if response.status >= 400: + logger.error( + "Ошибка получения публичного ключа WATA %s: %s", + response.status, + text, + ) + return None + + try: + payload = await response.json() + except aiohttp.ContentTypeError: + logger.error("Ответ WATA public-key не является JSON: %s", text) + return None + + if isinstance(payload, dict): + value = payload.get("value") + if value: + return value + logger.error("Ответ WATA public-key не содержит ключ: %s", payload) + else: + logger.error("Неожиданный формат ответа WATA public-key: %s", payload) + except Exception as error: + logger.error("Ошибка запроса публичного ключа WATA: %s", error) + + return None + + +class WataWebhookHandler: + """Processes webhook callbacks coming from WATA.""" + + def __init__( + self, + payment_service: PaymentService, + *, + public_key_provider: Optional[WataPublicKeyProvider] = None, + ) -> None: + self.payment_service = payment_service + self.public_key_provider = public_key_provider or WataPublicKeyProvider() + + async def _verify_signature(self, raw_body: str, signature: str) -> bool: + signature = (signature or "").strip() + if not signature: + logger.error("WATA webhook без подписи") + return False + + public_key_pem = await self.public_key_provider.get_public_key() + if not public_key_pem: + logger.error("Публичный ключ WATA отсутствует, проверка подписи невозможна") + return False + + try: + signature_bytes = base64.b64decode(signature) + except (ValueError, TypeError): + logger.error("Некорректная подпись WATA (не Base64)") + return False + + try: + public_key = serialization.load_pem_public_key(public_key_pem.encode("utf-8")) + except ValueError as error: + logger.error("Ошибка загрузки публичного ключа WATA: %s", error) + return False + + try: + public_key.verify( + signature_bytes, + raw_body.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA512(), + ) + return True + except InvalidSignature: + logger.warning("Подпись WATA webhook не прошла проверку") + return False + except Exception as error: + logger.error("Ошибка проверки подписи WATA: %s", error) + return False + + async def handle_webhook(self, request: web.Request) -> web.Response: + if not settings.is_wata_enabled(): + logger.warning("Получен WATA webhook, но сервис отключен") + return web.json_response({"status": "error", "reason": "wata_disabled"}, status=503) + + raw_body = await request.text() + if not raw_body: + logger.warning("Получен пустой WATA webhook") + return web.json_response({"status": "error", "reason": "empty_body"}, status=400) + + signature = request.headers.get("X-Signature") + if not await self._verify_signature(raw_body, signature or ""): + return web.json_response({"status": "error", "reason": "invalid_signature"}, status=401) + + try: + payload: Dict[str, Any] = json.loads(raw_body) + except json.JSONDecodeError: + logger.error("Некорректный JSON WATA webhook") + return web.json_response({"status": "error", "reason": "invalid_json"}, status=400) + + logger.info( + "Получен WATA webhook: order_id=%s, status=%s", + payload.get("orderId"), + payload.get("transactionStatus"), + ) + + async for db in get_db(): + try: + processed = await self.payment_service.process_wata_webhook(db, payload) + if processed: + return web.json_response({"status": "ok"}, status=200) + return web.json_response({"status": "error", "reason": "not_processed"}, status=400) + finally: + await db.close() + + async def health_check(self, request: web.Request) -> web.Response: + return web.json_response( + { + "status": "ok", + "service": "wata_webhook", + "enabled": settings.is_wata_enabled(), + "path": settings.WATA_WEBHOOK_PATH, + } + ) + + async def options_handler(self, _: web.Request) -> web.Response: + return web.Response( + status=200, + headers={ + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "POST, GET, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, X-Signature", + }, + ) + + +def create_wata_webhook_app(payment_service: PaymentService) -> web.Application: + app = web.Application() + handler = WataWebhookHandler(payment_service) + + app.router.add_post(settings.WATA_WEBHOOK_PATH, handler.handle_webhook) + app.router.add_get(settings.WATA_WEBHOOK_PATH, handler.health_check) + app.router.add_options(settings.WATA_WEBHOOK_PATH, handler.options_handler) + app.router.add_get("/health", handler.health_check) + + logger.info( + "Настроен WATA webhook endpoint на %s", + settings.WATA_WEBHOOK_PATH, + ) + + return app + + +async def start_wata_webhook_server(payment_service: PaymentService) -> None: + if not settings.is_wata_enabled(): + logger.info("WATA отключен, webhook сервер не запускается") + return + + app = create_wata_webhook_app(payment_service) + runner = web.AppRunner(app) + await runner.setup() + + site = web.TCPSite( + runner, + host=settings.WATA_WEBHOOK_HOST, + port=settings.WATA_WEBHOOK_PORT, + ) + + try: + await site.start() + logger.info( + "WATA webhook сервер запущен на %s:%s", + settings.WATA_WEBHOOK_HOST, + settings.WATA_WEBHOOK_PORT, + ) + logger.info( + "WATA webhook URL: http://%s:%s%s", + settings.WATA_WEBHOOK_HOST, + settings.WATA_WEBHOOK_PORT, + settings.WATA_WEBHOOK_PATH, + ) + + while True: + await asyncio.sleep(1) + except asyncio.CancelledError: + logger.info("WATA webhook сервер остановлен по запросу") + finally: + await site.stop() + await runner.cleanup() + logger.info("WATA webhook сервер корректно остановлен") diff --git a/app/services/payment/wata.py b/app/services/payment/wata.py index 8d1f7940..213c731e 100644 --- a/app/services/payment/wata.py +++ b/app/services/payment/wata.py @@ -120,6 +120,99 @@ class WataPaymentMixin: "order_id": order_id, } + async def process_wata_webhook( + self, + db: AsyncSession, + payload: Dict[str, Any], + ) -> bool: + """Handles asynchronous webhook notifications from WATA.""" + + payment_module = import_module("app.services.payment_service") + + if not isinstance(payload, dict): + logger.error("WATA webhook payload не является словарём: %s", payload) + return False + + order_id_raw = payload.get("orderId") + payment_link_raw = payload.get("paymentLinkId") or payload.get("id") + transaction_status_raw = payload.get("transactionStatus") + + order_id = str(order_id_raw) if order_id_raw else None + payment_link_id = str(payment_link_raw) if payment_link_raw else None + transaction_status = (transaction_status_raw or "").strip() + + if not order_id and not payment_link_id: + logger.error( + "WATA webhook без orderId и paymentLinkId: %s", + payload, + ) + return False + + if not transaction_status: + logger.error("WATA webhook без статуса транзакции: %s", payload) + return False + + payment = None + if order_id: + payment = await payment_module.get_wata_payment_by_order_id(db, order_id) + if not payment and payment_link_id: + payment = await payment_module.get_wata_payment_by_link_id(db, payment_link_id) + + if not payment: + logger.error( + "WATA платеж не найден (order_id=%s, payment_link_id=%s)", + order_id, + payment_link_id, + ) + return False + + status_lower = transaction_status.lower() + metadata = dict(getattr(payment, "metadata_json", {}) or {}) + metadata["last_webhook"] = payload + terminal_public_id = ( + payload.get("terminalPublicId") + or payload.get("terminal_public_id") + or payload.get("terminalPublicID") + ) + + update_kwargs: Dict[str, Any] = { + "metadata": metadata, + "callback_payload": payload, + "terminal_public_id": terminal_public_id, + } + + if transaction_status: + update_kwargs["status"] = transaction_status + update_kwargs["last_status"] = transaction_status + + if status_lower != "paid" and not payment.is_paid: + update_kwargs["is_paid"] = False + + payment = await payment_module.update_wata_payment_status( + db, + payment=payment, + **update_kwargs, + ) + + if status_lower == "paid": + if payment.is_paid: + logger.info( + "WATA платеж %s уже помечен как оплачен", + payment.payment_link_id, + ) + return True + + await self._finalize_wata_payment(db, payment, payload) + return True + + if status_lower == "declined": + logger.info( + "WATA платеж %s отклонён", + payment.payment_link_id, + ) + + return True + async def get_wata_payment_status( self, db: AsyncSession, diff --git a/main.py b/main.py index 337903fc..5b718766 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from app.services.version_service import version_service from app.external.webhook_server import WebhookServer from app.external.yookassa_webhook import start_yookassa_webhook_server from app.external.pal24_webhook import start_pal24_webhook_server, Pal24WebhookServer +from app.external.wata_webhook import start_wata_webhook_server from app.database.universal_migration import run_universal_migration from app.services.backup_service import backup_service from app.services.reporting_service import reporting_service @@ -72,6 +73,7 @@ async def main(): webhook_server = None yookassa_server_task = None + wata_server_task = None pal24_server: Pal24WebhookServer | None = None monitoring_task = None maintenance_task = None @@ -285,6 +287,21 @@ async def main(): else: stage.skip("PayPalych отключен настройками") + async with timeline.stage( + "WATA webhook", + "💳", + success_message="WATA webhook запущен", + ) as stage: + if settings.is_wata_enabled(): + wata_server_task = asyncio.create_task( + start_wata_webhook_server(payment_service) + ) + stage.log( + f"Endpoint: {settings.WEBHOOK_URL}:{settings.WATA_WEBHOOK_PORT}{settings.WATA_WEBHOOK_PATH}" + ) + else: + stage.skip("WATA отключен настройками") + async with timeline.stage( "Служба мониторинга", "📈", @@ -369,6 +386,10 @@ async def main(): webhook_lines.append( f"PayPalych: {settings.WEBHOOK_URL}:{settings.PAL24_WEBHOOK_PORT}{settings.PAL24_WEBHOOK_PATH}" ) + if settings.is_wata_enabled(): + webhook_lines.append( + f"WATA: {settings.WEBHOOK_URL}:{settings.WATA_WEBHOOK_PORT}{settings.WATA_WEBHOOK_PATH}" + ) timeline.log_section( "Активные webhook endpoints", @@ -399,7 +420,19 @@ async def main(): yookassa_server_task = asyncio.create_task( start_yookassa_webhook_server(payment_service) ) - + + if wata_server_task and wata_server_task.done(): + exception = wata_server_task.exception() + if exception: + logger.error(f"WATA webhook сервер завершился с ошибкой: {exception}") + logger.info("🔄 Перезапуск WATA webhook сервера...") + if settings.is_wata_enabled(): + wata_server_task = asyncio.create_task( + start_wata_webhook_server(payment_service) + ) + else: + wata_server_task = None + if monitoring_task.done(): exception = monitoring_task.exception() if exception: @@ -446,7 +479,15 @@ async def main(): await yookassa_server_task except asyncio.CancelledError: pass - + + if wata_server_task and not wata_server_task.done(): + logger.info("ℹ️ Остановка WATA webhook сервера...") + wata_server_task.cancel() + try: + await wata_server_task + except asyncio.CancelledError: + pass + if monitoring_task and not monitoring_task.done(): logger.info("ℹ️ Остановка службы мониторинга...") monitoring_service.stop_monitoring() diff --git a/tests/external/test_wata_webhook.py b/tests/external/test_wata_webhook.py new file mode 100644 index 00000000..7708d684 --- /dev/null +++ b/tests/external/test_wata_webhook.py @@ -0,0 +1,90 @@ +"""Unit tests for the WATA webhook handler.""" + +from __future__ import annotations + +import base64 +from pathlib import Path +import sys +from typing import Optional + +import pytest +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +ROOT_DIR = Path(__file__).resolve().parents[2] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +from app.external.wata_webhook import WataWebhookHandler # noqa: E402 + + +class DummyPaymentService: + async def process_wata_webhook(self, *args, **kwargs): # pragma: no cover - not used in tests + return True + + +class StubPublicKeyProvider: + def __init__(self, public_key_pem: Optional[str]) -> None: + self.public_key_pem = public_key_pem + + async def get_public_key(self) -> Optional[str]: + return self.public_key_pem + + +@pytest.fixture +def anyio_backend() -> str: + return "asyncio" + + +@pytest.mark.anyio("asyncio") +async def test_verify_signature_success() -> None: + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + public_key = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8") + + payload = "{\"status\": \"Paid\"}" + signature = base64.b64encode( + private_key.sign( + payload.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA512(), + ) + ).decode("utf-8") + + handler = WataWebhookHandler( + DummyPaymentService(), + public_key_provider=StubPublicKeyProvider(public_key), + ) + + assert await handler._verify_signature(payload, signature) is True + + +@pytest.mark.anyio("asyncio") +async def test_verify_signature_fails_with_invalid_signature() -> None: + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + public_key = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8") + + payload = "{\"status\": \"Paid\"}" + bad_signature = base64.b64encode(b"not-a-signature").decode("utf-8") + + handler = WataWebhookHandler( + DummyPaymentService(), + public_key_provider=StubPublicKeyProvider(public_key), + ) + + assert await handler._verify_signature(payload, bad_signature) is False + + +@pytest.mark.anyio("asyncio") +async def test_verify_signature_fails_without_public_key() -> None: + handler = WataWebhookHandler( + DummyPaymentService(), + public_key_provider=StubPublicKeyProvider(None), + ) + + assert await handler._verify_signature("{}", "signature") is False diff --git a/tests/services/test_payment_service_wata.py b/tests/services/test_payment_service_wata.py index fd30232f..e1028a83 100644 --- a/tests/services/test_payment_service_wata.py +++ b/tests/services/test_payment_service_wata.py @@ -47,6 +47,23 @@ class StubWataService: return self.response +class DummyWataPayment: + def __init__(self) -> None: + self.id = 1 + self.user_id = 42 + self.payment_link_id = "link-123" + self.order_id = "order-123" + self.amount_kopeks = 15_000 + self.currency = "RUB" + self.description = "Пополнение" + self.status = "Opened" + self.is_paid = False + self.metadata_json: Dict[str, Any] = {} + self.transaction_id: Optional[int] = None + self.callback_payload: Optional[Dict[str, Any]] = None + self.terminal_public_id: Optional[str] = None + + def _make_service(stub: Optional[StubWataService]) -> PaymentService: service = PaymentService.__new__(PaymentService) # type: ignore[call-arg] service.bot = None @@ -142,3 +159,197 @@ async def test_create_wata_payment_returns_none_without_service() -> None: description="Пополнение", ) assert result is None + + +@pytest.mark.anyio("asyncio") +async def test_process_wata_webhook_updates_status(monkeypatch: pytest.MonkeyPatch) -> None: + service = _make_service(None) + db = DummySession() + payment = DummyWataPayment() + update_kwargs: Dict[str, Any] = {} + link_lookup_called = False + + async def fake_get_by_order_id(db_arg: Any, order_id: str) -> DummyWataPayment: + assert db_arg is db + assert order_id == payment.order_id + return payment + + async def fake_get_by_link_id(*_: Any, **__: Any) -> Optional[DummyWataPayment]: + nonlocal link_lookup_called + link_lookup_called = True + return None + + async def fake_update_status( + db_arg: Any, + *, + payment: DummyWataPayment, + **kwargs: Any, + ) -> DummyWataPayment: + assert db_arg is db + update_kwargs.update(kwargs) + if "status" in kwargs: + payment.status = kwargs["status"] + if "is_paid" in kwargs: + payment.is_paid = kwargs["is_paid"] + if "metadata" in kwargs: + payment.metadata_json = kwargs["metadata"] + if "callback_payload" in kwargs: + payment.callback_payload = kwargs["callback_payload"] + if "terminal_public_id" in kwargs: + payment.terminal_public_id = kwargs["terminal_public_id"] + return payment + + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_order_id", + fake_get_by_order_id, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_link_id", + fake_get_by_link_id, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "update_wata_payment_status", + fake_update_status, + raising=False, + ) + + payload = { + "orderId": payment.order_id, + "transactionStatus": "Declined", + "terminalPublicId": "terminal-001", + } + + processed = await service.process_wata_webhook(db, payload) + + assert processed is True + assert link_lookup_called is False + assert payment.status == "Declined" + assert payment.is_paid is False + assert payment.metadata_json.get("last_webhook") == payload + assert payment.callback_payload == payload + assert payment.terminal_public_id == "terminal-001" + assert update_kwargs["status"] == "Declined" + assert update_kwargs["is_paid"] is False + + +@pytest.mark.anyio("asyncio") +async def test_process_wata_webhook_finalizes_paid(monkeypatch: pytest.MonkeyPatch) -> None: + service = _make_service(None) + db = DummySession() + payment = DummyWataPayment() + finalize_called = False + + async def fake_get_by_order_id(*_: Any, **__: Any) -> DummyWataPayment: + return payment + + async def fake_update_status( + db_arg: Any, + *, + payment: DummyWataPayment, + **kwargs: Any, + ) -> DummyWataPayment: + if "metadata" in kwargs: + payment.metadata_json = kwargs["metadata"] + if "callback_payload" in kwargs: + payment.callback_payload = kwargs["callback_payload"] + if "status" in kwargs: + payment.status = kwargs["status"] + return payment + + async def fake_finalize( + db_arg: Any, + payment_arg: DummyWataPayment, + payload_arg: Dict[str, Any], + ) -> DummyWataPayment: + nonlocal finalize_called + finalize_called = True + payment_arg.is_paid = True + return payment_arg + + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_order_id", + fake_get_by_order_id, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_link_id", + lambda *args, **kwargs: None, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "update_wata_payment_status", + fake_update_status, + raising=False, + ) + monkeypatch.setattr( + service, + "_finalize_wata_payment", + fake_finalize, + raising=False, + ) + + payload = { + "orderId": payment.order_id, + "transactionStatus": "Paid", + "transactionId": "tx-001", + } + + processed = await service.process_wata_webhook(db, payload) + + assert processed is True + assert finalize_called is True + assert payment.is_paid is True + assert payment.metadata_json.get("last_webhook") == payload + + +@pytest.mark.anyio("asyncio") +async def test_process_wata_webhook_returns_false_when_payment_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + service = _make_service(None) + db = DummySession() + + async def fake_get_by_order_id(*_: Any, **__: Any) -> None: + return None + + async def fake_get_by_link_id(*_: Any, **__: Any) -> None: + return None + + async def fail_update(*_: Any, **__: Any) -> None: + pytest.fail("update_wata_payment_status should not be called") + + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_order_id", + fake_get_by_order_id, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "get_wata_payment_by_link_id", + fake_get_by_link_id, + raising=False, + ) + monkeypatch.setattr( + payment_service_module, + "update_wata_payment_status", + fail_update, + raising=False, + ) + + payload = { + "orderId": "missing-order", + "transactionStatus": "Paid", + } + + processed = await service.process_wata_webhook(db, payload) + + assert processed is False