Merge pull request #1936 from BEDOLAGA-DEV/bedolaga/fix-email-dispatch-limit-to-100000-users-yvuqli

Throttle broadcast sending for large campaigns
This commit is contained in:
Egor
2025-11-20 17:31:23 +03:00
committed by GitHub

View File

@@ -3,6 +3,7 @@ import asyncio
from datetime import datetime, timedelta
from typing import Optional
from aiogram import Dispatcher, types, F
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter
from aiogram.fsm.context import FSMContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_
@@ -764,52 +765,77 @@ async def confirm_broadcast(
broadcast_keyboard = create_broadcast_keyboard(selected_buttons, db_user.language)
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
# Ограничение на количество одновременных отправок и базовая задержка между сообщениями,
# чтобы избежать перегрузки бота и лимитов Telegram при больших рассылках
max_concurrent_sends = 5
per_message_delay = 0.05
semaphore = asyncio.Semaphore(max_concurrent_sends)
async def send_single_broadcast(user):
"""Отправляет одно сообщение рассылки с семафором ограничения"""
async with semaphore:
try:
if has_media and media_file_id:
if media_type == "photo":
await callback.bot.send_photo(
for attempt in range(3):
try:
if has_media and media_file_id:
if media_type == "photo":
await callback.bot.send_photo(
chat_id=user.telegram_id,
photo=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "video":
await callback.bot.send_video(
chat_id=user.telegram_id,
video=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "document":
await callback.bot.send_document(
chat_id=user.telegram_id,
document=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
else:
await callback.bot.send_message(
chat_id=user.telegram_id,
photo=media_file_id,
caption=message_text,
text=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "video":
await callback.bot.send_video(
chat_id=user.telegram_id,
video=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "document":
await callback.bot.send_document(
chat_id=user.telegram_id,
document=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
else:
await callback.bot.send_message(
chat_id=user.telegram_id,
text=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
await asyncio.sleep(per_message_delay)
return True, user.telegram_id
except TelegramRetryAfter as e:
retry_delay = min(e.retry_after + 1, 30)
logger.warning(
f"Превышен лимит Telegram для {user.telegram_id}, ожидание {retry_delay} сек."
)
return True, user.telegram_id
except Exception as e:
logger.error(f"Ошибка отправки рассылки пользователю {user.telegram_id}: {e}")
return False, user.telegram_id
await asyncio.sleep(retry_delay)
except TelegramForbiddenError:
# Пользователь мог удалить бота или запретить сообщения
logger.info(f"Рассылка недоступна для пользователя {user.telegram_id}: Forbidden")
return False, user.telegram_id
except TelegramBadRequest as e:
logger.error(
f"Некорректный запрос при рассылке пользователю {user.telegram_id}: {e}"
)
return False, user.telegram_id
except Exception as e:
logger.error(
f"Ошибка отправки рассылки пользователю {user.telegram_id} (попытка {attempt + 1}/3): {e}"
)
await asyncio.sleep(0.5 * (attempt + 1))
return False, user.telegram_id
# Отправляем сообщения пакетами для эффективности
batch_size = 100
batch_size = 50
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
tasks = [send_single_broadcast(user) for user in batch]
@@ -826,7 +852,7 @@ async def confirm_broadcast(
failed_count += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
await asyncio.sleep(0.25)
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count
@@ -868,7 +894,24 @@ async def get_target_users_count(db: AsyncSession, target: str) -> int:
async def get_target_users(db: AsyncSession, target: str) -> list:
users = await get_users_list(db, offset=0, limit=10000, status=UserStatus.ACTIVE)
# Загружаем всех активных пользователей батчами, чтобы не ограничиваться 10к
users: list[User] = []
offset = 0
batch_size = 5000
while True:
batch = await get_users_list(
db,
offset=offset,
limit=batch_size,
status=UserStatus.ACTIVE,
)
if not batch:
break
users.extend(batch)
offset += batch_size
if target == "all":
return users