diff --git a/app/handlers/admin/messages.py b/app/handlers/admin/messages.py index e94b3202..e7cbc802 100644 --- a/app/handlers/admin/messages.py +++ b/app/handlers/admin/messages.py @@ -6,6 +6,7 @@ from aiogram import Dispatcher, types, F from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter from aiogram.fsm.context import FSMContext from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.exc import InterfaceError from sqlalchemy import select, func, and_, or_ from app.config import settings @@ -17,6 +18,7 @@ from app.database.models import ( SubscriptionStatus, BroadcastHistory, ) +from app.database.database import AsyncSessionLocal from app.keyboards.admin import ( get_admin_messages_keyboard, get_broadcast_target_keyboard, get_custom_criteria_keyboard, get_broadcast_history_keyboard, @@ -87,6 +89,56 @@ def create_broadcast_keyboard(selected_buttons: list, language: str = "ru") -> O return types.InlineKeyboardMarkup(inline_keyboard=keyboard) +async def _persist_broadcast_result( + db: AsyncSession, + broadcast_history: BroadcastHistory, + sent_count: int, + failed_count: int, + status: str, +) -> None: + """Сохраняет результаты рассылки с повторной попыткой при обрыве соединения.""" + + broadcast_history.sent_count = sent_count + broadcast_history.failed_count = failed_count + broadcast_history.status = status + broadcast_history.completed_at = datetime.utcnow() + + try: + await db.commit() + return + except InterfaceError as error: + logger.warning( + "Соединение с БД потеряно при сохранении результатов рассылки, пробуем еще раз", + exc_info=error, + ) + await db.rollback() + + try: + async with AsyncSessionLocal() as retry_session: + retry_history = await retry_session.get(BroadcastHistory, broadcast_history.id) + if not retry_history: + logger.critical( + "Не удалось найти запись BroadcastHistory #%s для повторной записи результатов", + broadcast_history.id, + ) + return + + retry_history.sent_count = sent_count + retry_history.failed_count = failed_count + retry_history.status = status + retry_history.completed_at = broadcast_history.completed_at + await retry_session.commit() + logger.info( + "Результаты рассылки успешно сохранены после повторного подключения к БД (id=%s)", + broadcast_history.id, + ) + except Exception as retry_error: + logger.critical( + "Не удалось сохранить результаты рассылки после восстановления подключения", + exc_info=retry_error, + ) + + @admin_required @error_handler async def show_messages_menu( @@ -854,11 +906,14 @@ async def confirm_broadcast( # Небольшая задержка между пакетами для снижения нагрузки на API await asyncio.sleep(0.25) - broadcast_history.sent_count = sent_count - broadcast_history.failed_count = failed_count - broadcast_history.status = "completed" if failed_count == 0 else "partial" - broadcast_history.completed_at = datetime.utcnow() - await db.commit() + status = "completed" if failed_count == 0 else "partial" + await _persist_broadcast_result( + db=db, + broadcast_history=broadcast_history, + sent_count=sent_count, + failed_count=failed_count, + status=status, + ) media_info = "" if has_media: diff --git a/app/services/payment/platega.py b/app/services/payment/platega.py index 99ebdcc7..b2bb9ab8 100644 --- a/app/services/payment/platega.py +++ b/app/services/payment/platega.py @@ -336,6 +336,10 @@ class PlategaPaymentMixin: logger.error("Пользователь %s не найден для Platega", payment.user_id) return payment + promo_group = user.get_primary_promo_group() + subscription = getattr(user, "subscription", None) + referrer_info = format_referrer_info(user) + transaction_external_id = ( str(payload.get("id")) if isinstance(payload, dict) and payload.get("id") @@ -393,10 +397,6 @@ class PlategaPaymentMixin: user.updated_at = datetime.utcnow() await db.commit() await db.refresh(user) - - promo_group = user.get_primary_promo_group() - subscription = getattr(user, "subscription", None) - referrer_info = format_referrer_info(user) topup_status = "🆕 Первое пополнение" if was_first_topup else "🔄 Пополнение" try: