diff --git a/app/services/broadcast_service.py b/app/services/broadcast_service.py index 5def4e4b..746e6b20 100644 --- a/app/services/broadcast_service.py +++ b/app/services/broadcast_service.py @@ -8,6 +8,7 @@ from typing import Optional from aiogram import Bot from aiogram.types import InlineKeyboardMarkup +from sqlalchemy.exc import InterfaceError, SQLAlchemyError from app.database.database import AsyncSessionLocal from app.database.models import BroadcastHistory @@ -22,6 +23,8 @@ logger = logging.getLogger(__name__) VALID_MEDIA_TYPES = {"photo", "video", "document"} +LARGE_BROADCAST_THRESHOLD = 20_000 +PROGRESS_UPDATE_STEP = 5_000 @dataclass(slots=True) @@ -137,52 +140,53 @@ class BroadcastService: keyboard = self._build_keyboard(config.selected_buttons) - # Ограничение на количество одновременных отправок - semaphore = asyncio.Semaphore(20) + if len(recipients) > LARGE_BROADCAST_THRESHOLD: + logger.info( + "Запускаем стабильный режим рассылки для %s получателей", len(recipients) + ) + ( + sent_count, + failed_count, + cancelled_during_run, + ) = await self._run_resilient_broadcast( + broadcast_id, + recipients, + config, + keyboard, + cancel_event, + ) + else: + ( + sent_count, + failed_count, + cancelled_during_run, + ) = await self._run_standard_broadcast( + broadcast_id, + recipients, + config, + keyboard, + cancel_event, + ) - async def send_single_message(user): - """Отправляет одно сообщение с семафором ограничения""" - async with semaphore: - if cancel_event.is_set(): - return False + if cancelled_during_run: + logger.info( + "Рассылка %s была отменена во время выполнения, финальный статус уже установлен", + broadcast_id, + ) + return - telegram_id = getattr(user, "telegram_id", None) - if telegram_id is None: - return False + if cancel_event.is_set(): + logger.info( + "Запрос на отмену рассылки %s пришел после завершения отправки, фиксируем итоговый статус", + broadcast_id, + ) - try: - await self._deliver_message(telegram_id, config, keyboard) - return True - except Exception as exc: # noqa: BLE001 - logger.error( - "Ошибка отправки рассылки %s пользователю %s: %s", - broadcast_id, - telegram_id, - exc, - ) - return False - - # Отправляем сообщения пакетами для эффективности - batch_size = 100 - for i in range(0, len(recipients), batch_size): - if cancel_event.is_set(): - await self._mark_cancelled(broadcast_id, sent_count, failed_count) - return - - batch = recipients[i:i + batch_size] - tasks = [send_single_message(user) for user in batch] - results = await asyncio.gather(*tasks, return_exceptions=True) - - for result in results: - if result is True: - sent_count += 1 - else: - failed_count += 1 - - # Небольшая задержка между пакетами для снижения нагрузки на API - await asyncio.sleep(0.1) - - await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False) + await self._mark_finished( + broadcast_id, + sent_count, + failed_count, + cancelled=False, + ) except asyncio.CancelledError: await self._mark_cancelled(broadcast_id, sent_count, failed_count) @@ -198,6 +202,127 @@ class BroadcastService: return await get_custom_users(session, criteria) return await get_target_users(session, target) + async def _run_standard_broadcast( + self, + broadcast_id: int, + recipients: list, + config: BroadcastConfig, + keyboard: Optional[InlineKeyboardMarkup], + cancel_event: asyncio.Event, + ) -> tuple[int, int, bool]: + """Базовый режим рассылки для небольших списков.""" + + sent_count = 0 + failed_count = 0 + + # Ограничение на количество одновременных отправок + semaphore = asyncio.Semaphore(20) + + async def send_single_message(user): + """Отправляет одно сообщение с семафором ограничения""" + async with semaphore: + if cancel_event.is_set(): + return False + + telegram_id = getattr(user, "telegram_id", None) + if telegram_id is None: + return False + + try: + await self._deliver_message(telegram_id, config, keyboard) + return True + except Exception as exc: # noqa: BLE001 + logger.error( + "Ошибка отправки рассылки %s пользователю %s: %s", + broadcast_id, + telegram_id, + exc, + ) + return False + + # Отправляем сообщения пакетами для эффективности + batch_size = 100 + for i in range(0, len(recipients), batch_size): + if cancel_event.is_set(): + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + return sent_count, failed_count, True + + batch = recipients[i:i + batch_size] + tasks = [send_single_message(user) for user in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for result in results: + if result is True: + sent_count += 1 + else: + failed_count += 1 + + # Небольшая задержка между пакетами для снижения нагрузки на API + await asyncio.sleep(0.1) + + return sent_count, failed_count, False + + async def _run_resilient_broadcast( + self, + broadcast_id: int, + recipients: list, + config: BroadcastConfig, + keyboard: Optional[InlineKeyboardMarkup], + cancel_event: asyncio.Event, + ) -> tuple[int, int, bool]: + """Режим рассылки с периодическим обновлением статуса для больших списков.""" + + sent_count = 0 + failed_count = 0 + + # Ограничение на количество одновременных отправок + semaphore = asyncio.Semaphore(15) + + async def send_single_message(user): + async with semaphore: + if cancel_event.is_set(): + return False + + telegram_id = getattr(user, "telegram_id", None) + if telegram_id is None: + return False + + try: + await self._deliver_message(telegram_id, config, keyboard) + return True + except Exception as exc: # noqa: BLE001 + logger.error( + "Ошибка отправки рассылки %s пользователю %s: %s", + broadcast_id, + telegram_id, + exc, + ) + return False + + batch_size = 100 + for i in range(0, len(recipients), batch_size): + if cancel_event.is_set(): + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + return sent_count, failed_count, True + + batch = recipients[i:i + batch_size] + tasks = [send_single_message(user) for user in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for result in results: + if result is True: + sent_count += 1 + else: + failed_count += 1 + + processed = sent_count + failed_count + if processed % PROGRESS_UPDATE_STEP == 0: + await self._update_progress(broadcast_id, sent_count, failed_count) + + await asyncio.sleep(0.1) + + return sent_count, failed_count, False + def _build_keyboard(self, selected_buttons: Optional[list[str]]) -> Optional[InlineKeyboardMarkup]: if selected_buttons is None: selected_buttons = [] @@ -251,18 +376,14 @@ class BroadcastService: *, cancelled: bool, ) -> None: - async with AsyncSessionLocal() as session: - broadcast = await session.get(BroadcastHistory, broadcast_id) - if not broadcast: - return - - broadcast.sent_count = sent_count - broadcast.failed_count = failed_count - broadcast.status = "cancelled" if cancelled else ( + await self._safe_status_update( + broadcast_id, + sent_count, + failed_count, + status="cancelled" if cancelled else ( "completed" if failed_count == 0 else "partial" - ) - broadcast.completed_at = datetime.utcnow() - await session.commit() + ), + ) async def _mark_cancelled( self, @@ -283,16 +404,70 @@ class BroadcastService: sent_count: int = 0, failed_count: int = 0, ) -> None: - async with AsyncSessionLocal() as session: - broadcast = await session.get(BroadcastHistory, broadcast_id) - if not broadcast: - return + await self._safe_status_update( + broadcast_id, + sent_count, + failed_count, + status="failed", + ) - broadcast.sent_count = sent_count - broadcast.failed_count = failed_count or broadcast.failed_count - broadcast.status = "failed" - broadcast.completed_at = datetime.utcnow() - await session.commit() + async def _update_progress( + self, + broadcast_id: int, + sent_count: int, + failed_count: int, + ) -> None: + """Периодически обновляет прогресс рассылки, чтобы держать соединение активным.""" + + await self._safe_status_update( + broadcast_id, + sent_count, + failed_count, + status="in_progress", + update_completed_at=False, + ) + + async def _safe_status_update( + self, + broadcast_id: int, + sent_count: int, + failed_count: int, + *, + status: str, + update_completed_at: bool = True, + ) -> None: + attempts = 0 + + while attempts < 2: + try: + async with AsyncSessionLocal() as session: + broadcast = await session.get(BroadcastHistory, broadcast_id) + if not broadcast: + return + + broadcast.sent_count = sent_count + broadcast.failed_count = failed_count + broadcast.status = status + + if update_completed_at: + broadcast.completed_at = datetime.utcnow() + + await session.commit() + return + except InterfaceError as exc: + attempts += 1 + logger.warning( + "Проблемы с соединением при обновлении статуса рассылки %s: %s. Повтор %s/2", + broadcast_id, + exc, + attempts, + ) + await asyncio.sleep(0.2) + except SQLAlchemyError: + logger.exception( + "Не удалось обновить статус рассылки %s", broadcast_id + ) + return broadcast_service = BroadcastService()