mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-02-25 13:51:50 +00:00
fix: payment race conditions, balance atomicity, renewal rollback safety
- YooKassa: SELECT FOR UPDATE on payment row to prevent concurrent double-processing - subtract_user_balance: row locking to prevent concurrent balance race conditions - subtract_user_balance: transaction creation before commit for atomicity - subscription renewal: compensating refund if extend_subscription fails after charge - StaleDataError: use savepoint instead of full rollback to protect parent transaction
This commit is contained in:
@@ -666,14 +666,15 @@ async def decrement_subscription_server_counts(
|
||||
try:
|
||||
from app.database.crud.server_squad import remove_user_from_servers
|
||||
|
||||
await remove_user_from_servers(db, sorted(server_ids))
|
||||
# Use savepoint so StaleDataError rollback doesn't affect the parent transaction
|
||||
async with db.begin_nested():
|
||||
await remove_user_from_servers(db, sorted(server_ids))
|
||||
except StaleDataError:
|
||||
logger.warning(
|
||||
'⚠️ Подписка %s уже удалена (StaleDataError), пропускаем декремент серверов %s',
|
||||
sub_id,
|
||||
list(server_ids),
|
||||
)
|
||||
await db.rollback()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
'⚠️ Ошибка уменьшения счетчика пользователей серверов %s для подписки %s: %s',
|
||||
|
||||
@@ -503,6 +503,10 @@ async def subtract_user_balance(
|
||||
logger.info(f' 💸 Сумма к списанию: {amount_kopeks} копеек')
|
||||
logger.info(f' 📝 Описание: {description}')
|
||||
|
||||
# Lock the user row to prevent concurrent balance race conditions
|
||||
locked_result = await db.execute(select(User).where(User.id == user.id).with_for_update())
|
||||
user = locked_result.scalar_one()
|
||||
|
||||
log_context: dict[str, object] | None = None
|
||||
if consume_promo_offer:
|
||||
try:
|
||||
@@ -554,14 +558,13 @@ async def subtract_user_balance(
|
||||
|
||||
user.updated_at = datetime.utcnow()
|
||||
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
|
||||
if create_transaction:
|
||||
from app.database.crud.transaction import (
|
||||
create_transaction as create_trans,
|
||||
)
|
||||
|
||||
# create_trans commits the session, atomically persisting
|
||||
# both the balance change and the transaction record
|
||||
await create_trans(
|
||||
db=db,
|
||||
user_id=user.id,
|
||||
@@ -570,6 +573,10 @@ async def subtract_user_balance(
|
||||
description=description,
|
||||
payment_method=payment_method,
|
||||
)
|
||||
else:
|
||||
await db.commit()
|
||||
|
||||
await db.refresh(user)
|
||||
|
||||
if consume_promo_offer and log_context:
|
||||
try:
|
||||
|
||||
@@ -397,6 +397,21 @@ class YooKassaPaymentMixin:
|
||||
try:
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.database.models import YooKassaPayment as YKPayment
|
||||
|
||||
# Lock the payment row to prevent concurrent double-processing
|
||||
locked_result = await db.execute(select(YKPayment).where(YKPayment.id == payment.id).with_for_update())
|
||||
payment = locked_result.scalar_one()
|
||||
|
||||
# Fast-path: already processed
|
||||
if getattr(payment, 'transaction_id', None):
|
||||
logger.info(
|
||||
'Платеж YooKassa %s уже обработан (transaction_id=%s), пропускаем.',
|
||||
payment.yookassa_payment_id,
|
||||
payment.transaction_id,
|
||||
)
|
||||
return True
|
||||
|
||||
payment_module = import_module('app.services.payment_service')
|
||||
|
||||
# Проверяем, не обрабатывается ли уже этот платеж (защита от дублирования)
|
||||
|
||||
@@ -449,7 +449,30 @@ class SubscriptionRenewalService:
|
||||
subscription_before = subscription
|
||||
old_end_date = subscription_before.end_date
|
||||
|
||||
subscription_after = await extend_subscription(db, subscription_before, period_days)
|
||||
try:
|
||||
subscription_after = await extend_subscription(db, subscription_before, period_days)
|
||||
except Exception:
|
||||
# Compensate: refund the charged balance since extension failed
|
||||
if charge_from_balance > 0:
|
||||
try:
|
||||
from app.database.crud.user import add_user_balance
|
||||
|
||||
await add_user_balance(
|
||||
db,
|
||||
user,
|
||||
charge_from_balance,
|
||||
'Возврат: ошибка продления подписки',
|
||||
create_transaction=True,
|
||||
transaction_type=TransactionType.REFUND,
|
||||
)
|
||||
except Exception as refund_error:
|
||||
logger.critical(
|
||||
'CRITICAL: Failed to refund %s kopeks to user %s after extension failure: %s',
|
||||
charge_from_balance,
|
||||
user.id,
|
||||
refund_error,
|
||||
)
|
||||
raise
|
||||
|
||||
server_ids = pricing.server_ids or []
|
||||
server_prices_for_period = pricing.details.get('servers_individual_prices', [])
|
||||
|
||||
Reference in New Issue
Block a user