Merge pull request #2046 from BEDOLAGA-DEV/dev5

Dev5
This commit is contained in:
Egor
2025-11-25 02:49:53 +03:00
committed by GitHub

View File

@@ -8,6 +8,7 @@ from typing import Optional
from aiogram import Bot
from aiogram.types import InlineKeyboardMarkup
from sqlalchemy.exc import InterfaceError, SQLAlchemyError
from app.database.database import AsyncSessionLocal
from app.database.models import BroadcastHistory
@@ -22,6 +23,8 @@ logger = logging.getLogger(__name__)
VALID_MEDIA_TYPES = {"photo", "video", "document"}
LARGE_BROADCAST_THRESHOLD = 20_000
PROGRESS_UPDATE_STEP = 5_000
@dataclass(slots=True)
@@ -137,52 +140,53 @@ class BroadcastService:
keyboard = self._build_keyboard(config.selected_buttons)
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
if len(recipients) > LARGE_BROADCAST_THRESHOLD:
logger.info(
"Запускаем стабильный режим рассылки для %s получателей", len(recipients)
)
(
sent_count,
failed_count,
cancelled_during_run,
) = await self._run_resilient_broadcast(
broadcast_id,
recipients,
config,
keyboard,
cancel_event,
)
else:
(
sent_count,
failed_count,
cancelled_during_run,
) = await self._run_standard_broadcast(
broadcast_id,
recipients,
config,
keyboard,
cancel_event,
)
async def send_single_message(user):
"""Отправляет одно сообщение с семафором ограничения"""
async with semaphore:
if cancel_event.is_set():
return False
if cancelled_during_run:
logger.info(
"Рассылка %s была отменена во время выполнения, финальный статус уже установлен",
broadcast_id,
)
return
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
return False
if cancel_event.is_set():
logger.info(
"Запрос на отмену рассылки %s пришел после завершения отправки, фиксируем итоговый статус",
broadcast_id,
)
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc: # noqa: BLE001
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
return False
# Отправляем сообщения пакетами для эффективности
batch_size = 100
for i in range(0, len(recipients), batch_size):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
batch = recipients[i:i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
sent_count += 1
else:
failed_count += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False)
await self._mark_finished(
broadcast_id,
sent_count,
failed_count,
cancelled=False,
)
except asyncio.CancelledError:
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
@@ -198,6 +202,127 @@ class BroadcastService:
return await get_custom_users(session, criteria)
return await get_target_users(session, target)
async def _run_standard_broadcast(
self,
broadcast_id: int,
recipients: list,
config: BroadcastConfig,
keyboard: Optional[InlineKeyboardMarkup],
cancel_event: asyncio.Event,
) -> tuple[int, int, bool]:
"""Базовый режим рассылки для небольших списков."""
sent_count = 0
failed_count = 0
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
async def send_single_message(user):
"""Отправляет одно сообщение с семафором ограничения"""
async with semaphore:
if cancel_event.is_set():
return False
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
return False
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc: # noqa: BLE001
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
return False
# Отправляем сообщения пакетами для эффективности
batch_size = 100
for i in range(0, len(recipients), batch_size):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return sent_count, failed_count, True
batch = recipients[i:i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
sent_count += 1
else:
failed_count += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
return sent_count, failed_count, False
async def _run_resilient_broadcast(
self,
broadcast_id: int,
recipients: list,
config: BroadcastConfig,
keyboard: Optional[InlineKeyboardMarkup],
cancel_event: asyncio.Event,
) -> tuple[int, int, bool]:
"""Режим рассылки с периодическим обновлением статуса для больших списков."""
sent_count = 0
failed_count = 0
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(15)
async def send_single_message(user):
async with semaphore:
if cancel_event.is_set():
return False
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
return False
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc: # noqa: BLE001
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
return False
batch_size = 100
for i in range(0, len(recipients), batch_size):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return sent_count, failed_count, True
batch = recipients[i:i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
sent_count += 1
else:
failed_count += 1
processed = sent_count + failed_count
if processed % PROGRESS_UPDATE_STEP == 0:
await self._update_progress(broadcast_id, sent_count, failed_count)
await asyncio.sleep(0.1)
return sent_count, failed_count, False
def _build_keyboard(self, selected_buttons: Optional[list[str]]) -> Optional[InlineKeyboardMarkup]:
if selected_buttons is None:
selected_buttons = []
@@ -251,18 +376,14 @@ class BroadcastService:
*,
cancelled: bool,
) -> None:
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
return
broadcast.sent_count = sent_count
broadcast.failed_count = failed_count
broadcast.status = "cancelled" if cancelled else (
await self._safe_status_update(
broadcast_id,
sent_count,
failed_count,
status="cancelled" if cancelled else (
"completed" if failed_count == 0 else "partial"
)
broadcast.completed_at = datetime.utcnow()
await session.commit()
),
)
async def _mark_cancelled(
self,
@@ -283,16 +404,70 @@ class BroadcastService:
sent_count: int = 0,
failed_count: int = 0,
) -> None:
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
return
await self._safe_status_update(
broadcast_id,
sent_count,
failed_count,
status="failed",
)
broadcast.sent_count = sent_count
broadcast.failed_count = failed_count or broadcast.failed_count
broadcast.status = "failed"
broadcast.completed_at = datetime.utcnow()
await session.commit()
async def _update_progress(
self,
broadcast_id: int,
sent_count: int,
failed_count: int,
) -> None:
"""Периодически обновляет прогресс рассылки, чтобы держать соединение активным."""
await self._safe_status_update(
broadcast_id,
sent_count,
failed_count,
status="in_progress",
update_completed_at=False,
)
async def _safe_status_update(
self,
broadcast_id: int,
sent_count: int,
failed_count: int,
*,
status: str,
update_completed_at: bool = True,
) -> None:
attempts = 0
while attempts < 2:
try:
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
return
broadcast.sent_count = sent_count
broadcast.failed_count = failed_count
broadcast.status = status
if update_completed_at:
broadcast.completed_at = datetime.utcnow()
await session.commit()
return
except InterfaceError as exc:
attempts += 1
logger.warning(
"Проблемы с соединением при обновлении статуса рассылки %s: %s. Повтор %s/2",
broadcast_id,
exc,
attempts,
)
await asyncio.sleep(0.2)
except SQLAlchemyError:
logger.exception(
"Не удалось обновить статус рассылки %s", broadcast_id
)
return
broadcast_service = BroadcastService()