From c5124b97b63eda59b52d2cbf9e2dcdaa6141ed6e Mon Sep 17 00:00:00 2001 From: Fringg Date: Wed, 11 Feb 2026 21:49:37 +0300 Subject: [PATCH] fix: payment race conditions, balance atomicity, renewal rollback safety - YooKassa: SELECT FOR UPDATE on payment row to prevent concurrent double-processing - subtract_user_balance: row locking to prevent concurrent balance race conditions - subtract_user_balance: transaction creation before commit for atomicity - subscription renewal: compensating refund if extend_subscription fails after charge - StaleDataError: use savepoint instead of full rollback to protect parent transaction --- app/database/crud/subscription.py | 5 ++-- app/database/crud/user.py | 13 +++++++--- app/services/payment/yookassa.py | 15 ++++++++++++ app/services/subscription_renewal_service.py | 25 +++++++++++++++++++- 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/app/database/crud/subscription.py b/app/database/crud/subscription.py index ae65112a..f2904512 100644 --- a/app/database/crud/subscription.py +++ b/app/database/crud/subscription.py @@ -666,14 +666,15 @@ async def decrement_subscription_server_counts( try: from app.database.crud.server_squad import remove_user_from_servers - await remove_user_from_servers(db, sorted(server_ids)) + # Use savepoint so StaleDataError rollback doesn't affect the parent transaction + async with db.begin_nested(): + await remove_user_from_servers(db, sorted(server_ids)) except StaleDataError: logger.warning( '⚠️ Подписка %s уже удалена (StaleDataError), пропускаем декремент серверов %s', sub_id, list(server_ids), ) - await db.rollback() except Exception as error: logger.error( '⚠️ Ошибка уменьшения счетчика пользователей серверов %s для подписки %s: %s', diff --git a/app/database/crud/user.py b/app/database/crud/user.py index af359f29..0e73f5e3 100644 --- a/app/database/crud/user.py +++ b/app/database/crud/user.py @@ -503,6 +503,10 @@ async def subtract_user_balance( logger.info(f' 💸 Сумма к списанию: {amount_kopeks} копеек') logger.info(f' 📝 Описание: {description}') + # Lock the user row to prevent concurrent balance race conditions + locked_result = await db.execute(select(User).where(User.id == user.id).with_for_update()) + user = locked_result.scalar_one() + log_context: dict[str, object] | None = None if consume_promo_offer: try: @@ -554,14 +558,13 @@ async def subtract_user_balance( user.updated_at = datetime.utcnow() - await db.commit() - await db.refresh(user) - if create_transaction: from app.database.crud.transaction import ( create_transaction as create_trans, ) + # create_trans commits the session, atomically persisting + # both the balance change and the transaction record await create_trans( db=db, user_id=user.id, @@ -570,6 +573,10 @@ async def subtract_user_balance( description=description, payment_method=payment_method, ) + else: + await db.commit() + + await db.refresh(user) if consume_promo_offer and log_context: try: diff --git a/app/services/payment/yookassa.py b/app/services/payment/yookassa.py index 59a642bf..ed05d603 100644 --- a/app/services/payment/yookassa.py +++ b/app/services/payment/yookassa.py @@ -397,6 +397,21 @@ class YooKassaPaymentMixin: try: from sqlalchemy import select + from app.database.models import YooKassaPayment as YKPayment + + # Lock the payment row to prevent concurrent double-processing + locked_result = await db.execute(select(YKPayment).where(YKPayment.id == payment.id).with_for_update()) + payment = locked_result.scalar_one() + + # Fast-path: already processed + if getattr(payment, 'transaction_id', None): + logger.info( + 'Платеж YooKassa %s уже обработан (transaction_id=%s), пропускаем.', + payment.yookassa_payment_id, + payment.transaction_id, + ) + return True + payment_module = import_module('app.services.payment_service') # Проверяем, не обрабатывается ли уже этот платеж (защита от дублирования) diff --git a/app/services/subscription_renewal_service.py b/app/services/subscription_renewal_service.py index 12ff3782..c8ec39b6 100644 --- a/app/services/subscription_renewal_service.py +++ b/app/services/subscription_renewal_service.py @@ -449,7 +449,30 @@ class SubscriptionRenewalService: subscription_before = subscription old_end_date = subscription_before.end_date - subscription_after = await extend_subscription(db, subscription_before, period_days) + try: + subscription_after = await extend_subscription(db, subscription_before, period_days) + except Exception: + # Compensate: refund the charged balance since extension failed + if charge_from_balance > 0: + try: + from app.database.crud.user import add_user_balance + + await add_user_balance( + db, + user, + charge_from_balance, + 'Возврат: ошибка продления подписки', + create_transaction=True, + transaction_type=TransactionType.REFUND, + ) + except Exception as refund_error: + logger.critical( + 'CRITICAL: Failed to refund %s kopeks to user %s after extension failure: %s', + charge_from_balance, + user.id, + refund_error, + ) + raise server_ids = pricing.server_ids or [] server_prices_for_period = pricing.details.get('servers_individual_prices', [])