import logging
from collections.abc import Sequence
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings


if TYPE_CHECKING:  # pragma: no cover
    from app.database.models import PromoGroup, User

logger = logging.getLogger(__name__)


def calculate_months_from_days(days: int) -> int:
    """Convert a number of days into whole 30-day months, never less than 1.

    The built-in ``round`` rounds exact halves to the nearest even integer,
    so 45 days -> 2 months while 75 days -> 2 months as well.
    """
    return max(1, round(days / 30))


def _current_utc_time(reference: datetime) -> datetime:
    """Return the current UTC time, naive or aware to match ``reference``.

    A naive ``reference`` gets a naive UTC timestamp (the same value the
    deprecated ``datetime.utcnow()`` produced); an aware ``reference`` gets an
    aware UTC timestamp so the two values can be compared and subtracted.
    """
    now = datetime.now(timezone.utc)
    if reference.utcoffset() is None:
        return now.replace(tzinfo=None)
    return now


def get_remaining_months(end_date: datetime) -> int:
    """Return how many whole 30-day months remain until ``end_date``.

    Args:
        end_date: Moment the period ends. Naive values are compared against
            naive UTC "now"; timezone-aware values against aware UTC "now".

    Returns:
        At least 1, including when ``end_date`` is already in the past.
    """
    current_time = _current_utc_time(end_date)
    if end_date <= current_time:
        return 1

    remaining_days = (end_date - current_time).days
    return calculate_months_from_days(remaining_days)


def calculate_period_multiplier(period_days: int) -> tuple[int, float]:
    """Return ``(rounded_months, exact_months)`` for a period given in days.

    ``exact_months`` is ``period_days / 30``; ``rounded_months`` is that value
    rounded with the built-in ``round`` and clamped to a minimum of 1.
    """
    exact_months = period_days / 30
    months_count = max(1, round(exact_months))

    logger.debug(f'Период {period_days} дней = {exact_months:.2f} точных месяцев ≈ {months_count} месяцев для расчета')

    return months_count, exact_months


def calculate_prorated_price(monthly_price: int, end_date: datetime, min_charge_months: int = 1) -> tuple[int, int]:
    """Price a monthly item for the months remaining until ``end_date``.

    The charge is made in whole months: ``monthly_price`` multiplied by the
    larger of ``min_charge_months`` and the remaining months.

    Returns:
        ``(total_price, months_to_charge)``.
    """
    months_remaining = get_remaining_months(end_date)
    months_to_charge = max(min_charge_months, months_remaining)

    total_price = monthly_price * months_to_charge

    logger.debug(
        f'Расчет пропорциональной цены: {monthly_price / 100}₽/мес × {months_to_charge} мес = {total_price / 100}₽'
    )

    return total_price, months_to_charge


def apply_percentage_discount(amount: int, percent: int) -> tuple[int, int]:
    """Apply a percentage discount to ``amount``.

    Args:
        amount: Price before discount (kopeks, per the rounding note below).
        percent: Discount percentage; values above 100 are treated as 100.

    Returns:
        ``(discounted_amount, discount_value)``. When ``amount`` or ``percent``
        is not positive the amount is returned unchanged with a zero discount.
    """
    if amount <= 0 or percent <= 0:
        return amount, 0

    clamped_percent = max(0, min(100, percent))
    discount_value = amount * clamped_percent // 100
    discounted_amount = amount - discount_value

    # Round the discounted price up to the nearest full ruble (100 kopeks)
    # to avoid undercharging users because of fractional kopeks.
    if discount_value >= 100 and discounted_amount % 100:
        discounted_amount += 100 - (discounted_amount % 100)
        # Rounding up must never make the price exceed the undiscounted amount.
        discounted_amount = min(discounted_amount, amount)
        discount_value = amount - discounted_amount

    logger.debug(
        'Применена скидка %s%%: %s → %s (скидка %s)',
        clamped_percent,
        amount,
        discounted_amount,
        discount_value,
    )

    return discounted_amount, discount_value


def resolve_discount_percent(
    user: Optional['User'],
    promo_group: Optional['PromoGroup'],
    category: str,
    *,
    period_days: int | None = None,
) -> int:
    """Determine the discount percentage for the given category.

    The user's own discount (``user.get_promo_discount``) takes precedence.
    If there is no user, or that call raises ``AttributeError``, the promo
    group's ``get_discount_percent`` is used. With neither available the
    discount is 0.
    """
    if user is not None:
        try:
            return user.get_promo_discount(category, period_days)
        except AttributeError:  # pragma: no cover - defensive guard
            # Deliberate best-effort: fall through to the promo group.
            pass

    if promo_group is not None:
        return promo_group.get_discount_percent(category, period_days)

    return 0


def _collect_squad_uuids(params: dict[str, Any], resolved_squad_uuids: Sequence[str] | None) -> list[str]:
    """Build the list of squad UUIDs to price.

    Explicitly passed ``resolved_squad_uuids`` win; otherwise ``params['squad_uuid']``
    is used, which may be a single value or a list/tuple/set. Falsy entries are dropped.
    """
    if resolved_squad_uuids:
        return [uuid for uuid in resolved_squad_uuids if uuid]

    raw_squad = params.get('squad_uuid')
    if isinstance(raw_squad, (list, tuple, set)):
        return [str(uuid) for uuid in raw_squad if uuid]
    if raw_squad:
        return [str(raw_squad)]
    return []


def _unavailable_server_entry(squad_uuid: str, name: Any) -> dict[str, Any]:
    """Breakdown entry for a server that is missing or cannot be sold (zero price)."""
    return {
        'uuid': squad_uuid,
        'name': name,
        'available': False,
        'original_price': 0,
        'discount': 0,
        'final_price': 0,
    }


async def compute_simple_subscription_price(
    db: AsyncSession,
    params: dict[str, Any],
    *,
    user: Optional['User'] = None,
    resolved_squad_uuids: Sequence[str] | None = None,
) -> tuple[int, dict[str, Any]]:
    """Compute the price of a simple subscription with all surcharges and discounts.

    Args:
        db: Session used to look up the promo group and server squads.
        params: Purchase parameters. Keys read here: ``period_days``,
            ``traffic_limit_gb``, ``device_limit``, ``promo_group``,
            ``promo_group_id`` and ``squad_uuid``.
        user: Optional buyer; used for discounts and as a promo-group fallback.
        resolved_squad_uuids: Optional pre-resolved squad UUIDs; when given
            (and non-empty) they replace ``params['squad_uuid']``.

    Returns:
        ``(total_price, breakdown)`` where ``total_price`` is never negative and
        ``breakdown`` holds the per-component prices, discounts, discount
        percentages, per-server details and the resolved squad UUIDs.
    """
    period_days = int(params.get('period_days', 30) or 30)
    attr_name = f'PRICE_{period_days}_DAYS'
    # Periods without a dedicated PRICE_<n>_DAYS setting use the base price.
    base_price_original = getattr(settings, attr_name, settings.BASE_SUBSCRIPTION_PRICE)

    traffic_limit_raw = params.get('traffic_limit_gb')
    try:
        traffic_limit = int(traffic_limit_raw) if traffic_limit_raw is not None else None
    except (TypeError, ValueError):  # pragma: no cover - defensive conversion
        traffic_limit = None

    if traffic_limit is None or traffic_limit <= 0:
        # Default simple subscriptions already include unlimited traffic.
        traffic_price_original = 0
    else:
        traffic_price_original = settings.get_traffic_price(traffic_limit)

    device_limit_raw = params.get('device_limit', settings.DEFAULT_DEVICE_LIMIT)
    try:
        device_limit = int(device_limit_raw)
    except (TypeError, ValueError):  # pragma: no cover - defensive conversion
        device_limit = settings.DEFAULT_DEVICE_LIMIT

    # Only devices above the default limit are charged for.
    additional_devices = max(0, device_limit - settings.DEFAULT_DEVICE_LIMIT)
    devices_price_original = additional_devices * settings.PRICE_PER_DEVICE

    # Promo group precedence: explicit object, then explicit id, then the user's primary group.
    promo_group: PromoGroup | None = params.get('promo_group')

    if promo_group is None:
        promo_group_id = params.get('promo_group_id')
        if promo_group_id:
            from app.database.crud.promo_group import get_promo_group_by_id

            promo_group = await get_promo_group_by_id(db, int(promo_group_id))

    if promo_group is None and user is not None:
        promo_group = user.get_primary_promo_group()

    period_discount_percent = resolve_discount_percent(
        user,
        promo_group,
        'period',
        period_days=period_days,
    )
    base_discount = base_price_original * period_discount_percent // 100

    traffic_discount_percent = resolve_discount_percent(
        user,
        promo_group,
        'traffic',
        period_days=period_days,
    )
    traffic_discount = traffic_price_original * traffic_discount_percent // 100

    devices_discount_percent = resolve_discount_percent(
        user,
        promo_group,
        'devices',
        period_days=period_days,
    )
    devices_discount = devices_price_original * devices_discount_percent // 100

    servers_discount_percent = resolve_discount_percent(
        user,
        promo_group,
        'servers',
        period_days=period_days,
    )

    resolved_uuids = _collect_squad_uuids(params, resolved_squad_uuids)

    from app.database.crud.server_squad import get_server_squad_by_uuid

    server_breakdown: list[dict[str, Any]] = []
    servers_price_original = 0
    servers_discount_total = 0

    for squad_uuid in resolved_uuids:
        server = await get_server_squad_by_uuid(db, squad_uuid)
        if not server:
            logger.warning(
                'SIMPLE_SUBSCRIPTION_PRICE_SERVER_NOT_FOUND | squad=%s',
                squad_uuid,
            )
            server_breakdown.append(_unavailable_server_entry(squad_uuid, None))
            continue

        if not server.is_available or server.is_full:
            logger.warning(
                'SIMPLE_SUBSCRIPTION_PRICE_SERVER_UNAVAILABLE | squad=%s | available=%s | full=%s',
                squad_uuid,
                server.is_available,
                server.is_full,
            )
            server_breakdown.append(_unavailable_server_entry(squad_uuid, server.display_name))
            continue

        original_price = server.price_kopeks
        discount_value = original_price * servers_discount_percent // 100
        final_price = original_price - discount_value

        servers_price_original += original_price
        servers_discount_total += discount_value

        server_breakdown.append(
            {
                'uuid': squad_uuid,
                'name': server.display_name,
                'available': True,
                'original_price': original_price,
                'discount': discount_value,
                'final_price': final_price,
            }
        )

    total_before_discount = (
        base_price_original + traffic_price_original + devices_price_original + servers_price_original
    )

    total_discount = base_discount + traffic_discount + devices_discount + servers_discount_total

    total_price = max(0, total_before_discount - total_discount)

    breakdown = {
        'base_price': base_price_original,
        'base_discount': base_discount,
        'traffic_price': traffic_price_original,
        'traffic_discount': traffic_discount,
        'devices_price': devices_price_original,
        'devices_discount': devices_discount,
        'servers_price': servers_price_original,
        'servers_discount': servers_discount_total,
        'servers_final': sum(item['final_price'] for item in server_breakdown),
        'server_details': server_breakdown,
        'total_before_discount': total_before_discount,
        'total_discount': total_discount,
        'resolved_squad_uuids': resolved_uuids,
        'applied_promo_group_id': getattr(promo_group, 'id', None) if promo_group else None,
        'period_discount_percent': period_discount_percent,
        'traffic_discount_percent': traffic_discount_percent,
        'devices_discount_percent': devices_discount_percent,
        'servers_discount_percent': servers_discount_percent,
    }

    return total_price, breakdown


def _pluralize_days_ru(n: int) -> str:
    """Return the Russian plural form of "day" for ``n`` (1 день, 2 дня, 5 дней)."""
    mod100 = n % 100
    mod10 = n % 10
    if 11 <= mod100 <= 19:
        return 'дней'
    if mod10 == 1:
        return 'день'
    if 2 <= mod10 <= 4:
        return 'дня'
    return 'дней'


# Month-based labels for the standard period lengths, keyed by day count.
_PERIOD_LABELS_RU = {
    30: '1 месяц',
    60: '2 месяца',
    90: '3 месяца',
    180: '6 месяцев',
    360: '12 месяцев',
}
_PERIOD_LABELS_EN = {
    30: '1 month',
    60: '2 months',
    90: '3 months',
    180: '6 months',
    360: '12 months',
}


def format_period_description(days: int, language: str = 'ru') -> str:
    """Return a human-readable label for a period of ``days`` days.

    Standard lengths (30/60/90/180/360) are shown in months; any other value is
    shown as a day count. Language codes ``ru`` and ``fa`` (region suffix
    ignored, e.g. ``ru-RU``) get the Russian label, everything else English.
    An empty or ``None`` language is treated as ``ru``.
    """
    language_code = (language or 'ru').split('-')[0].lower()

    if language_code in {'ru', 'fa'}:
        label = _PERIOD_LABELS_RU.get(days)
        if label is not None:
            return label
        return f'{days} {_pluralize_days_ru(days)}'

    label = _PERIOD_LABELS_EN.get(days)
    if label is not None:
        return label
    day_word = 'day' if days == 1 else 'days'
    return f'{days} {day_word}'


def validate_pricing_calculation(base_price: int, monthly_additions: int, months: int, total_calculated: int) -> bool:
    """Check that ``total_calculated`` equals ``base_price + monthly_additions * months``.

    Logs two warnings with the expected and actual figures on a mismatch.

    Returns:
        ``True`` when the totals match, ``False`` otherwise.
    """
    expected_total = base_price + (monthly_additions * months)
    is_valid = expected_total == total_calculated

    if not is_valid:
        logger.warning(
            f'Несоответствие в расчете цены: ожидалось {expected_total / 100}₽, получено {total_calculated / 100}₽'
        )
        logger.warning(
            f'Детали: базовая цена {base_price / 100}₽ + месячные дополнения {monthly_additions / 100}₽ × {months} мес'
        )

    return is_valid


# Known subscription periods keyed by length in days.
STANDARD_PERIODS = {
    14: {'months': 0.5, 'display_ru': '2 недели', 'display_en': '2 weeks'},
    30: {'months': 1, 'display_ru': '1 месяц', 'display_en': '1 month'},
    60: {'months': 2, 'display_ru': '2 месяца', 'display_en': '2 months'},
    90: {'months': 3, 'display_ru': '3 месяца', 'display_en': '3 months'},
    180: {'months': 6, 'display_ru': '6 месяцев', 'display_en': '6 months'},
    360: {'months': 12, 'display_ru': '1 год', 'display_en': '1 year'},
}


def get_period_info(days: int) -> dict:
    """Return the ``months`` / ``display_ru`` / ``display_en`` info for a period.

    Standard periods come from ``STANDARD_PERIODS`` (as a copy, so callers
    cannot mutate the shared table). Any other length gets a derived entry
    with the same keys instead of ``None``, which keeps the declared ``dict``
    return type true for every input.
    """
    standard = STANDARD_PERIODS.get(days)
    if standard is not None:
        return dict(standard)

    return {
        'months': calculate_months_from_days(days),
        'display_ru': format_period_description(days, 'ru'),
        'display_en': format_period_description(days, 'en'),
    }