Update tariff_purchase.py

This commit is contained in:
Egor
2026-01-11 04:08:48 +03:00
committed by GitHub
parent b213f7deb4
commit 752abfc6b5

View File

@@ -1357,6 +1357,493 @@ async def confirm_tariff_switch(
await callback.answer("Произошла ошибка при переключении тарифа", show_alert=True)
# ==================== Мгновенное переключение тарифов (без выбора периода) ====================
def _calculate_instant_switch_cost(
current_tariff: Tariff,
new_tariff: Tariff,
remaining_days: int,
db_user: Optional[User] = None,
) -> tuple[int, bool]:
"""
Рассчитывает стоимость мгновенного переключения тарифа.
Если новый тариф дороже - доплата пропорционально оставшимся дням.
Если дешевле или равен - бесплатно.
Returns:
(upgrade_cost_kopeks, is_upgrade)
"""
# Получаем месячные цены тарифов
current_monthly = current_tariff.get_price_for_period(30) or 0
new_monthly = new_tariff.get_price_for_period(30) or 0
# Применяем скидку промогруппы если есть
discount_percent = 0
if db_user:
discount_percent = _get_user_period_discount(db_user, 30)
if discount_percent > 0:
new_monthly = _apply_promo_discount(new_monthly, discount_percent)
# Рассчитываем разницу
price_diff = new_monthly - current_monthly
if price_diff <= 0:
# Downgrade или тот же уровень - бесплатно
return 0, False
# Upgrade - доплата пропорционально оставшимся дням
upgrade_cost = int(price_diff * remaining_days / 30)
return upgrade_cost, True
def format_instant_switch_list_text(
tariffs: List[Tariff],
current_tariff: Tariff,
remaining_days: int,
db_user: Optional[User] = None,
) -> str:
"""Форматирует текст со списком тарифов для мгновенного переключения."""
lines = [
"📦 <b>Мгновенная смена тарифа</b>",
f"📌 Текущий: <b>{current_tariff.name}</b>",
f"⏰ Осталось: <b>{remaining_days} дн.</b>",
"",
"💡 При переключении остаток дней сохраняется.",
"⬆️ Повышение тарифа = доплата за разницу",
"⬇️ Понижение = бесплатно",
"",
]
for tariff in tariffs:
if tariff.id == current_tariff.id:
continue
traffic_gb = tariff.traffic_limit_gb
traffic = "" if traffic_gb == 0 else f"{traffic_gb}ГБ"
# Рассчитываем стоимость переключения
cost, is_upgrade = _calculate_instant_switch_cost(
current_tariff, tariff, remaining_days, db_user
)
if is_upgrade:
cost_text = f"⬆️ +{_format_price_kopeks(cost, compact=True)}"
else:
cost_text = "⬇️ Бесплатно"
lines.append(f"<b>{tariff.name}</b> — {traffic}/{tariff.device_limit}📱 {cost_text}")
if tariff.description:
lines.append(f"<i>{tariff.description}</i>")
lines.append("")
return "\n".join(lines)
def get_instant_switch_keyboard(
tariffs: List[Tariff],
current_tariff: Tariff,
remaining_days: int,
language: str,
db_user: Optional[User] = None,
) -> InlineKeyboardMarkup:
"""Создает клавиатуру для мгновенного переключения тарифа."""
texts = get_texts(language)
buttons = []
for tariff in tariffs:
if tariff.id == current_tariff.id:
continue
# Рассчитываем стоимость
cost, is_upgrade = _calculate_instant_switch_cost(
current_tariff, tariff, remaining_days, db_user
)
if is_upgrade:
btn_text = f"📦 {tariff.name} (+{_format_price_kopeks(cost, compact=True)})"
else:
btn_text = f"📦 {tariff.name} (бесплатно)"
buttons.append([
InlineKeyboardButton(
text=btn_text,
callback_data=f"instant_sw_preview:{tariff.id}"
)
])
buttons.append([
InlineKeyboardButton(text=texts.BACK, callback_data="menu_subscription")
])
return InlineKeyboardMarkup(inline_keyboard=buttons)
def get_instant_switch_confirm_keyboard(
tariff_id: int,
language: str,
) -> InlineKeyboardMarkup:
"""Создает клавиатуру подтверждения мгновенного переключения."""
texts = get_texts(language)
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text="✅ Подтвердить переключение",
callback_data=f"instant_sw_confirm:{tariff_id}"
)
],
[
InlineKeyboardButton(
text=texts.BACK,
callback_data="instant_switch"
)
]
])
def get_instant_switch_insufficient_balance_keyboard(
tariff_id: int,
language: str,
) -> InlineKeyboardMarkup:
"""Создает клавиатуру при недостаточном балансе для мгновенного переключения."""
texts = get_texts(language)
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text="💳 Пополнить баланс",
callback_data="balance_topup"
)
],
[
InlineKeyboardButton(
text=texts.BACK,
callback_data="instant_switch"
)
]
])
@error_handler
async def show_instant_switch_list(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
"""Показывает список тарифов для мгновенного переключения."""
from datetime import datetime
texts = get_texts(db_user.language)
await state.clear()
# Проверяем наличие активной подписки
subscription = await get_subscription_by_user_id(db, db_user.id)
if not subscription:
await callback.answer("У вас нет активной подписки", show_alert=True)
return
if not subscription.tariff_id:
await callback.answer("У вашей подписки нет тарифа", show_alert=True)
return
# Получаем текущий тариф
current_tariff = await get_tariff_by_id(db, subscription.tariff_id)
if not current_tariff:
await callback.answer("Текущий тариф не найден", show_alert=True)
return
# Рассчитываем оставшиеся дни
remaining_days = 0
if subscription.end_date:
remaining_days = max(0, (subscription.end_date - datetime.utcnow()).days)
if remaining_days == 0:
await callback.message.edit_text(
"❌ <b>Переключение недоступно</b>\n\n"
"У вашей подписки не осталось активных дней.\n"
"Используйте продление или покупку нового тарифа.",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=texts.BACK, callback_data="menu_subscription")]
]),
parse_mode="HTML"
)
await callback.answer()
return
# Получаем доступные тарифы
promo_group_id = getattr(db_user, 'promo_group_id', None)
tariffs = await get_tariffs_for_user(db, promo_group_id)
# Фильтруем текущий тариф
available_tariffs = [t for t in tariffs if t.id != current_tariff.id]
if not available_tariffs:
await callback.message.edit_text(
"😔 <b>Нет доступных тарифов для переключения</b>\n\n"
"Вы уже используете единственный доступный тариф.",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text=texts.BACK, callback_data="menu_subscription")]
]),
parse_mode="HTML"
)
await callback.answer()
return
# Формируем текст со списком тарифов
switch_text = format_instant_switch_list_text(
tariffs, current_tariff, remaining_days, db_user
)
await callback.message.edit_text(
switch_text,
reply_markup=get_instant_switch_keyboard(
tariffs, current_tariff, remaining_days, db_user.language, db_user
),
parse_mode="HTML"
)
await state.update_data(
current_tariff_id=current_tariff.id,
remaining_days=remaining_days,
)
await callback.answer()
@error_handler
async def preview_instant_switch(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
"""Показывает превью мгновенного переключения тарифа."""
from datetime import datetime
tariff_id = int(callback.data.split(":")[1])
new_tariff = await get_tariff_by_id(db, tariff_id)
if not new_tariff or not new_tariff.is_active:
await callback.answer("Тариф недоступен", show_alert=True)
return
# Получаем данные из состояния
data = await state.get_data()
current_tariff_id = data.get('current_tariff_id')
remaining_days = data.get('remaining_days', 0)
# Если данных нет в state, получаем заново
subscription = await get_subscription_by_user_id(db, db_user.id)
if not subscription or not subscription.tariff_id:
await callback.answer("Подписка не найдена", show_alert=True)
return
current_tariff_id = current_tariff_id or subscription.tariff_id
current_tariff = await get_tariff_by_id(db, current_tariff_id)
if not current_tariff:
await callback.answer("Текущий тариф не найден", show_alert=True)
return
if not remaining_days and subscription.end_date:
remaining_days = max(0, (subscription.end_date - datetime.utcnow()).days)
# Рассчитываем стоимость переключения
upgrade_cost, is_upgrade = _calculate_instant_switch_cost(
current_tariff, new_tariff, remaining_days, db_user
)
# Проверяем баланс
user_balance = db_user.balance_kopeks or 0
traffic = _format_traffic(new_tariff.traffic_limit_gb)
current_traffic = _format_traffic(current_tariff.traffic_limit_gb)
texts = get_texts(db_user.language)
if is_upgrade:
# Upgrade - нужна доплата
if user_balance >= upgrade_cost:
await callback.message.edit_text(
f"⬆️ <b>Повышение тарифа</b>\n\n"
f"📌 Текущий: <b>{current_tariff.name}</b>\n"
f" • Трафик: {current_traffic}\n"
f" • Устройств: {current_tariff.device_limit}\n\n"
f"📦 Новый: <b>{new_tariff.name}</b>\n"
f" • Трафик: {traffic}\n"
f" • Устройств: {new_tariff.device_limit}\n\n"
f"⏰ Осталось дней: <b>{remaining_days}</b>\n"
f"💰 <b>Доплата: {_format_price_kopeks(upgrade_cost)}</b>\n\n"
f"💳 Ваш баланс: {_format_price_kopeks(user_balance)}\n"
f"После оплаты: {_format_price_kopeks(user_balance - upgrade_cost)}",
reply_markup=get_instant_switch_confirm_keyboard(tariff_id, db_user.language),
parse_mode="HTML"
)
else:
missing = upgrade_cost - user_balance
await callback.message.edit_text(
f"❌ <b>Недостаточно средств</b>\n\n"
f"📦 Новый тариф: <b>{new_tariff.name}</b>\n"
f"💰 Требуется доплата: {_format_price_kopeks(upgrade_cost)}\n\n"
f"💳 Ваш баланс: {_format_price_kopeks(user_balance)}\n"
f"⚠️ Не хватает: <b>{_format_price_kopeks(missing)}</b>",
reply_markup=get_instant_switch_insufficient_balance_keyboard(tariff_id, db_user.language),
parse_mode="HTML"
)
else:
# Downgrade или тот же уровень - бесплатно
await callback.message.edit_text(
f"⬇️ <b>Переключение тарифа</b>\n\n"
f"📌 Текущий: <b>{current_tariff.name}</b>\n"
f" • Трафик: {current_traffic}\n"
f" • Устройств: {current_tariff.device_limit}\n\n"
f"📦 Новый: <b>{new_tariff.name}</b>\n"
f" • Трафик: {traffic}\n"
f" • Устройств: {new_tariff.device_limit}\n\n"
f"⏰ Осталось дней: <b>{remaining_days}</b>\n"
f"💰 <b>Бесплатно</b> (понижение/равный тариф)",
reply_markup=get_instant_switch_confirm_keyboard(tariff_id, db_user.language),
parse_mode="HTML"
)
await state.update_data(
switch_tariff_id=tariff_id,
upgrade_cost=upgrade_cost,
is_upgrade=is_upgrade,
current_tariff_id=current_tariff_id,
remaining_days=remaining_days,
)
await callback.answer()
@error_handler
async def confirm_instant_switch(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
"""Подтверждает мгновенное переключение тарифа."""
from datetime import datetime, timedelta
tariff_id = int(callback.data.split(":")[1])
new_tariff = await get_tariff_by_id(db, tariff_id)
if not new_tariff or not new_tariff.is_active:
await callback.answer("Тариф недоступен", show_alert=True)
return
# Получаем данные из состояния
data = await state.get_data()
upgrade_cost = data.get('upgrade_cost', 0)
is_upgrade = data.get('is_upgrade', False)
remaining_days = data.get('remaining_days', 0)
# Проверяем подписку
subscription = await get_subscription_by_user_id(db, db_user.id)
if not subscription:
await callback.answer("Подписка не найдена", show_alert=True)
return
# Проверяем баланс если это upgrade
user_balance = db_user.balance_kopeks or 0
if is_upgrade and user_balance < upgrade_cost:
await callback.answer("Недостаточно средств на балансе", show_alert=True)
return
texts = get_texts(db_user.language)
try:
# Списываем баланс если это upgrade
if is_upgrade and upgrade_cost > 0:
success = await subtract_user_balance(
db, db_user, upgrade_cost,
f"Переключение на тариф {new_tariff.name}"
)
if not success:
await callback.answer("Ошибка списания баланса", show_alert=True)
return
# Получаем список серверов из нового тарифа
squads = new_tariff.allowed_squads or []
# Обновляем подписку с новыми параметрами тарифа
# НЕ меняем end_date - только параметры тарифа
subscription.tariff_id = new_tariff.id
subscription.traffic_limit_gb = new_tariff.traffic_limit_gb
subscription.device_limit = new_tariff.device_limit
subscription.connected_squads = squads
await db.commit()
await db.refresh(subscription)
# Обновляем пользователя в Remnawave
try:
subscription_service = SubscriptionService()
await subscription_service.create_remnawave_user(
db,
subscription,
reset_traffic=False, # Не сбрасываем трафик при переключении
reset_reason="мгновенное переключение тарифа",
)
except Exception as e:
logger.error(f"Ошибка обновления Remnawave при мгновенном переключении: {e}")
# Создаем транзакцию если была оплата
if is_upgrade and upgrade_cost > 0:
await create_transaction(
db,
user_id=db_user.id,
type=TransactionType.SUBSCRIPTION_PAYMENT,
amount_kopeks=-upgrade_cost,
description=f"Переключение на тариф {new_tariff.name}",
)
# Отправляем уведомление админу
try:
admin_notification_service = AdminNotificationService(callback.bot)
await admin_notification_service.send_subscription_purchase_notification(
db,
db_user,
subscription,
None,
remaining_days,
was_trial_conversion=False,
amount_kopeks=upgrade_cost,
)
except Exception as e:
logger.error(f"Ошибка отправки уведомления админу: {e}")
await state.clear()
traffic = _format_traffic(new_tariff.traffic_limit_gb)
if is_upgrade:
cost_text = f"💰 Списано: {_format_price_kopeks(upgrade_cost)}"
else:
cost_text = "💰 Бесплатно"
await callback.message.edit_text(
f"🎉 <b>Тариф успешно изменён!</b>\n\n"
f"📦 Новый тариф: <b>{new_tariff.name}</b>\n"
f"📊 Трафик: {traffic}\n"
f"📱 Устройств: {new_tariff.device_limit}\n"
f"⏰ Осталось дней: {remaining_days}\n"
f"{cost_text}",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="📱 Моя подписка", callback_data="menu_subscription")],
[InlineKeyboardButton(text=texts.BACK, callback_data="back_to_menu")]
]),
parse_mode="HTML"
)
await callback.answer("Тариф изменён!", show_alert=True)
except Exception as e:
logger.error(f"Ошибка при мгновенном переключении тарифа: {e}", exc_info=True)
await callback.answer("Произошла ошибка при переключении тарифа", show_alert=True)
def register_tariff_purchase_handlers(dp: Dispatcher):
"""Регистрирует обработчики покупки по тарифам."""
# Список тарифов (для режима tariffs)
@@ -1376,8 +1863,13 @@ def register_tariff_purchase_handlers(dp: Dispatcher):
dp.callback_query.register(select_tariff_extend_period, F.data.startswith("tariff_extend:"))
dp.callback_query.register(confirm_tariff_extend, F.data.startswith("tariff_ext_confirm:"))
# Переключение тарифов
# Переключение тарифов (с выбором периода)
dp.callback_query.register(show_tariff_switch_list, F.data == "tariff_switch")
dp.callback_query.register(select_tariff_switch, F.data.startswith("tariff_sw_select:"))
dp.callback_query.register(select_tariff_switch_period, F.data.startswith("tariff_sw_period:"))
dp.callback_query.register(confirm_tariff_switch, F.data.startswith("tariff_sw_confirm:"))
# Мгновенное переключение тарифов (без выбора периода)
dp.callback_query.register(show_instant_switch_list, F.data == "instant_switch")
dp.callback_query.register(preview_instant_switch, F.data.startswith("instant_sw_preview:"))
dp.callback_query.register(confirm_instant_switch, F.data.startswith("instant_sw_confirm:"))