diff --git a/app/cabinet/routes/balance.py b/app/cabinet/routes/balance.py index 47575599..4c109ceb 100644 --- a/app/cabinet/routes/balance.py +++ b/app/cabinet/routes/balance.py @@ -420,9 +420,9 @@ async def create_topup( ) if result: payment_url = ( - result.get('web_app_invoice_url') + result.get('bot_invoice_url') or result.get('mini_app_invoice_url') - or result.get('bot_invoice_url') + or result.get('web_app_invoice_url') ) payment_id = result.get('invoice_id') or str(result.get('local_payment_id', 'pending')) else: diff --git a/app/cabinet/routes/subscription.py b/app/cabinet/routes/subscription.py index fd3d39f9..407ffbea 100644 --- a/app/cabinet/routes/subscription.py +++ b/app/cabinet/routes/subscription.py @@ -3216,7 +3216,10 @@ async def update_countries( added_server_ids = await get_server_ids_by_uuids(db, added) if added_server_ids: await add_subscription_servers(db, user.subscription, added_server_ids, added_server_prices) - await add_user_to_servers(db, added_server_ids) + try: + await add_user_to_servers(db, added_server_ids) + except Exception as e: + logger.error(f'Ошибка обновления счётчика серверов: {e}') # Update connected squads user.subscription.connected_squads = selected_countries diff --git a/app/database/crud/server_squad.py b/app/database/crud/server_squad.py index caedf03f..553214dd 100644 --- a/app/database/crud/server_squad.py +++ b/app/database/crud/server_squad.py @@ -740,14 +740,13 @@ async def add_user_to_servers(db: AsyncSession, server_squad_ids: list[int]) -> .values(current_users=ServerSquad.current_users + 1) ) - await db.commit() + await db.flush() logger.info(f'✅ Увеличен счетчик пользователей для серверов: {server_squad_ids}') return True except Exception as e: logger.error(f'Ошибка увеличения счетчика пользователей: {e}') - await db.rollback() - return False + raise async def remove_user_from_servers(db: AsyncSession, server_squad_ids: list[int]) -> bool: @@ -759,14 +758,13 @@ async def remove_user_from_servers(db: AsyncSession, server_squad_ids: list[int] .values(current_users=func.greatest(ServerSquad.current_users - 1, 0)) ) - await db.commit() + await db.flush() logger.info(f'✅ Уменьшен счетчик пользователей для серверов: {server_squad_ids}') return True except Exception as e: logger.error(f'Ошибка уменьшения счетчика пользователей: {e}') - await db.rollback() - return False + raise async def get_server_ids_by_uuids(db: AsyncSession, squad_uuids: list[str]) -> list[int]: diff --git a/app/database/crud/subscription.py b/app/database/crud/subscription.py index c6239162..da0d8d2d 100644 --- a/app/database/crud/subscription.py +++ b/app/database/crud/subscription.py @@ -357,9 +357,8 @@ async def extend_subscription( ) # Определяем, происходит ли СМЕНА тарифа (а не продление того же) - is_tariff_change = ( - tariff_id is not None and subscription.tariff_id is not None and tariff_id != subscription.tariff_id - ) + # Включает переход из классического режима (tariff_id=None) в тарифный + is_tariff_change = tariff_id is not None and (subscription.tariff_id is None or tariff_id != subscription.tariff_id) if is_tariff_change: logger.info(f'🔄 Обнаружена СМЕНА тарифа: {subscription.tariff_id} → {tariff_id}') @@ -440,17 +439,28 @@ async def extend_subscription( if traffic_limit_gb is not None: old_traffic = subscription.traffic_limit_gb - subscription.traffic_limit_gb = traffic_limit_gb subscription.traffic_used_gb = 0.0 - # Сбрасываем все докупки трафика при смене тарифа - from sqlalchemy import delete as sql_delete - from app.database.models import TrafficPurchase + if is_tariff_change: + # При СМЕНЕ тарифа сбрасываем все докупки трафика + subscription.traffic_limit_gb = traffic_limit_gb + from sqlalchemy import delete as sql_delete - await db.execute(sql_delete(TrafficPurchase).where(TrafficPurchase.subscription_id == subscription.id)) - subscription.purchased_traffic_gb = 0 - subscription.traffic_reset_at = None # Сбрасываем дату сброса трафика - logger.info(f'📊 Обновлен лимит трафика: {old_traffic} ГБ → {traffic_limit_gb} ГБ (все докупки сброшены)') + from app.database.models import TrafficPurchase + + await db.execute(sql_delete(TrafficPurchase).where(TrafficPurchase.subscription_id == subscription.id)) + subscription.purchased_traffic_gb = 0 + subscription.traffic_reset_at = None + logger.info( + f'📊 Обновлен лимит трафика: {old_traffic} ГБ → {traffic_limit_gb} ГБ (смена тарифа, докупки сброшены)' + ) + else: + # При ПРОДЛЕНИИ того же тарифа — сохраняем докупленный трафик + purchased = subscription.purchased_traffic_gb or 0 + subscription.traffic_limit_gb = traffic_limit_gb + purchased + logger.info( + f'📊 Обновлен лимит трафика: {old_traffic} ГБ → {traffic_limit_gb + purchased} ГБ (докупки сохранены: {purchased} ГБ)' + ) elif settings.RESET_TRAFFIC_ON_PAYMENT: subscription.traffic_used_gb = 0.0 # В режиме тарифов сохраняем докупленный трафик при продлении diff --git a/app/database/models.py b/app/database/models.py index fc265fcf..8403ef37 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -1159,17 +1159,25 @@ class Subscription(Base): @property def is_active(self) -> bool: current_time = datetime.utcnow() - return self.status == SubscriptionStatus.ACTIVE.value and self.end_date > current_time + return ( + self.status == SubscriptionStatus.ACTIVE.value + and self.end_date is not None + and self.end_date > current_time + ) @property def is_expired(self) -> bool: """Проверяет, истёк ли срок подписки""" - return self.end_date <= datetime.utcnow() + return self.end_date is not None and self.end_date <= datetime.utcnow() @property def should_be_expired(self) -> bool: current_time = datetime.utcnow() - return self.status == SubscriptionStatus.ACTIVE.value and self.end_date <= current_time + return ( + self.status == SubscriptionStatus.ACTIVE.value + and self.end_date is not None + and self.end_date <= current_time + ) @property def actual_status(self) -> str: @@ -1182,12 +1190,12 @@ class Subscription(Base): return 'disabled' if self.status == SubscriptionStatus.ACTIVE.value: - if self.end_date <= current_time: + if self.end_date is None or self.end_date <= current_time: return 'expired' return 'active' if self.status == SubscriptionStatus.TRIAL.value: - if self.end_date <= current_time: + if self.end_date is None or self.end_date <= current_time: return 'expired' return 'trial' @@ -1230,6 +1238,8 @@ class Subscription(Base): @property def days_left(self) -> int: + if self.end_date is None: + return 0 current_time = datetime.utcnow() if self.end_date <= current_time: return 0 @@ -1255,11 +1265,10 @@ class Subscription(Base): @property def traffic_used_percent(self) -> float: - if self.traffic_limit_gb == 0: + if not self.traffic_limit_gb: return 0.0 - if self.traffic_limit_gb > 0: - return min((self.traffic_used_gb / self.traffic_limit_gb) * 100, 100.0) - return 0.0 + used = self.traffic_used_gb or 0.0 + return min((used / self.traffic_limit_gb) * 100, 100.0) def extend_subscription(self, days: int): if self.end_date > datetime.utcnow(): diff --git a/app/handlers/admin/updates.py b/app/handlers/admin/updates.py index af1ed887..62637e1b 100644 --- a/app/handlers/admin/updates.py +++ b/app/handlers/admin/updates.py @@ -69,6 +69,10 @@ async def show_updates_menu(callback: types.CallbackQuery, db_user: User, db: As await callback.answer() except Exception as e: + if 'message is not modified' in str(e).lower(): + logger.debug('📝 Сообщение не изменено в show_updates_menu') + await callback.answer() + return logger.error(f'Ошибка показа меню обновлений: {e}') await callback.answer('❌ Ошибка загрузки меню обновлений', show_alert=True) @@ -118,6 +122,9 @@ async def check_updates(callback: types.CallbackQuery, db_user: User, db: AsyncS await callback.message.edit_text(message, reply_markup=keyboard, parse_mode='HTML') except Exception as e: + if 'message is not modified' in str(e).lower(): + logger.debug('📝 Сообщение не изменено в check_updates') + return logger.error(f'Ошибка проверки обновлений: {e}') await callback.message.edit_text( f'❌ ОШИБКА ПРОВЕРКИ ОБНОВЛЕНИЙ\n\n' @@ -142,7 +149,6 @@ async def show_version_info(callback: types.CallbackQuery, db_user: User, db: As newer_releases = version_info['newer_releases'] has_updates = version_info['has_updates'] last_check = version_info['last_check'] - version_info['repo_url'] current_info = '📦 ТЕКУЩАЯ ВЕРСИЯ\n\n' @@ -198,6 +204,9 @@ async def show_version_info(callback: types.CallbackQuery, db_user: User, db: As ) except Exception as e: + if 'message is not modified' in str(e).lower(): + logger.debug('📝 Сообщение не изменено в show_version_info') + return logger.error(f'Ошибка получения информации о версиях: {e}') await callback.message.edit_text( f'❌ ОШИБКА ЗАГРУЗКИ\n\n' diff --git a/app/handlers/start.py b/app/handlers/start.py index 3118a703..d4b9e829 100644 --- a/app/handlers/start.py +++ b/app/handlers/start.py @@ -931,14 +931,11 @@ async def process_privacy_policy_accept(callback: types.CallbackQuery, state: FS await callback.message.edit_text( privacy_policy_required_text, reply_markup=get_privacy_policy_keyboard(language) ) + except TelegramBadRequest as e: + if 'message is not modified' not in str(e): + logger.warning(f'Ошибка при показе сообщения об отклонении политики: {e}') except Exception as e: - logger.error(f'Ошибка при показе сообщения об отклонении политики конфиденциальности: {e}') - try: - await callback.message.edit_text( - privacy_policy_required_text, reply_markup=get_privacy_policy_keyboard(language) - ) - except: - pass + logger.warning(f'Ошибка при показе сообщения об отклонении политики: {e}') logger.info(f'✅ Политика конфиденциальности обработана для пользователя {callback.from_user.id}') diff --git a/app/handlers/subscription/pricing.py b/app/handlers/subscription/pricing.py index 8213a0db..7d935dee 100644 --- a/app/handlers/subscription/pricing.py +++ b/app/handlers/subscription/pricing.py @@ -404,15 +404,16 @@ async def get_subscription_info_text(subscription, texts, db_user, db: AsyncSess status_text = '⌛ Истекла' type_text = 'Платная подписка' - if subscription.traffic_limit_gb == 0: + traffic_limit = subscription.traffic_limit_gb or 0 + if traffic_limit == 0: if settings.is_traffic_fixed(): traffic_text = '∞ Безлимитный' else: traffic_text = '∞ Безлимитный' elif settings.is_traffic_fixed(): - traffic_text = f'{subscription.traffic_limit_gb} ГБ' + traffic_text = f'{traffic_limit} ГБ' else: - traffic_text = f'{subscription.traffic_limit_gb} ГБ' + traffic_text = f'{traffic_limit} ГБ' subscription_cost = await get_subscription_cost(subscription, db) @@ -444,7 +445,7 @@ async def get_subscription_info_text(subscription, texts, db_user, db: AsyncSess info_text += f'\n💰 Стоимость подписки в месяц: {texts.format_price(subscription_cost)}' # Отображаем докупленный трафик - if subscription.traffic_limit_gb > 0: # Только для лимитированных тарифов + if (subscription.traffic_limit_gb or 0) > 0: # Только для лимитированных тарифов from datetime import datetime from sqlalchemy import select as sql_select diff --git a/app/handlers/subscription/purchase.py b/app/handlers/subscription/purchase.py index c8ffc466..723d0d4f 100644 --- a/app/handlers/subscription/purchase.py +++ b/app/handlers/subscription/purchase.py @@ -2420,26 +2420,31 @@ async def confirm_purchase(callback: types.CallbackQuery, state: FSMContext, db_ promo_offer_discount_percent = 0 # Валидация: проверяем что cached_total_price соответствует ожидаемой финальной цене - # Допускаем небольшое расхождение из-за округления (до 5%) - price_difference = abs(final_price - cached_total_price) - max_allowed_difference = max(500, int(final_price * 0.05)) # 5% или минимум 5₽ - - if price_difference > max_allowed_difference: - # Слишком большое расхождение - блокируем покупку - logger.error( - f'Критическое расхождение цены для пользователя {db_user.telegram_id}: ' - f'кэш={cached_total_price / 100}₽, пересчет={final_price / 100}₽, ' - f'разница={price_difference / 100}₽ (>{max_allowed_difference / 100}₽). ' - f'Покупка заблокирована.' - ) - await callback.answer('Цена изменилась. Пожалуйста, начните оформление заново.', show_alert=True) - return - if price_difference > 100: # допуск 1₽ - # Небольшое расхождение - логируем предупреждение но продолжаем - logger.warning( - f'Расхождение цены для пользователя {db_user.telegram_id}: ' + # Блокируем только если цена ВЫРОСЛА (пользователь переплатит). + # Если цена снизилась (промо-скидка активировалась) — разрешаем покупку по новой цене. + price_difference = final_price - cached_total_price + if price_difference > 0: + max_allowed_increase = max(500, int(final_price * 0.05)) # 5% или минимум 5₽ + if price_difference > max_allowed_increase: + logger.error( + f'Цена выросла для пользователя {db_user.telegram_id}: ' + f'кэш={cached_total_price / 100}₽, пересчет={final_price / 100}₽, ' + f'разница=+{price_difference / 100}₽ (>{max_allowed_increase / 100}₽). ' + f'Покупка заблокирована.' + ) + await callback.answer('Цена изменилась. Пожалуйста, начните оформление заново.', show_alert=True) + return + if price_difference > 100: # допуск 1₽ + logger.warning( + f'Небольшой рост цены для пользователя {db_user.telegram_id}: ' + f'кэш={cached_total_price / 100}₽, пересчет={final_price / 100}₽. ' + f'Используем пересчитанную цену.' + ) + elif price_difference < -100: # цена снизилась более чем на 1₽ + logger.info( + f'Цена снизилась для пользователя {db_user.telegram_id}: ' f'кэш={cached_total_price / 100}₽, пересчет={final_price / 100}₽. ' - f'Используем пересчитанную цену.' + f'Применяем новую цену.' ) # Используем пересчитанную цену diff --git a/app/middlewares/channel_checker.py b/app/middlewares/channel_checker.py index 505b1299..c46f0105 100644 --- a/app/middlewares/channel_checker.py +++ b/app/middlewares/channel_checker.py @@ -6,7 +6,7 @@ from typing import Any import redis.asyncio as aioredis from aiogram import BaseMiddleware, Bot, types from aiogram.enums import ChatMemberStatus -from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramNetworkError from aiogram.fsm.context import FSMContext from aiogram.types import CallbackQuery, Message, TelegramObject, Update @@ -197,6 +197,9 @@ class ChannelCheckerMiddleware(BaseMiddleware): logger.error(f'❌ Ошибка запроса к каналу {channel_id}: {e}') await self._capture_start_payload(state, event, bot) return await self._deny_message(event, bot, channel_link, channel_id) + except TelegramNetworkError as e: + logger.warning(f'⚠️ Таймаут при проверке подписки на канал: {e}') + return await handler(event, data) except Exception as e: logger.error(f'❌ Неожиданная ошибка при проверке подписки: {e}') return await handler(event, data) diff --git a/app/services/backup_service.py b/app/services/backup_service.py index 0c32c342..c6f90fbc 100644 --- a/app/services/backup_service.py +++ b/app/services/backup_service.py @@ -7,7 +7,7 @@ import shutil import tarfile import tempfile from dataclasses import asdict, dataclass -from datetime import datetime, timedelta +from datetime import date as dt_date, datetime, time as dt_time, timedelta from pathlib import Path from typing import Any @@ -24,14 +24,41 @@ from app.database.models import ( AdvertisingCampaign, AdvertisingCampaignRegistration, BroadcastHistory, + ButtonClickLog, + CloudPaymentsPayment, + ContestAttempt, + ContestRound, + ContestTemplate, CryptoBotPayment, DiscountOffer, + FaqPage, + FaqSetting, + FreekassaPayment, + HeleketPayment, + KassaAiPayment, + MainMenuButton, + MenuLayoutHistory, MonitoringLog, MulenPayPayment, Pal24Payment, + PaymentMethodConfig, + PinnedMessage, + PlategaPayment, + Poll, + PollAnswer, + PollOption, + PollQuestion, + PollResponse, + PrivacyPolicy, PromoCode, PromoCodeUse, PromoGroup, + PromoOfferLog, + PromoOfferTemplate, + PublicOffer, + ReferralContest, + ReferralContestEvent, + ReferralContestVirtualParticipant, ReferralEarning, SentNotification, ServerSquad, @@ -39,19 +66,33 @@ from app.database.models import ( Squad, Subscription, SubscriptionConversion, + SubscriptionEvent, SubscriptionServer, + SubscriptionTemporaryAccess, SupportAuditLog, SystemSetting, Tariff, Ticket, TicketMessage, + TicketNotification, + TrafficPurchase, Transaction, User, UserMessage, + UserPromoGroup, + WataPayment, WebApiToken, + Webhook, + WebhookDelivery, WelcomeText, + WheelConfig, + WheelPrize, + WheelSpin, + WithdrawalRequest, YooKassaPayment, + payment_method_promo_groups, server_squad_promo_groups, + tariff_promo_groups, ) @@ -122,6 +163,53 @@ class BackupService: TicketMessage, SupportAuditLog, WebApiToken, + # --- Payment providers (FK: users, transactions) --- + HeleketPayment, + WataPayment, + PlategaPayment, + CloudPaymentsPayment, + FreekassaPayment, + KassaAiPayment, + # --- Settings/content --- + PaymentMethodConfig, + PrivacyPolicy, + PublicOffer, + FaqSetting, + FaqPage, + PinnedMessage, + MainMenuButton, + MenuLayoutHistory, + # --- User data (FK: users, promo_groups, subscriptions) --- + UserPromoGroup, + TrafficPurchase, + SubscriptionEvent, + SubscriptionTemporaryAccess, + PromoOfferTemplate, + PromoOfferLog, + # --- Referral/contests (FK: users) --- + WithdrawalRequest, + ReferralContest, + ReferralContestEvent, + ReferralContestVirtualParticipant, + ContestTemplate, + ContestRound, + ContestAttempt, + # --- Polls (FK chain: polls -> questions -> options -> answers) --- + Poll, + PollQuestion, + PollOption, + PollResponse, + PollAnswer, + # --- Webhooks --- + Webhook, + WebhookDelivery, + # --- Wheel (FK chain: configs -> prizes -> spins) --- + WheelConfig, + WheelPrize, + WheelSpin, + # --- Support --- + TicketNotification, + ButtonClickLog, ] self.backup_models_ordered = self._base_backup_models.copy() @@ -131,6 +219,8 @@ class BackupService: self.association_tables = { 'server_squad_promo_groups': server_squad_promo_groups, + 'tariff_promo_groups': tariff_promo_groups, + 'payment_method_promo_groups': payment_method_promo_groups, } def _load_settings(self) -> BackupSettings: @@ -538,7 +628,7 @@ class BackupService: except Exception as exc: logger.error('Ошибка при экспорте данных: %s', exc) - raise exc + raise async def _collect_files(self, staging_dir: Path, include_logs: bool) -> list[dict[str, Any]]: files_info: list[dict[str, Any]] = [] @@ -623,7 +713,7 @@ class BackupService: mode = 'r:gz' if backup_path.suffixes and backup_path.suffixes[-1] == '.gz' else 'r' with tarfile.open(backup_path, mode) as tar: - tar.extractall(temp_path) + tar.extractall(temp_path, filter='data') metadata_path = temp_path / 'metadata.json' if not metadata_path.exists(): @@ -785,20 +875,31 @@ class BackupService: logger.info('📁 Снимок директории data восстановлен') async def _restore_files(self, files_info: list[dict[str, Any]], temp_path: Path): + allowed_base = self.data_dir.resolve() + for file_info in files_info: relative_path = file_info.get('relative_path') target_path = Path(file_info.get('path', '')) if not relative_path or not target_path: continue - source_file = temp_path / relative_path + target_resolved = target_path.resolve() + if not str(target_resolved).startswith(str(allowed_base) + os.sep) and target_resolved != allowed_base: + logger.warning('Заблокирована запись за пределами data_dir: %s', target_path) + continue + + source_file = (temp_path / relative_path).resolve() + if not str(source_file).startswith(str(temp_path.resolve()) + os.sep): + logger.warning('Path traversal в relative_path: %s', relative_path) + continue + if not source_file.exists(): logger.warning('Файл %s отсутствует в архиве', relative_path) continue - target_path.parent.mkdir(parents=True, exist_ok=True) - await asyncio.to_thread(shutil.copy2, source_file, target_path) - logger.info('📁 Файл %s восстановлен', target_path) + target_resolved.parent.mkdir(parents=True, exist_ok=True) + await asyncio.to_thread(shutil.copy2, source_file, target_resolved) + logger.info('📁 Файл %s восстановлен', target_resolved) async def _restore_database_payload( self, @@ -914,7 +1015,7 @@ class BackupService: except Exception as exc: await db.rollback() logger.error('Ошибка при восстановлении: %s', exc) - raise exc + raise return restored_tables, restored_records @@ -994,10 +1095,9 @@ class BackupService: except Exception as e: logger.error(f'Ошибка при восстановлении пользователя: {e}') - await db.rollback() - raise e + raise - await db.commit() + await db.flush() logger.info('✅ Пользователи без реферальных связей восстановлены') async def _update_user_referrals(self, db: AsyncSession, backup_data: dict): @@ -1031,7 +1131,7 @@ class BackupService: logger.error(f'Ошибка при обновлении реферальной связи: {e}') continue - await db.commit() + await db.flush() logger.info('✅ Реферальные связи обновлены') def _process_record_data(self, record_data: dict, model, table_name: str) -> dict: @@ -1058,6 +1158,18 @@ class BackupService: except (ValueError, TypeError) as e: logger.warning(f'Не удалось парсить дату {value} для поля {key}: {e}') processed_data[key] = datetime.utcnow() + elif column_type_str == 'TIME' and isinstance(value, str): + try: + processed_data[key] = dt_time.fromisoformat(value) + except (ValueError, TypeError) as e: + logger.warning(f'Не удалось парсить время {value} для поля {key}: {e}') + processed_data[key] = dt_time(hour=12, minute=0) + elif column_type_str == 'DATE' and isinstance(value, str): + try: + processed_data[key] = dt_date.fromisoformat(value) + except (ValueError, TypeError) as e: + logger.warning(f'Не удалось парсить дату {value} для поля {key}: {e}') + processed_data[key] = None elif ('BOOLEAN' in column_type_str or 'BOOL' in column_type_str) and isinstance(value, str): processed_data[key] = value.lower() in ('true', '1', 'yes', 'on') elif ( @@ -1089,11 +1201,8 @@ class BackupService: return processed_data - def _get_primary_key_column(self, model) -> str | None: - for col in model.__table__.columns: - if col.primary_key: - return col.name - return None + def _get_primary_key_columns(self, model) -> list[str]: + return [col.name for col in model.__table__.columns if col.primary_key] async def _export_association_tables(self, db: AsyncSession) -> dict[str, list[dict[str, Any]]]: association_data: dict[str, list[dict[str, Any]]] = {} @@ -1119,63 +1228,60 @@ class BackupService: restored_tables = 0 restored_records = 0 - if 'server_squad_promo_groups' in association_data: - restored = await self._restore_server_squad_promo_groups( - db, association_data['server_squad_promo_groups'], clear_existing + for table_name, table_obj in self.association_tables.items(): + if table_name not in association_data: + continue + col_names = [col.name for col in table_obj.columns] + restored = await self._restore_association_table( + db, table_obj, table_name, association_data[table_name], clear_existing, col_names ) restored_tables += 1 restored_records += restored return restored_tables, restored_records - async def _restore_server_squad_promo_groups( - self, db: AsyncSession, records: list[dict[str, Any]], clear_existing: bool + async def _restore_association_table( + self, + db: AsyncSession, + table_obj, + table_name: str, + records: list[dict[str, Any]], + clear_existing: bool, + col_names: list[str], ) -> int: if not records: return 0 if clear_existing: - await db.execute(server_squad_promo_groups.delete()) + await db.execute(table_obj.delete()) restored = 0 for record in records: - server_id = record.get('server_squad_id') - promo_id = record.get('promo_group_id') + values = {col: record.get(col) for col in col_names} - if server_id is None or promo_id is None: - logger.warning('Пропущена некорректная запись server_squad_promo_groups: %s', record) + if any(v is None for v in values.values()): + logger.warning('Пропущена некорректная запись %s: %s', table_name, record) continue try: + first_col = col_names[0] exists_stmt = ( - select(server_squad_promo_groups.c.server_squad_id) - .where( - server_squad_promo_groups.c.server_squad_id == server_id, - server_squad_promo_groups.c.promo_group_id == promo_id, - ) + select(table_obj.c[first_col]) + .where(*[table_obj.c[col] == values[col] for col in col_names]) .limit(1) ) existing = await db.execute(exists_stmt) if existing.scalar_one_or_none() is not None: - logger.debug( - 'Запись server_squad_promo_groups (%s, %s) уже существует', - server_id, - promo_id, - ) + logger.debug('Запись %s %s уже существует', table_name, values) continue - await db.execute( - server_squad_promo_groups.insert().values(server_squad_id=server_id, promo_group_id=promo_id) - ) + await db.execute(table_obj.insert().values(**values)) restored += 1 except Exception as e: - logger.error( - 'Ошибка при восстановлении связи server_squad_promo_groups (%s, %s): %s', server_id, promo_id, e - ) - await db.rollback() - raise e + logger.error('Ошибка при восстановлении связи %s %s: %s', table_name, values, e) + raise return restored @@ -1205,17 +1311,16 @@ class BackupService: logger.warning(f'⚠️ Тариф {tariff_id} не найден, устанавливаем tariff_id=NULL для подписки') processed_data['tariff_id'] = None - primary_key_col = self._get_primary_key_column(model) + pk_cols = self._get_primary_key_columns(model) - if primary_key_col and primary_key_col in processed_data: - existing_record = await db.execute( - select(model).where(getattr(model, primary_key_col) == processed_data[primary_key_col]) - ) + if pk_cols and all(col in processed_data for col in pk_cols): + where_clause = [getattr(model, col) == processed_data[col] for col in pk_cols] + existing_record = await db.execute(select(model).where(*where_clause)) existing = existing_record.scalar_one_or_none() - if existing and not clear_existing: + if existing: for key, value in processed_data.items(): - if key != primary_key_col: + if key not in pk_cols: setattr(existing, key, value) else: instance = model(**processed_data) @@ -1229,17 +1334,69 @@ class BackupService: except Exception as e: logger.error(f'Ошибка восстановления записи в {table_name}: {e}') logger.error(f'Проблемные данные: {record_data}') - await db.rollback() - raise e + raise return restored_count async def _clear_database_tables(self, db: AsyncSession, backup_data: dict[str, Any] | None = None): tables_order = [ + # --- Association tables (no FK deps on them, safe to delete first) --- 'server_squad_promo_groups', + 'tariff_promo_groups', + 'payment_method_promo_groups', + # --- Polls (child -> parent order) --- + 'poll_answers', + 'poll_responses', + 'poll_options', + 'poll_questions', + 'polls', + # --- Wheel (child -> parent) --- + 'wheel_spins', + 'wheel_prizes', + 'wheel_configs', + # --- Contests (child -> parent) --- + 'contest_attempts', + 'contest_rounds', + 'contest_templates', + 'referral_contest_virtual_participants', + 'referral_contest_events', + 'referral_contests', + # --- Webhooks --- + 'webhook_deliveries', + 'webhooks', + # --- Promo offers --- + 'promo_offer_logs', + 'promo_offer_templates', + 'subscription_temporary_access', + # --- User engagement --- + 'subscription_events', + 'traffic_purchases', + 'user_promo_groups', + 'withdrawal_requests', + # --- Support extras --- + 'ticket_notifications', + 'button_click_logs', + # --- Payment providers --- + 'heleket_payments', + 'wata_payments', + 'platega_payments', + 'cloudpayments_payments', + 'freekassa_payments', + 'kassa_ai_payments', + # --- Content/config --- + 'pinned_messages', + 'main_menu_buttons', + 'menu_layout_history', + 'faq_pages', + 'faq_settings', + 'privacy_policies', + 'public_offers', + 'payment_method_configs', + # --- Original tables (preserved order) --- + 'support_audit_logs', 'ticket_messages', 'tickets', - 'support_audit_logs', + 'cabinet_refresh_tokens', 'advertising_campaign_registrations', 'advertising_campaigns', 'subscription_servers', @@ -1408,9 +1565,11 @@ class BackupService: async def delete_backup(self, backup_filename: str) -> tuple[bool, str]: try: - backup_path = self.backup_dir / backup_filename + backup_path = (self.backup_dir / backup_filename).resolve() + if not str(backup_path).startswith(str(self.backup_dir.resolve()) + os.sep): + return False, '❌ Недопустимое имя файла бекапа' - if not backup_path.exists(): + if not backup_path.is_file(): return False, f'❌ Файл бекапа не найден: {backup_filename}' backup_path.unlink() diff --git a/app/services/monitoring_service.py b/app/services/monitoring_service.py index a7f2aa6b..4df729fa 100644 --- a/app/services/monitoring_service.py +++ b/app/services/monitoring_service.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any from aiogram.enums import ChatMemberStatus -from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramNetworkError from sqlalchemy import and_, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload @@ -1286,6 +1286,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки уведомления об истечении подписки пользователю %s: %s', + user.telegram_id, + e, + ) + return False except Exception as e: logger.error( 'Ошибка отправки уведомления об истечении подписки пользователю %s: %s', @@ -1335,6 +1342,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки уведомления об окончании тестовой подписки пользователю %s: %s', + user.telegram_id, + e, + ) + return False except Exception as e: logger.error( 'Ошибка отправки уведомления об окончании тестовой подписки пользователю %s: %s', @@ -1410,6 +1424,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки уведомления об отсутствии подключения пользователю %s: %s', + user.telegram_id, + e, + ) + return False except Exception as e: logger.error( 'Ошибка отправки уведомления об отсутствии подключения пользователю %s: %s', @@ -1473,6 +1494,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as error: + logger.warning( + 'Таймаут отправки уведомления об отписке от канала пользователю %s: %s', + user.telegram_id, + error, + ) + return False except Exception as error: logger.error( 'Ошибка отправки уведомления об отписке от канала пользователю %s: %s', @@ -1537,6 +1565,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки напоминания об истекшей подписке пользователю %s: %s', + user.telegram_id, + e, + ) + return False except Exception as e: logger.error( 'Ошибка отправки напоминания об истекшей подписке пользователю %s: %s', @@ -1629,6 +1664,13 @@ class MonitoringService: exc, ) return False + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки скидочного уведомления пользователю %s: %s', + user.telegram_id, + e, + ) + return False except Exception as e: logger.error( 'Ошибка отправки скидочного уведомления пользователю %s: %s', @@ -1653,6 +1695,12 @@ class MonitoringService: user.telegram_id, exc, ) + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки уведомления об автоплатеже пользователю %s: %s', + user.telegram_id, + e, + ) except Exception as e: logger.error( 'Ошибка отправки уведомления об автоплатеже пользователю %s: %s', @@ -1690,6 +1738,12 @@ class MonitoringService: user.telegram_id, exc, ) + except TelegramNetworkError as e: + logger.warning( + 'Таймаут отправки уведомления о неудачном автоплатеже пользователю %s: %s', + user.telegram_id, + e, + ) except Exception as e: logger.error( 'Ошибка отправки уведомления о неудачном автоплатеже пользователю %s: %s', diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index 36d35ad3..08709fa4 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -1335,9 +1335,18 @@ class RemnaWaveService: pending_uuid_mutations.clear() try: await db.rollback() # Выполняем rollback при ошибке - except: + except Exception: pass - continue + # After rollback all ORM objects in the session are expired. + # Accessing their attributes triggers a lazy load which fails + # in async context (greenlet_spawn error). Break the loop to + # prevent cascading failures for every remaining user. + logger.warning( + '⚠️ Сессия повреждена после rollback, прерываем обработку (обработано %d/%d пользователей)', + i + 1, + len(unique_panel_users), + ) + break else: if uuid_mutation and uuid_mutation.has_changes(): diff --git a/app/services/remnawave_webhook_service.py b/app/services/remnawave_webhook_service.py index ff7f7f6e..9a62149d 100644 --- a/app/services/remnawave_webhook_service.py +++ b/app/services/remnawave_webhook_service.py @@ -16,17 +16,20 @@ from typing import Any from aiogram import Bot from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup +from sqlalchemy import delete from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.exc import StaleDataError from app.database.crud.subscription import ( deactivate_subscription, + decrement_subscription_server_counts, expire_subscription, get_subscription_by_user_id, reactivate_subscription, update_subscription_usage, ) from app.database.crud.user import get_user_by_remnawave_uuid, get_user_by_telegram_id -from app.database.models import Subscription, SubscriptionStatus, User +from app.database.models import Subscription, SubscriptionServer, SubscriptionStatus, User from app.localization.texts import get_texts from app.services.admin_notification_service import AdminNotificationService from app.services.notification_delivery_service import NotificationType, notification_delivery_service @@ -164,11 +167,23 @@ class RemnaWaveWebhookService: ) return False + user_id = user.id try: await handler(db, user, subscription, data) return True + except StaleDataError: + logger.warning( + 'RemnaWave webhook %s: entity already deleted for user %s (concurrent deletion)', + event_name, + user_id, + ) + try: + await db.rollback() + except Exception: + pass + return True except Exception: - logger.exception('Error processing RemnaWave webhook event %s for user %s', event_name, user.id) + logger.exception('Error processing RemnaWave webhook event %s for user %s', event_name, user_id) try: await db.rollback() except Exception: @@ -561,20 +576,33 @@ class RemnaWaveWebhookService: ) -> None: if subscription: self._stamp_webhook_update(subscription) + + # Decrement server counters BEFORE clearing connected_squads + await decrement_subscription_server_counts(db, subscription) + if subscription.status != SubscriptionStatus.EXPIRED.value: - await expire_subscription(db, subscription) + subscription.status = SubscriptionStatus.EXPIRED.value logger.info( 'Webhook: subscription %s marked expired (user deleted in panel) for user %s', subscription.id, user.id, ) - else: - await db.commit() + + # Clear subscription data — panel user no longer exists + subscription.subscription_url = None + subscription.subscription_crypto_link = None + subscription.remnawave_short_uuid = None + subscription.connected_squads = None + subscription.updated_at = datetime.now(UTC).replace(tzinfo=None) + + # Remove SubscriptionServer link rows + await db.execute(delete(SubscriptionServer).where(SubscriptionServer.subscription_id == subscription.id)) # Clear remnawave linkage if user.remnawave_uuid: user.remnawave_uuid = None - await db.commit() + + await db.commit() await self._notify_user(user, 'WEBHOOK_SUB_DELETED', reply_markup=self._get_renew_keyboard(user)) diff --git a/app/utils/miniapp_buttons.py b/app/utils/miniapp_buttons.py index c3b5592f..c5c1dd1f 100644 --- a/app/utils/miniapp_buttons.py +++ b/app/utils/miniapp_buttons.py @@ -4,32 +4,28 @@ from aiogram.types import InlineKeyboardButton from app.config import settings -DEFAULT_UNAVAILABLE_CALLBACK = 'menu_profile_unavailable' - - def build_miniapp_or_callback_button( text: str, *, callback_data: str, - unavailable_callback: str = DEFAULT_UNAVAILABLE_CALLBACK, ) -> InlineKeyboardButton: - """Create a button that opens the miniapp in text menu mode. + """Create a button that opens the miniapp or falls back to a callback. - When the simplified text menu mode is enabled we should avoid exposing - deep bot flows and redirect the user to the configured miniapp instead. - If the miniapp URL is missing we fall back to a safe callback that shows - an alert about the unavailable profile rather than opening disabled - sections of the bot. + In text menu mode, if ``MINIAPP_CUSTOM_URL`` is configured the button + opens the full cabinet miniapp. Otherwise (or outside text menu mode) + the regular ``callback_data`` is used so the user stays in the bot. + + Only ``MINIAPP_CUSTOM_URL`` is considered here — the purchase-only URL + (``MINIAPP_PURCHASE_URL``) is intentionally excluded because it cannot + display subscription details and would load indefinitely. """ if settings.is_text_main_menu_mode(): - miniapp_url = settings.get_main_menu_miniapp_url() + miniapp_url = (settings.MINIAPP_CUSTOM_URL or '').strip() if miniapp_url: return InlineKeyboardButton( text=text, web_app=types.WebAppInfo(url=miniapp_url), ) - safe_callback = unavailable_callback or DEFAULT_UNAVAILABLE_CALLBACK - return InlineKeyboardButton(text=text, callback_data=safe_callback) return InlineKeyboardButton(text=text, callback_data=callback_data) diff --git a/app/webapi/routes/miniapp.py b/app/webapi/routes/miniapp.py index d1e3f0e5..e4b10fbc 100644 --- a/app/webapi/routes/miniapp.py +++ b/app/webapi/routes/miniapp.py @@ -5926,7 +5926,10 @@ async def update_subscription_servers_endpoint( if added_server_ids: await add_subscription_servers(db, subscription, added_server_ids, added_server_prices) - await add_user_to_servers(db, added_server_ids) + try: + await add_user_to_servers(db, added_server_ids) + except Exception as e: + logger.error(f'Ошибка обновления счётчика серверов (add): {e}') removed_server_ids = [ catalog[uuid].get('server_id') for uuid in removed if catalog[uuid].get('server_id') is not None @@ -5934,7 +5937,10 @@ async def update_subscription_servers_endpoint( if removed_server_ids: await remove_subscription_servers(db, subscription.id, removed_server_ids) - await remove_user_from_servers(db, removed_server_ids) + try: + await remove_user_from_servers(db, removed_server_ids) + except Exception as e: + logger.error(f'Ошибка обновления счётчика серверов (remove): {e}') ordered_selection = [] seen_selection = set()