Add automatic subscription purchase after balance top-up

This commit is contained in:
Egor
2025-10-26 18:04:13 +03:00
parent 2616292f97
commit a681a9ff1b
5 changed files with 631 additions and 285 deletions

View File

@@ -187,6 +187,8 @@ class Settings(BaseSettings):
DISABLE_TOPUP_BUTTONS: bool = False
PAYMENT_VERIFICATION_AUTO_CHECK_ENABLED: bool = False
PAYMENT_VERIFICATION_AUTO_CHECK_INTERVAL_MINUTES: int = 10
AUTOBUY_AFTER_TOPUP_ENABLED: bool = False
# Настройки простой покупки
SIMPLE_SUBSCRIPTION_ENABLED: bool = False
@@ -786,9 +788,12 @@ class Settings(BaseSettings):
def is_traffic_fixed(self) -> bool:
return self.TRAFFIC_SELECTION_MODE.lower() == "fixed"
def get_fixed_traffic_limit(self) -> int:
return self.FIXED_TRAFFIC_LIMIT_GB
def is_autobuy_after_topup_enabled(self) -> bool:
return bool(self.AUTOBUY_AFTER_TOPUP_ENABLED)
def is_yookassa_enabled(self) -> bool:
return (self.YOOKASSA_ENABLED and

View File

@@ -1,10 +1,12 @@
import base64
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple, Optional
from urllib.parse import quote
from aiogram import Dispatcher, types, F
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest
from aiogram.fsm.context import FSMContext
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
@@ -141,6 +143,17 @@ from .traffic import (
select_traffic,
)
@dataclass(slots=True)
class PurchaseExecutionResult:
success: bool
message: Optional[str] = None
keyboard: Optional[InlineKeyboardMarkup] = None
error_message: Optional[str] = None
missing_amount: Optional[int] = None
purchase_completed: bool = False
async def show_subscription_info(
callback: types.CallbackQuery,
db_user: User,
@@ -1339,6 +1352,332 @@ async def devices_continue(
await state.set_state(SubscriptionStates.confirming_purchase)
await callback.answer()
async def _complete_purchase_workflow(
*,
db: AsyncSession,
db_user: User,
data: Dict[str, Any],
final_price: int,
final_traffic_gb: int,
months_in_period: int,
promo_offer_discount_value: int,
promo_offer_discount_percent: int,
server_prices: List[int],
texts,
bot: Optional[Bot],
purchase_reason: str,
) -> PurchaseExecutionResult:
purchase_completed = False
try:
success = await subtract_user_balance(
db,
db_user,
final_price,
purchase_reason,
consume_promo_offer=promo_offer_discount_value > 0,
)
if not success:
missing_kopeks = max(0, final_price - db_user.balance_kopeks)
return PurchaseExecutionResult(
success=False,
missing_amount=missing_kopeks,
)
existing_subscription = db_user.subscription
was_trial_conversion = False
current_time = datetime.utcnow()
if existing_subscription:
logger.info(f"Обновляем существующую подписку пользователя {db_user.telegram_id}")
bonus_period = timedelta()
if existing_subscription.is_trial:
logger.info(f"Конверсия из триала в платную для пользователя {db_user.telegram_id}")
was_trial_conversion = True
trial_duration = (current_time - existing_subscription.start_date).days
if settings.TRIAL_ADD_REMAINING_DAYS_TO_PAID and existing_subscription.end_date:
remaining_trial_delta = existing_subscription.end_date - current_time
if remaining_trial_delta.total_seconds() > 0:
bonus_period = remaining_trial_delta
logger.info(
"Добавляем оставшееся время триала (%s) к новой подписке пользователя %s",
bonus_period,
db_user.telegram_id,
)
try:
from app.database.crud.subscription_conversion import create_subscription_conversion
await create_subscription_conversion(
db=db,
user_id=db_user.id,
trial_duration_days=trial_duration,
payment_method="balance",
first_payment_amount_kopeks=final_price,
first_paid_period_days=data['period_days']
)
logger.info(
"Записана конверсия: %s дн. триал → %s дн. платная за %s",
trial_duration,
data['period_days'],
final_price / 100,
)
except Exception as conversion_error:
logger.error(f"Ошибка записи конверсии: {conversion_error}")
existing_subscription.is_trial = False
existing_subscription.status = SubscriptionStatus.ACTIVE.value
existing_subscription.traffic_limit_gb = final_traffic_gb
existing_subscription.device_limit = data['devices']
existing_subscription.connected_squads = data['countries']
existing_subscription.start_date = current_time
existing_subscription.end_date = current_time + timedelta(days=data['period_days']) + bonus_period
existing_subscription.updated_at = current_time
existing_subscription.traffic_used_gb = 0.0
await db.commit()
await db.refresh(existing_subscription)
subscription = existing_subscription
else:
logger.info(f"Создаем новую подписку для пользователя {db_user.telegram_id}")
subscription = await create_paid_subscription_with_traffic_mode(
db=db,
user_id=db_user.id,
duration_days=data['period_days'],
device_limit=data['devices'],
connected_squads=data['countries'],
traffic_gb=final_traffic_gb
)
from app.utils.user_utils import mark_user_as_had_paid_subscription
await mark_user_as_had_paid_subscription(db, db_user)
from app.database.crud.server_squad import get_server_ids_by_uuids, add_user_to_servers
from app.database.crud.subscription import add_subscription_servers
server_ids = await get_server_ids_by_uuids(db, data['countries'])
if server_ids:
await add_subscription_servers(db, subscription, server_ids, server_prices)
await add_user_to_servers(db, server_ids)
logger.info(f"Сохранены цены серверов за весь период: {server_prices}")
await db.refresh(db_user)
subscription_service = SubscriptionService()
if db_user.remnawave_uuid:
remnawave_user = await subscription_service.update_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки",
)
else:
remnawave_user = await subscription_service.create_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки",
)
if not remnawave_user:
logger.error(f"Не удалось создать/обновить RemnaWave пользователя для {db_user.telegram_id}")
remnawave_user = await subscription_service.create_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки (повторная попытка)",
)
transaction = await create_transaction(
db=db,
user_id=db_user.id,
type=TransactionType.SUBSCRIPTION_PAYMENT,
amount_kopeks=final_price,
description=f"Подписка на {data['period_days']} дней ({months_in_period} мес)"
)
if bot:
try:
notification_service = AdminNotificationService(bot)
await notification_service.send_subscription_purchase_notification(
db, db_user, subscription, transaction, data['period_days'], was_trial_conversion
)
except Exception as error:
logger.error(f"Ошибка отправки уведомления о покупке: {error}")
await db.refresh(db_user)
await db.refresh(subscription)
subscription_link = get_display_subscription_link(subscription)
hide_subscription_link = settings.should_hide_subscription_link()
discount_note = ""
if promo_offer_discount_value > 0:
discount_note = texts.t(
"SUBSCRIPTION_PROMO_DISCOUNT_NOTE",
"⚡ Доп. скидка {percent}%: -{amount}",
).format(
percent=promo_offer_discount_percent,
amount=texts.format_price(promo_offer_discount_value),
)
connect_keyboard: InlineKeyboardMarkup
if remnawave_user and subscription_link:
if settings.is_happ_cryptolink_mode():
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
+ texts.t(
"SUBSCRIPTION_HAPP_LINK_PROMPT",
"🔒 Ссылка на подписку создана. Нажмите кнопку \"Подключиться\" ниже, чтобы открыть её в Happ.",
)
+ "\n\n"
+ texts.t(
"SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT",
"📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве",
)
)
elif hide_subscription_link:
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
+ texts.t(
"SUBSCRIPTION_LINK_HIDDEN_NOTICE",
" Ссылка подписки доступна по кнопкам ниже или в разделе \"Моя подписка\".",
)
+ "\n\n"
+ texts.t(
"SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT",
"📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве",
)
)
else:
import_link_section = texts.t(
"SUBSCRIPTION_IMPORT_LINK_SECTION",
"🔗 <b>Ваша ссылка для импорта в VPN приложение:</b>\\n<code>{subscription_url}</code>",
).format(subscription_url=subscription_link)
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
f"{import_link_section}\n\n"
f"{texts.t('SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT', '📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве')}"
)
if discount_note:
success_text = f"{success_text}\n\n{discount_note}"
connect_mode = settings.CONNECT_BUTTON_MODE
if connect_mode == "miniapp_subscription":
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
web_app=types.WebAppInfo(url=subscription_link),
)
],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
elif connect_mode == "miniapp_custom":
target_url = settings.MINIAPP_CUSTOM_URL
if not target_url:
logger.warning("Кастомная ссылка MiniApp не настроена, используем стандартную клавиатуру подключения")
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
callback_data="subscription_connect")],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
else:
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
web_app=types.WebAppInfo(url=target_url),
)
],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
elif connect_mode == "link":
rows = [
[InlineKeyboardButton(text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"), url=subscription_link)]
]
happ_row = get_happ_download_button_row(texts)
if happ_row:
rows.append(happ_row)
rows.append([InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")])
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
elif connect_mode == "happ_cryptolink":
rows = [
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
callback_data="open_subscription_link",
)
]
]
happ_row = get_happ_download_button_row(texts)
if happ_row:
rows.append(happ_row)
rows.append([InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")])
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
else:
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
callback_data="subscription_connect")],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
else:
purchase_text = texts.SUBSCRIPTION_PURCHASED
if discount_note:
purchase_text = f"{purchase_text}\n\n{discount_note}"
success_text = texts.t(
"SUBSCRIPTION_LINK_GENERATING_NOTICE",
"{purchase_text}\n\nСсылка генерируется, перейдите в раздел 'Моя подписка' через несколько секунд.",
).format(purchase_text=purchase_text)
connect_keyboard = get_back_keyboard(db_user.language)
purchase_completed = True
logger.info(
"Пользователь %s купил подписку на %s дней за %s",
db_user.telegram_id,
data['period_days'],
final_price / 100,
)
await user_cart_service.delete_user_cart(db_user.id)
return PurchaseExecutionResult(
success=True,
message=success_text,
keyboard=connect_keyboard,
purchase_completed=purchase_completed,
)
except Exception as error:
logger.error(f"Ошибка покупки подписки: {error}")
return PurchaseExecutionResult(
success=False,
error_message=texts.ERROR,
purchase_completed=purchase_completed,
)
async def confirm_purchase(
callback: types.CallbackQuery,
state: FSMContext,
@@ -1621,19 +1960,25 @@ async def confirm_purchase(
await callback.answer()
return
purchase_completed = False
purchase_reason = f"Покупка подписки на {data['period_days']} дней"
result = await _complete_purchase_workflow(
db=db,
db_user=db_user,
data=data,
final_price=final_price,
final_traffic_gb=final_traffic_gb,
months_in_period=months_in_period,
promo_offer_discount_value=promo_offer_discount_value,
promo_offer_discount_percent=promo_offer_discount_percent,
server_prices=server_prices,
texts=texts,
bot=getattr(callback, "bot", None),
purchase_reason=purchase_reason,
)
try:
success = await subtract_user_balance(
db,
db_user,
final_price,
f"Покупка подписки на {data['period_days']} дней",
consume_promo_offer=promo_offer_discount_value > 0,
)
if not success:
missing_kopeks = final_price - db_user.balance_kopeks
if not result.success:
if result.missing_amount is not None:
missing_kopeks = result.missing_amount
message_text = texts.t(
"ADDON_INSUFFICIENT_FUNDS_MESSAGE",
(
@@ -1658,285 +2003,22 @@ async def confirm_purchase(
),
parse_mode="HTML",
)
await callback.answer()
return
existing_subscription = db_user.subscription
was_trial_conversion = False
current_time = datetime.utcnow()
if existing_subscription:
logger.info(f"Обновляем существующую подписку пользователя {db_user.telegram_id}")
bonus_period = timedelta()
if existing_subscription.is_trial:
logger.info(f"Конверсия из триала в платную для пользователя {db_user.telegram_id}")
was_trial_conversion = True
trial_duration = (current_time - existing_subscription.start_date).days
if settings.TRIAL_ADD_REMAINING_DAYS_TO_PAID and existing_subscription.end_date:
remaining_trial_delta = existing_subscription.end_date - current_time
if remaining_trial_delta.total_seconds() > 0:
bonus_period = remaining_trial_delta
logger.info(
"Добавляем оставшееся время триала (%s) к новой подписке пользователя %s",
bonus_period,
db_user.telegram_id,
)
try:
from app.database.crud.subscription_conversion import create_subscription_conversion
await create_subscription_conversion(
db=db,
user_id=db_user.id,
trial_duration_days=trial_duration,
payment_method="balance",
first_payment_amount_kopeks=final_price,
first_paid_period_days=data['period_days']
)
logger.info(
f"Записана конверсия: {trial_duration} дн. триал → {data['period_days']} дн. платная за {final_price / 100}")
except Exception as conversion_error:
logger.error(f"Ошибка записи конверсии: {conversion_error}")
existing_subscription.is_trial = False
existing_subscription.status = SubscriptionStatus.ACTIVE.value
existing_subscription.traffic_limit_gb = final_traffic_gb
existing_subscription.device_limit = data['devices']
existing_subscription.connected_squads = data['countries']
existing_subscription.start_date = current_time
existing_subscription.end_date = current_time + timedelta(days=data['period_days']) + bonus_period
existing_subscription.updated_at = current_time
existing_subscription.traffic_used_gb = 0.0
await db.commit()
await db.refresh(existing_subscription)
subscription = existing_subscription
else:
logger.info(f"Создаем новую подписку для пользователя {db_user.telegram_id}")
subscription = await create_paid_subscription_with_traffic_mode(
db=db,
user_id=db_user.id,
duration_days=data['period_days'],
device_limit=data['devices'],
connected_squads=data['countries'],
traffic_gb=final_traffic_gb
)
from app.utils.user_utils import mark_user_as_had_paid_subscription
await mark_user_as_had_paid_subscription(db, db_user)
from app.database.crud.server_squad import get_server_ids_by_uuids, add_user_to_servers
from app.database.crud.subscription import add_subscription_servers
server_ids = await get_server_ids_by_uuids(db, data['countries'])
if server_ids:
await add_subscription_servers(db, subscription, server_ids, server_prices)
await add_user_to_servers(db, server_ids)
logger.info(f"Сохранены цены серверов за весь период: {server_prices}")
await db.refresh(db_user)
subscription_service = SubscriptionService()
if db_user.remnawave_uuid:
remnawave_user = await subscription_service.update_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки",
)
else:
remnawave_user = await subscription_service.create_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки",
)
if not remnawave_user:
logger.error(f"Не удалось создать/обновить RemnaWave пользователя для {db_user.telegram_id}")
remnawave_user = await subscription_service.create_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason="покупка подписки (повторная попытка)",
)
transaction = await create_transaction(
db=db,
user_id=db_user.id,
type=TransactionType.SUBSCRIPTION_PAYMENT,
amount_kopeks=final_price,
description=f"Подписка на {data['period_days']} дней ({months_in_period} мес)"
)
try:
notification_service = AdminNotificationService(callback.bot)
await notification_service.send_subscription_purchase_notification(
db, db_user, subscription, transaction, data['period_days'], was_trial_conversion
)
except Exception as e:
logger.error(f"Ошибка отправки уведомления о покупке: {e}")
await db.refresh(db_user)
await db.refresh(subscription)
subscription_link = get_display_subscription_link(subscription)
hide_subscription_link = settings.should_hide_subscription_link()
discount_note = ""
if promo_offer_discount_value > 0:
discount_note = texts.t(
"SUBSCRIPTION_PROMO_DISCOUNT_NOTE",
"⚡ Доп. скидка {percent}%: -{amount}",
).format(
percent=promo_offer_discount_percent,
amount=texts.format_price(promo_offer_discount_value),
)
if remnawave_user and subscription_link:
if settings.is_happ_cryptolink_mode():
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
+ texts.t(
"SUBSCRIPTION_HAPP_LINK_PROMPT",
"🔒 Ссылка на подписку создана. Нажмите кнопку \"Подключиться\" ниже, чтобы открыть её в Happ.",
)
+ "\n\n"
+ texts.t(
"SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT",
"📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве",
)
)
elif hide_subscription_link:
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
+ texts.t(
"SUBSCRIPTION_LINK_HIDDEN_NOTICE",
" Ссылка подписки доступна по кнопкам ниже или в разделе \"Моя подписка\".",
)
+ "\n\n"
+ texts.t(
"SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT",
"📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве",
)
)
else:
import_link_section = texts.t(
"SUBSCRIPTION_IMPORT_LINK_SECTION",
"🔗 <b>Ваша ссылка для импорта в VPN приложение:</b>\\n<code>{subscription_url}</code>",
).format(subscription_url=subscription_link)
success_text = (
f"{texts.SUBSCRIPTION_PURCHASED}\n\n"
f"{import_link_section}\n\n"
f"{texts.t('SUBSCRIPTION_IMPORT_INSTRUCTION_PROMPT', '📱 Нажмите кнопку ниже, чтобы получить инструкцию по настройке VPN на вашем устройстве')}"
)
if discount_note:
success_text = f"{success_text}\n\n{discount_note}"
connect_mode = settings.CONNECT_BUTTON_MODE
if connect_mode == "miniapp_subscription":
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
web_app=types.WebAppInfo(url=subscription_link),
)
],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
elif connect_mode == "miniapp_custom":
if not settings.MINIAPP_CUSTOM_URL:
await callback.answer(
texts.t(
"CUSTOM_MINIAPP_URL_NOT_SET",
"⚠ Кастомная ссылка для мини-приложения не настроена",
),
show_alert=True,
)
return
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
web_app=types.WebAppInfo(url=settings.MINIAPP_CUSTOM_URL),
)
],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
elif connect_mode == "link":
rows = [
[InlineKeyboardButton(text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"), url=subscription_link)]
]
happ_row = get_happ_download_button_row(texts)
if happ_row:
rows.append(happ_row)
rows.append([InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")])
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
elif connect_mode == "happ_cryptolink":
rows = [
[
InlineKeyboardButton(
text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
callback_data="open_subscription_link",
)
]
]
happ_row = get_happ_download_button_row(texts)
if happ_row:
rows.append(happ_row)
rows.append([InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")])
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
else:
connect_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=texts.t("CONNECT_BUTTON", "🔗 Подключиться"),
callback_data="subscription_connect")],
[InlineKeyboardButton(text=texts.t("BACK_TO_MAIN_MENU_BUTTON", "⬅️ В главное меню"),
callback_data="back_to_menu")],
])
await callback.message.edit_text(
success_text,
reply_markup=connect_keyboard,
parse_mode="HTML"
)
else:
purchase_text = texts.SUBSCRIPTION_PURCHASED
if discount_note:
purchase_text = f"{purchase_text}\n\n{discount_note}"
await callback.message.edit_text(
texts.t(
"SUBSCRIPTION_LINK_GENERATING_NOTICE",
"{purchase_text}\n\nСсылка генерируется, перейдите в раздел 'Моя подписка' через несколько секунд.",
).format(purchase_text=purchase_text),
result.error_message or texts.ERROR,
reply_markup=get_back_keyboard(db_user.language)
)
purchase_completed = True
logger.info(
f"Пользователь {db_user.telegram_id} купил подписку на {data['period_days']} дней за {final_price / 100}")
await callback.answer()
return
except Exception as e:
logger.error(f"Ошибка покупки подписки: {e}")
purchase_completed = result.purchase_completed
if result.message:
await callback.message.edit_text(
texts.ERROR,
reply_markup=get_back_keyboard(db_user.language)
result.message,
reply_markup=result.keyboard or get_back_keyboard(db_user.language),
parse_mode="HTML",
)
if purchase_completed:
@@ -1945,6 +2027,97 @@ async def confirm_purchase(
await state.clear()
await callback.answer()
async def attempt_auto_purchase_after_topup(
db: AsyncSession,
db_user: User,
bot: Optional[Bot],
) -> bool:
"""Пробует автоматически завершить покупку подписки после пополнения баланса."""
if not settings.is_autobuy_after_topup_enabled():
return False
cart_data = await user_cart_service.get_user_cart(db_user.id)
if not cart_data:
return False
texts = get_texts(db_user.language)
try:
_, prepared_data = await _prepare_subscription_summary(db_user, cart_data, texts)
except ValueError as error:
logger.error(
"Не удалось пересобрать сохраненную корзину пользователя %s: %s",
db_user.telegram_id,
error,
)
await user_cart_service.delete_user_cart(db_user.id)
return False
final_price = prepared_data.get('total_price', 0)
if db_user.balance_kopeks < final_price or final_price <= 0:
return False
months_in_period = prepared_data.get(
'months_in_period', calculate_months_from_days(prepared_data['period_days'])
)
promo_offer_discount_value = prepared_data.get('promo_offer_discount_value', 0)
promo_offer_discount_percent = prepared_data.get('promo_offer_discount_percent', 0)
if settings.is_traffic_fixed():
final_traffic_gb = settings.get_fixed_traffic_limit()
else:
final_traffic_gb = prepared_data.get('final_traffic_gb', prepared_data.get('traffic_gb', 0))
server_prices = prepared_data.get('server_prices_for_period', [])
result = await _complete_purchase_workflow(
db=db,
db_user=db_user,
data=prepared_data,
final_price=final_price,
final_traffic_gb=final_traffic_gb,
months_in_period=months_in_period,
promo_offer_discount_value=promo_offer_discount_value,
promo_offer_discount_percent=promo_offer_discount_percent,
server_prices=server_prices,
texts=texts,
bot=bot,
purchase_reason=f"Автопокупка подписки на {prepared_data['period_days']} дней",
)
if not result.success:
if result.error_message:
logger.warning(
"Автопокупка подписки для пользователя %s не выполнена: %s",
db_user.telegram_id,
result.error_message,
)
return False
if result.message and bot:
try:
await bot.send_message(
chat_id=db_user.telegram_id,
text=result.message,
parse_mode="HTML",
reply_markup=result.keyboard or get_back_keyboard(db_user.language),
)
except Exception as error:
logger.error(
"Не удалось отправить сообщение об автопокупке пользователю %s: %s",
db_user.telegram_id,
error,
)
if result.purchase_completed:
await clear_subscription_checkout_draft(db_user.id)
return result.purchase_completed
async def resume_subscription_checkout(
callback: types.CallbackQuery,
state: FSMContext,

View File

@@ -101,6 +101,58 @@ class PaymentCommonMixin:
db=db,
)
if settings.is_autobuy_after_topup_enabled():
full_user = user
session_for_purchase: AsyncSession | None = db
if full_user is None and db is not None:
try:
full_user = await get_user_by_telegram_id(db, telegram_id)
except Exception as fetch_error:
logger.warning(
"Не удалось получить пользователя %s из переданной сессии для автопокупки: %s",
telegram_id,
fetch_error,
)
if full_user is None:
try:
async for db_session in get_db():
try:
full_user = await get_user_by_telegram_id(db_session, telegram_id)
session_for_purchase = db_session
except Exception as fetch_error:
logger.warning(
"Не удалось получить пользователя %s из новой сессии: %s",
telegram_id,
fetch_error,
)
full_user = None
else:
break
except Exception as fetch_error:
logger.warning(
"Ошибка открытия сессии для автопокупки пользователя %s: %s",
telegram_id,
fetch_error,
)
if full_user is not None and session_for_purchase is not None:
try:
from app.handlers.subscription.purchase import attempt_auto_purchase_after_topup
await attempt_auto_purchase_after_topup(
session_for_purchase,
full_user,
getattr(self, "bot", None),
)
except Exception as auto_error:
logger.error(
"Ошибка автопокупки подписки после пополнения для пользователя %s: %s",
telegram_id,
auto_error,
)
try:
keyboard = await self.build_topup_success_keyboard(user_snapshot)

View File

@@ -198,6 +198,7 @@ class BotConfigurationService:
"BASE_SUBSCRIPTION_PRICE": "SUBSCRIPTIONS_CORE",
"DEFAULT_TRAFFIC_RESET_STRATEGY": "TRAFFIC",
"RESET_TRAFFIC_ON_PAYMENT": "TRAFFIC",
"AUTOBUY_AFTER_TOPUP_ENABLED": "PAYMENT",
"TRAFFIC_SELECTION_MODE": "TRAFFIC",
"FIXED_TRAFFIC_LIMIT_GB": "TRAFFIC",
"AVAILABLE_SUBSCRIPTION_PERIODS": "PERIODS",
@@ -474,6 +475,15 @@ class BotConfigurationService:
"warning": "Слишком малый интервал может привести к частым обращениям к платёжным API.",
"dependencies": "PAYMENT_VERIFICATION_AUTO_CHECK_ENABLED",
},
"AUTOBUY_AFTER_TOPUP_ENABLED": {
"description": (
"Включает автоматическое завершение сохранённой покупки подписки после пополнения"
" баланса, если средств стало достаточно."
),
"format": "Булево значение.",
"example": "true",
"warning": "Перед включением убедитесь, что расчёт корзины корректен и пользователи понимают поведение.",
},
"SUPPORT_TICKET_SLA_MINUTES": {
"description": "Лимит времени для ответа модераторов на тикет в минутах.",
"format": "Целое число от 1 до 1440.",

View File

@@ -0,0 +1,106 @@
import sys
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
sys.path.append(str(Path(__file__).resolve().parents[1]))
from app.handlers.subscription.purchase import ( # noqa: E402
attempt_auto_purchase_after_topup,
PurchaseExecutionResult,
)
from app.services.payment.common import PaymentCommonMixin # noqa: E402
from app.config import settings # noqa: E402
@pytest.mark.asyncio
async def test_attempt_auto_purchase_after_topup_disabled(monkeypatch):
monkeypatch.setattr(settings, "AUTOBUY_AFTER_TOPUP_ENABLED", False)
with patch("app.handlers.subscription.purchase.user_cart_service") as cart_service:
cart_service.get_user_cart = AsyncMock()
result = await attempt_auto_purchase_after_topup(
AsyncMock(),
AsyncMock(),
AsyncMock(),
)
assert result is False
cart_service.get_user_cart.assert_not_awaited()
@pytest.mark.asyncio
async def test_attempt_auto_purchase_after_topup_success(monkeypatch):
monkeypatch.setattr(settings, "AUTOBUY_AFTER_TOPUP_ENABLED", True)
user = AsyncMock()
user.language = "ru"
user.telegram_id = 123
user.id = 123
user.balance_kopeks = 20000
prepared_data = {
"total_price": 10000,
"period_days": 30,
"months_in_period": 1,
"final_traffic_gb": 0,
"server_prices_for_period": [],
}
bot = AsyncMock()
with patch("app.handlers.subscription.purchase.user_cart_service") as cart_service, \
patch("app.handlers.subscription.purchase._prepare_subscription_summary") as prepare_summary, \
patch("app.handlers.subscription.purchase._complete_purchase_workflow") as complete_purchase, \
patch("app.handlers.subscription.purchase.clear_subscription_checkout_draft") as clear_draft:
cart_service.get_user_cart = AsyncMock(return_value={"period_days": 30, "total_price": 10000})
prepare_summary.return_value = (None, prepared_data)
complete_purchase.return_value = PurchaseExecutionResult(
success=True,
message="done",
keyboard=None,
purchase_completed=True,
)
result = await attempt_auto_purchase_after_topup(
AsyncMock(),
user,
bot,
)
assert result is True
bot.send_message.assert_awaited()
clear_draft.assert_awaited_with(user.id)
class _DummyPayment(PaymentCommonMixin):
def __init__(self):
self.bot = AsyncMock()
async def build_topup_success_keyboard(self, user):
return AsyncMock()
@pytest.mark.asyncio
async def test_send_payment_notification_triggers_autobuy(monkeypatch):
monkeypatch.setattr(settings, "AUTOBUY_AFTER_TOPUP_ENABLED", True)
dummy = _DummyPayment()
user = AsyncMock()
user.telegram_id = 42
with patch("app.handlers.subscription.purchase.attempt_auto_purchase_after_topup", new=AsyncMock(return_value=True)) as autopurchase:
await dummy._send_payment_success_notification(
telegram_id=user.telegram_id,
amount_kopeks=1000,
user=user,
db=AsyncMock(),
payment_method_title="Test",
)
autopurchase.assert_awaited()