Merge pull request #2061 from BEDOLAGA-DEV/dev5

Dev5
This commit is contained in:
Egor
2025-11-25 10:09:29 +03:00
committed by GitHub
2 changed files with 64 additions and 9 deletions

View File

@@ -6,6 +6,7 @@ from aiogram import Dispatcher, types, F
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter
from aiogram.fsm.context import FSMContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import InterfaceError
from sqlalchemy import select, func, and_, or_
from app.config import settings
@@ -17,6 +18,7 @@ from app.database.models import (
SubscriptionStatus,
BroadcastHistory,
)
from app.database.database import AsyncSessionLocal
from app.keyboards.admin import (
get_admin_messages_keyboard, get_broadcast_target_keyboard,
get_custom_criteria_keyboard, get_broadcast_history_keyboard,
@@ -87,6 +89,56 @@ def create_broadcast_keyboard(selected_buttons: list, language: str = "ru") -> O
return types.InlineKeyboardMarkup(inline_keyboard=keyboard)
async def _persist_broadcast_result(
db: AsyncSession,
broadcast_history: BroadcastHistory,
sent_count: int,
failed_count: int,
status: str,
) -> None:
"""Сохраняет результаты рассылки с повторной попыткой при обрыве соединения."""
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count
broadcast_history.status = status
broadcast_history.completed_at = datetime.utcnow()
try:
await db.commit()
return
except InterfaceError as error:
logger.warning(
"Соединение с БД потеряно при сохранении результатов рассылки, пробуем еще раз",
exc_info=error,
)
await db.rollback()
try:
async with AsyncSessionLocal() as retry_session:
retry_history = await retry_session.get(BroadcastHistory, broadcast_history.id)
if not retry_history:
logger.critical(
"Не удалось найти запись BroadcastHistory #%s для повторной записи результатов",
broadcast_history.id,
)
return
retry_history.sent_count = sent_count
retry_history.failed_count = failed_count
retry_history.status = status
retry_history.completed_at = broadcast_history.completed_at
await retry_session.commit()
logger.info(
"Результаты рассылки успешно сохранены после повторного подключения к БД (id=%s)",
broadcast_history.id,
)
except Exception as retry_error:
logger.critical(
"Не удалось сохранить результаты рассылки после восстановления подключения",
exc_info=retry_error,
)
@admin_required
@error_handler
async def show_messages_menu(
@@ -854,11 +906,14 @@ async def confirm_broadcast(
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.25)
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count
broadcast_history.status = "completed" if failed_count == 0 else "partial"
broadcast_history.completed_at = datetime.utcnow()
await db.commit()
status = "completed" if failed_count == 0 else "partial"
await _persist_broadcast_result(
db=db,
broadcast_history=broadcast_history,
sent_count=sent_count,
failed_count=failed_count,
status=status,
)
media_info = ""
if has_media:

View File

@@ -336,6 +336,10 @@ class PlategaPaymentMixin:
logger.error("Пользователь %s не найден для Platega", payment.user_id)
return payment
promo_group = user.get_primary_promo_group()
subscription = getattr(user, "subscription", None)
referrer_info = format_referrer_info(user)
transaction_external_id = (
str(payload.get("id"))
if isinstance(payload, dict) and payload.get("id")
@@ -393,10 +397,6 @@ class PlategaPaymentMixin:
user.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(user)
promo_group = user.get_primary_promo_group()
subscription = getattr(user, "subscription", None)
referrer_info = format_referrer_info(user)
topup_status = "🆕 Первое пополнение" if was_first_topup else "🔄 Пополнение"
try: