fix(broadcast): stabilize mass broadcast for 100k+ users

- Add real-time progress bar with updates every 500 msgs / 5 sec
- Fix Telegram rate limiting: batch=25, delay=1.0s (~25 msg/sec)
- Add global flood_wait_until to prevent semaphore slot starvation
- Add parse_mode=HTML for web API broadcasts
- Separate error handling for FloodWait, Forbidden, BadRequest
- Convert ORM objects to scalars before long broadcast operations
- Add email recipients dataclass to prevent detached ORM state
This commit is contained in:
Fringg
2026-02-05 07:10:43 +03:00
parent e8a413c3c3
commit 13ebfdb5c4
2 changed files with 396 additions and 292 deletions

View File

@@ -1146,7 +1146,10 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
admin_language: str = db_user.language
await safe_edit_or_send_text(
callback, '📨 Начинаю рассылку...\n\n⏳ Это может занять несколько минут.', reply_markup=None, parse_mode='HTML'
callback,
'📨 <b>Подготовка рассылки...</b>\n\n⏳ Загружаю список получателей...',
reply_markup=None,
parse_mode='HTML',
)
# Загружаем пользователей и сразу извлекаем telegram_id в список
@@ -1193,93 +1196,198 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
broadcast_keyboard = create_broadcast_keyboard(selected_buttons, admin_language)
# Ограничение на количество одновременных отправок и базовая задержка между сообщениями,
# чтобы избежать перегрузки бота и лимитов Telegram при больших рассылках
max_concurrent_sends = 5
per_message_delay = 0.05
semaphore = asyncio.Semaphore(max_concurrent_sends)
# =========================================================================
# Rate limiting: Telegram допускает ~30 msg/sec для бота.
# Используем batch_size=25 + 1 сек задержка между батчами = ~25 msg/sec
# с запасом, чтобы не получать FloodWait.
# Semaphore=25 — все сообщения батча отправляются параллельно.
# =========================================================================
_BATCH_SIZE = 25
_BATCH_DELAY = 1.0 # секунда между батчами
_MAX_SEND_RETRIES = 3
# Обновляем прогресс каждые N батчей (не каждое сообщение — иначе FloodWait на edit_text)
_PROGRESS_UPDATE_INTERVAL = max(1, 500 // _BATCH_SIZE) # ~каждые 500 сообщений
# Минимальный интервал между обновлениями прогресса (секунды)
_PROGRESS_MIN_INTERVAL = 5.0
async def send_single_broadcast(telegram_id: int) -> tuple[bool, int]:
"""Отправляет одно сообщение рассылки с семафором ограничения."""
async with semaphore:
for attempt in range(3):
try:
if has_media and media_file_id:
if media_type == 'photo':
await callback.bot.send_photo(
chat_id=telegram_id,
photo=media_file_id,
caption=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
elif media_type == 'video':
await callback.bot.send_video(
chat_id=telegram_id,
video=media_file_id,
caption=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
elif media_type == 'document':
await callback.bot.send_document(
chat_id=telegram_id,
document=media_file_id,
caption=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
# Глобальная пауза при FloodWait — тормозим ВСЕ отправки, а не один слот семафора
flood_wait_until: float = 0.0
async def send_single_broadcast(telegram_id: int) -> bool:
"""Отправляет одно сообщение. Возвращает True при успехе."""
nonlocal flood_wait_until
for attempt in range(_MAX_SEND_RETRIES):
# Глобальная пауза при FloodWait
now = asyncio.get_event_loop().time()
if flood_wait_until > now:
await asyncio.sleep(flood_wait_until - now)
try:
if has_media and media_file_id:
send_method = {
'photo': callback.bot.send_photo,
'video': callback.bot.send_video,
'document': callback.bot.send_document,
}.get(media_type)
if send_method:
media_kwarg = {
'photo': 'photo',
'video': 'video',
'document': 'document',
}[media_type]
await send_method(
chat_id=telegram_id,
**{media_kwarg: media_file_id},
caption=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
else:
# Неизвестный media_type — отправляем как текст
await callback.bot.send_message(
chat_id=telegram_id,
text=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
else:
await callback.bot.send_message(
chat_id=telegram_id,
text=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
return True
await asyncio.sleep(per_message_delay)
return True, telegram_id
except TelegramRetryAfter as e:
retry_delay = min(e.retry_after + 1, 30)
logger.warning(f'Превышен лимит Telegram для {telegram_id}, ожидание {retry_delay} сек.')
await asyncio.sleep(retry_delay)
except TelegramForbiddenError:
# Пользователь мог удалить бота или запретить сообщения
logger.info(f'Рассылка недоступна для пользователя {telegram_id}: Forbidden')
return False, telegram_id
except TelegramBadRequest as e:
logger.error(f'Некорректный запрос при рассылке пользователю {telegram_id}: {e}')
return False, telegram_id
except Exception as e:
logger.error(f'Ошибка отправки рассылки пользователю {telegram_id} (попытка {attempt + 1}/3): {e}')
except TelegramRetryAfter as e:
# Глобальная пауза — тормозим все корутины
wait_seconds = e.retry_after + 1
flood_wait_until = asyncio.get_event_loop().time() + wait_seconds
logger.warning(
'FloodWait: Telegram просит подождать %d сек (пользователь %d, попытка %d/%d)',
e.retry_after,
telegram_id,
attempt + 1,
_MAX_SEND_RETRIES,
)
await asyncio.sleep(wait_seconds)
except TelegramForbiddenError:
return False
except TelegramBadRequest as e:
logger.debug('BadRequest при рассылке пользователю %d: %s', telegram_id, e)
return False
except Exception as e:
logger.error(
'Ошибка отправки пользователю %d (попытка %d/%d): %s',
telegram_id,
attempt + 1,
_MAX_SEND_RETRIES,
e,
)
if attempt < _MAX_SEND_RETRIES - 1:
await asyncio.sleep(0.5 * (attempt + 1))
return False, telegram_id
return False
# Отправляем сообщения пакетами для эффективности
batch_size = 50
for i in range(0, len(recipient_telegram_ids), batch_size):
batch = recipient_telegram_ids[i : i + batch_size]
tasks = [send_single_broadcast(tid) for tid in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# =========================================================================
# Прогресс-бар в реальном времени (как в сканере заблокированных)
# =========================================================================
total_recipients = len(recipient_telegram_ids)
last_progress_update: float = 0.0
# ID сообщения, которое обновляем (может быть заменено при ошибке)
progress_message = callback.message
def _build_progress_text(
current_sent: int,
current_failed: int,
total: int,
phase: str = 'sending',
) -> str:
processed = current_sent + current_failed
percent = round(processed / total * 100, 1) if total > 0 else 0
bar_length = 20
filled = int(bar_length * processed / total) if total > 0 else 0
bar = '' * filled + '' * (bar_length - filled)
if phase == 'sending':
return (
f'📨 <b>Рассылка в процессе...</b>\n\n'
f'[{bar}] {percent}%\n\n'
f'📊 <b>Прогресс:</b>\n'
f'• Отправлено: {current_sent}\n'
f'• Ошибок: {current_failed}\n'
f'• Обработано: {processed}/{total}\n\n'
f'Не закрывайте диалог — рассылка продолжается...'
)
return ''
async def _update_progress_message(current_sent: int, current_failed: int) -> None:
"""Безопасно обновляет сообщение с прогрессом."""
nonlocal last_progress_update, progress_message
now = asyncio.get_event_loop().time()
if now - last_progress_update < _PROGRESS_MIN_INTERVAL:
return
last_progress_update = now
text = _build_progress_text(current_sent, current_failed, total_recipients)
try:
await progress_message.edit_text(text, parse_mode='HTML')
except TelegramRetryAfter as e:
# Не паникуем — пропускаем обновление прогресса
logger.debug('FloodWait при обновлении прогресса, пропускаем: %d сек', e.retry_after)
except TelegramBadRequest:
# Сообщение удалено или контент не изменился — отправляем новое
try:
progress_message = await callback.bot.send_message(
chat_id=callback.message.chat.id,
text=text,
parse_mode='HTML',
)
except Exception:
pass
except Exception:
pass # Не ломаем рассылку из-за ошибок обновления прогресса
# Первое обновление прогресса
await _update_progress_message(0, 0)
# =========================================================================
# Основной цикл рассылки — батчами по _BATCH_SIZE
# =========================================================================
for batch_idx, i in enumerate(range(0, total_recipients, _BATCH_SIZE)):
batch = recipient_telegram_ids[i : i + _BATCH_SIZE]
# Отправляем батч параллельно
results = await asyncio.gather(
*[send_single_broadcast(tid) for tid in batch],
return_exceptions=True,
)
for result in results:
if isinstance(result, tuple): # (success, telegram_id)
success, _ = result
if success:
if isinstance(result, bool):
if result:
sent_count += 1
else:
failed_count += 1
elif isinstance(result, Exception):
failed_count += 1
logger.error('Необработанное исключение в рассылке: %s', result)
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.25)
# Обновляем прогресс каждые _PROGRESS_UPDATE_INTERVAL батчей
if batch_idx % _PROGRESS_UPDATE_INTERVAL == 0:
await _update_progress_message(sent_count, failed_count)
# Задержка между батчами для соблюдения rate limits
await asyncio.sleep(_BATCH_DELAY)
# Учитываем пропущенных email-only пользователей
skipped_email_users = total_users_count - len(recipient_telegram_ids)
skipped_email_users = total_users_count - total_recipients
if skipped_email_users > 0:
logger.info(f'Пропущено {skipped_email_users} email-only пользователей при рассылке')
logger.info('Пропущено %d email-only пользователей при рассылке', skipped_email_users)
status = 'completed' if failed_count == 0 else 'partial'
@@ -1291,31 +1399,25 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
status=status,
)
media_info = ''
if has_media:
media_info = f'\n🖼️ <b>Медиафайл:</b> {media_type}'
success_rate = round(sent_count / total_users_count * 100, 1) if total_users_count else 0
media_info = f'\n🖼️ <b>Медиафайл:</b> {media_type}' if has_media else ''
# Используем заранее сохранённое имя админа
result_text = f"""
✅ <b>Рассылка завершена!</b>
result_text = (
f'✅ <b>Рассылка завершена!</b>\n\n'
f'📊 <b>Результат:</b>\n'
f'• Отправлено: {sent_count}\n'
f'Не доставлено: {failed_count}\n'
f'Всего пользователей: {total_users_count}\n'
f'• Успешность: {success_rate}%{media_info}\n\n'
f'<b>Администратор:</b> {admin_name}'
)
📊 <b>Результат:</b>
- Отправлено: {sent_count}
- Не доставлено: {failed_count}
- Всего пользователей: {total_users_count}
- Успешность: {round(sent_count / total_users_count * 100, 1) if total_users_count else 0}%{media_info}
<b>Администратор:</b> {admin_name}
"""
back_keyboard = types.InlineKeyboardMarkup(
inline_keyboard=[[types.InlineKeyboardButton(text='📨 К рассылкам', callback_data='admin_messages')]]
)
try:
await callback.message.edit_text(
result_text,
reply_markup=types.InlineKeyboardMarkup(
inline_keyboard=[[types.InlineKeyboardButton(text='📨 К рассылкам', callback_data='admin_messages')]]
),
parse_mode='HTML',
)
await progress_message.edit_text(result_text, reply_markup=back_keyboard, parse_mode='HTML')
except TelegramBadRequest as e:
error_msg = str(e).lower()
if (
@@ -1323,15 +1425,10 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
or 'there is no text' in error_msg
or "message can't be edited" in error_msg
):
# Сообщение удалено или это медиа - отправляем новое
await callback.bot.send_message(
chat_id=callback.message.chat.id,
text=result_text,
reply_markup=types.InlineKeyboardMarkup(
inline_keyboard=[
[types.InlineKeyboardButton(text='📨 К рассылкам', callback_data='admin_messages')]
]
),
reply_markup=back_keyboard,
parse_mode='HTML',
)
else:
@@ -1339,7 +1436,12 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
await state.clear()
logger.info(
f'Рассылка выполнена админом {admin_telegram_id}: {sent_count}/{total_users_count} (медиа: {has_media})'
'Рассылка завершена админом %s: sent=%d, failed=%d, total=%d (медиа: %s)',
admin_telegram_id,
sent_count,
failed_count,
total_users_count,
has_media,
)

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from typing import TYPE_CHECKING
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramRetryAfter
from aiogram.types import InlineKeyboardMarkup
from sqlalchemy.exc import InterfaceError, SQLAlchemyError
@@ -27,8 +28,18 @@ logger = logging.getLogger(__name__)
VALID_MEDIA_TYPES = {'photo', 'video', 'document'}
LARGE_BROADCAST_THRESHOLD = 20_000
PROGRESS_UPDATE_STEP = 5_000
# =========================================================================
# Telegram rate limits: ~30 msg/sec для бота.
# batch_size=25 + 1 sec delay = ~25 msg/sec с запасом.
# =========================================================================
_TG_BATCH_SIZE = 25
_TG_BATCH_DELAY = 1.0 # секунда между батчами
_TG_MAX_RETRIES = 3 # retry при FloodWait / transient errors
# Прогресс обновляется каждые ~500 сообщений ИЛИ раз в 5 секунд (что наступит раньше)
_PROGRESS_UPDATE_MESSAGES = 500
_PROGRESS_MIN_INTERVAL_SEC = 5.0
# Email broadcast rate limiting: max 8 emails per second
EMAIL_RATE_LIMIT = 8
@@ -61,6 +72,14 @@ class EmailBroadcastConfig:
initiator_name: str | None = None
@dataclass(slots=True)
class _EmailRecipient:
"""Скалярные данные получателя email (без ORM)."""
email: str
user_name: str
@dataclass(slots=True)
class _BroadcastTask:
task: asyncio.Task
@@ -136,7 +155,8 @@ class BroadcastService:
broadcast.failed_count = 0
await session.commit()
recipients = await self._fetch_recipients(config.target)
# _fetch_recipients теперь возвращает list[int] (telegram_id), а не ORM-объекты
recipient_ids: list[int] = await self._fetch_recipients(config.target)
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
@@ -144,45 +164,35 @@ class BroadcastService:
logger.error('Запись рассылки %s удалена до запуска', broadcast_id)
return
broadcast.total_count = len(recipients)
broadcast.total_count = len(recipient_ids)
await session.commit()
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
if not recipients:
if not recipient_ids:
logger.info('Рассылка %s: получатели не найдены', broadcast_id)
await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False)
return
keyboard = self._build_keyboard(config.selected_buttons)
if len(recipients) > LARGE_BROADCAST_THRESHOLD:
logger.info('Запускаем стабильный режим рассылки для %s получателей', len(recipients))
(
sent_count,
failed_count,
cancelled_during_run,
) = await self._run_resilient_broadcast(
broadcast_id,
recipients,
config,
keyboard,
cancel_event,
)
else:
(
sent_count,
failed_count,
cancelled_during_run,
) = await self._run_standard_broadcast(
broadcast_id,
recipients,
config,
keyboard,
cancel_event,
)
logger.info(
'Рассылка %s: начинаем отправку %d получателям (batch=%d, delay=%.1fs)',
broadcast_id,
len(recipient_ids),
_TG_BATCH_SIZE,
_TG_BATCH_DELAY,
)
sent_count, failed_count, cancelled_during_run = await self._send_batched(
broadcast_id,
recipient_ids,
config,
keyboard,
cancel_event,
)
if cancelled_during_run:
logger.info(
@@ -211,140 +221,125 @@ class BroadcastService:
logger.exception('Критическая ошибка при выполнении рассылки %s: %s', broadcast_id, exc)
await self._mark_failed(broadcast_id, sent_count, failed_count)
async def _fetch_recipients(self, target: str):
async def _fetch_recipients(self, target: str) -> list[int]:
"""Загружает получателей и возвращает список telegram_id (скаляры, не ORM-объекты)."""
async with AsyncSessionLocal() as session:
if target.startswith('custom_'):
criteria = target[len('custom_') :]
return await get_custom_users(session, criteria)
return await get_target_users(session, target)
users_orm = await get_custom_users(session, criteria)
else:
users_orm = await get_target_users(session, target)
async def _run_standard_broadcast(
# Извлекаем telegram_id сразу, пока сессия жива.
# После выхода из блока ORM-объекты станут detached.
return [u.telegram_id for u in users_orm if u.telegram_id is not None]
async def _send_batched(
self,
broadcast_id: int,
recipients: list,
recipient_ids: list[int],
config: BroadcastConfig,
keyboard: InlineKeyboardMarkup | None,
cancel_event: asyncio.Event,
) -> tuple[int, int, bool]:
"""Базовый режим рассылки для небольших списков."""
"""
Единый метод рассылки для любого количества получателей.
Батчинг по _TG_BATCH_SIZE сообщений с _TG_BATCH_DELAY задержкой.
Прогресс обновляется каждые _PROGRESS_UPDATE_MESSAGES сообщений.
Глобальная пауза при FloodWait.
"""
sent_count = 0
failed_count = 0
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
# Глобальная пауза при FloodWait — все корутины ждут
flood_wait_until: float = 0.0
last_progress_update: float = 0.0
last_progress_count: int = 0
async def send_single(telegram_id: int) -> bool:
nonlocal flood_wait_until
for attempt in range(_TG_MAX_RETRIES):
# Глобальная пауза при FloodWait
now = asyncio.get_event_loop().time()
if flood_wait_until > now:
await asyncio.sleep(flood_wait_until - now)
async def send_single_message(user):
"""Отправляет одно сообщение с семафором ограничения"""
async with semaphore:
if cancel_event.is_set():
return False
telegram_id = getattr(user, 'telegram_id', None)
if telegram_id is None:
# Email-пользователи без telegram_id - пропускаем (не считаем ошибкой)
return None
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc:
logger.error(
'Ошибка отправки рассылки %s пользователю %s: %s',
except TelegramRetryAfter as e:
wait_seconds = e.retry_after + 1
flood_wait_until = asyncio.get_event_loop().time() + wait_seconds
logger.warning(
'FloodWait рассылки %s: Telegram просит %d сек (user=%d, попытка %d/%d)',
broadcast_id,
e.retry_after,
telegram_id,
exc,
attempt + 1,
_TG_MAX_RETRIES,
)
await asyncio.sleep(wait_seconds)
except TelegramForbiddenError:
return False
# Отправляем сообщения пакетами для эффективности
batch_size = 100
skipped_count = 0
for i in range(0, len(recipients), batch_size):
except TelegramBadRequest:
return False
except Exception as exc:
logger.error(
'Ошибка отправки рассылки %s пользователю %d (попытка %d/%d): %s',
broadcast_id,
telegram_id,
attempt + 1,
_TG_MAX_RETRIES,
exc,
)
if attempt < _TG_MAX_RETRIES - 1:
await asyncio.sleep(0.5 * (attempt + 1))
return False
for i in range(0, len(recipient_ids), _TG_BATCH_SIZE):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return sent_count, failed_count, True
batch = recipients[i : i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
batch = recipient_ids[i : i + _TG_BATCH_SIZE]
results = await asyncio.gather(
*[send_single(tid) for tid in batch],
return_exceptions=True,
)
for result in results:
if result is True:
sent_count += 1
elif result is None:
# Email-пользователи - пропускаем без ошибки
skipped_count += 1
else:
failed_count += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
return sent_count, failed_count, False
async def _run_resilient_broadcast(
self,
broadcast_id: int,
recipients: list,
config: BroadcastConfig,
keyboard: InlineKeyboardMarkup | None,
cancel_event: asyncio.Event,
) -> tuple[int, int, bool]:
"""Режим рассылки с периодическим обновлением статуса для больших списков."""
sent_count = 0
failed_count = 0
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(15)
async def send_single_message(user):
async with semaphore:
if cancel_event.is_set():
return False
telegram_id = getattr(user, 'telegram_id', None)
if telegram_id is None:
# Email-пользователи без telegram_id - пропускаем (не считаем ошибкой)
return None
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc:
logger.error(
'Ошибка отправки рассылки %s пользователю %s: %s',
broadcast_id,
telegram_id,
exc,
)
return False
batch_size = 100
for i in range(0, len(recipients), batch_size):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return sent_count, failed_count, True
batch = recipients[i : i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
sent_count += 1
elif result is None:
# Email-пользователи - пропускаем без ошибки
pass
else:
if isinstance(result, bool):
if result:
sent_count += 1
else:
failed_count += 1
elif isinstance(result, Exception):
failed_count += 1
logger.error('Необработанное исключение в рассылке %s: %s', broadcast_id, result)
# Обновляем прогресс в БД периодически
processed = sent_count + failed_count
if processed % PROGRESS_UPDATE_STEP == 0:
now = asyncio.get_event_loop().time()
if (
processed - last_progress_count >= _PROGRESS_UPDATE_MESSAGES
or now - last_progress_update >= _PROGRESS_MIN_INTERVAL_SEC
):
await self._update_progress(broadcast_id, sent_count, failed_count)
last_progress_count = processed
last_progress_update = now
await asyncio.sleep(0.1)
# Задержка между батчами для rate limiting
await asyncio.sleep(_TG_BATCH_DELAY)
return sent_count, failed_count, False
@@ -359,37 +354,36 @@ class BroadcastService:
config: BroadcastConfig,
keyboard: InlineKeyboardMarkup | None,
) -> None:
"""
Отправляет одно сообщение.
НЕ ловит исключения — TelegramRetryAfter, TelegramForbiddenError и др.
обрабатываются в вызывающем коде (_send_batched).
"""
if not self._bot:
raise RuntimeError('Телеграм-бот не инициализирован')
if config.media and config.media.type in VALID_MEDIA_TYPES:
caption = config.media.caption or config.message_text
if config.media.type == 'photo':
await self._bot.send_photo(
chat_id=telegram_id,
photo=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
elif config.media.type == 'video':
await self._bot.send_video(
chat_id=telegram_id,
video=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
elif config.media.type == 'document':
await self._bot.send_document(
chat_id=telegram_id,
document=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
media_methods = {
'photo': ('photo', self._bot.send_photo),
'video': ('video', self._bot.send_video),
'document': ('document', self._bot.send_document),
}
kwarg_name, send_method = media_methods[config.media.type]
await send_method(
chat_id=telegram_id,
**{kwarg_name: config.media.file_id},
caption=caption,
parse_mode='HTML',
reply_markup=keyboard,
)
return
await self._bot.send_message(
chat_id=telegram_id,
text=config.message_text,
parse_mode='HTML',
reply_markup=keyboard,
)
@@ -617,8 +611,13 @@ class EmailBroadcastService:
logger.exception('Critical error in email broadcast %s: %s', broadcast_id, exc)
await self._mark_failed(broadcast_id, sent_count, failed_count)
async def _fetch_email_recipients(self, target: str) -> list:
"""Fetch email recipients based on target filter."""
async def _fetch_email_recipients(self, target: str) -> list[_EmailRecipient]:
"""
Загружает получателей email-рассылки.
Возвращает список _EmailRecipient (скалярные данные), а не ORM-объектов,
чтобы избежать detached state при долгих рассылках.
"""
from sqlalchemy import select
from app.database.models import Subscription, SubscriptionStatus, User
@@ -632,18 +631,15 @@ class EmailBroadcastService:
]
if target == 'all_email':
# All users with verified email
query = select(User).where(*base_conditions)
elif target == 'email_only':
# Only email-registered users (no telegram)
query = select(User).where(
*base_conditions,
User.auth_type == 'email',
)
elif target == 'telegram_with_email':
# Telegram users who also have email
query = select(User).where(
*base_conditions,
User.auth_type == 'telegram',
@@ -651,7 +647,6 @@ class EmailBroadcastService:
)
elif target == 'active_email':
# Email users with active subscription
query = (
select(User)
.join(Subscription, User.id == Subscription.user_id)
@@ -662,7 +657,6 @@ class EmailBroadcastService:
)
elif target == 'expired_email':
# Email users with expired subscription
query = (
select(User)
.join(Subscription, User.id == Subscription.user_id)
@@ -681,8 +675,8 @@ class EmailBroadcastService:
logger.warning('Unknown email target filter: %s', target)
return []
# Load users in batches
users: list = []
# Загружаем батчами и извлекаем скаляры сразу
recipients: list[_EmailRecipient] = []
offset = 0
batch_size = 1000
@@ -693,108 +687,116 @@ class EmailBroadcastService:
if not batch:
break
users.extend(batch)
for user in batch:
email = user.email
if not email:
continue
# Формируем имя пользователя
user_name = user.username
if not user_name:
user_name = user.first_name or ''
if last_name := user.last_name:
user_name = f'{user_name} {last_name}'.strip()
if not user_name:
user_name = email.split('@')[0]
recipients.append(_EmailRecipient(email=email, user_name=user_name))
offset += batch_size
return users
return recipients
async def _send_emails(
self,
broadcast_id: int,
recipients: list,
recipients: list[_EmailRecipient],
config: EmailBroadcastConfig,
cancel_event: asyncio.Event,
) -> tuple[int, int, bool]:
"""Send emails with rate limiting."""
"""
Отправляет email-рассылку с rate limiting.
Использует run_in_executor для синхронного SMTP, ограничивая
параллельность семафором EMAIL_RATE_LIMIT.
"""
sent_count = 0
failed_count = 0
last_progress_count = 0
last_progress_time: float = 0.0
# Semaphore for rate limiting (max EMAIL_RATE_LIMIT concurrent sends)
semaphore = asyncio.Semaphore(EMAIL_RATE_LIMIT)
async def send_single_email(user) -> bool | None:
"""Send single email with rate limiting."""
async def send_single_email(recipient: _EmailRecipient) -> bool | None:
"""Отправляет один email."""
async with semaphore:
if cancel_event.is_set():
return None
email = getattr(user, 'email', None)
if not email:
return None
# Render template with variables
html_content = self._render_template(config.email_html_content, user)
subject = self._render_template(config.email_subject, user)
html_content = self._render_template(config.email_html_content, recipient)
subject = self._render_template(config.email_subject, recipient)
try:
# Run sync email send in executor to not block event loop
loop = asyncio.get_event_loop()
success = await loop.run_in_executor(
None,
self._email_service.send_email,
email,
recipient.email,
subject,
html_content,
)
return success
except Exception as exc:
logger.error(
'Error sending email broadcast %s to %s: %s',
'Ошибка отправки email рассылки %s на %s: %s',
broadcast_id,
email,
recipient.email,
exc,
)
return False
# Process in batches
for i in range(0, len(recipients), EMAIL_BATCH_SIZE):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return sent_count, failed_count, True
batch = recipients[i : i + EMAIL_BATCH_SIZE]
tasks = [send_single_email(user) for user in batch]
tasks = [send_single_email(r) for r in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
sent_count += 1
elif result is None:
# Skipped (cancelled or no email)
pass
pass # Cancelled or skipped
else:
failed_count += 1
# Update progress periodically
# Обновляем прогресс периодически
processed = sent_count + failed_count
if processed % PROGRESS_UPDATE_STEP == 0 or i + EMAIL_BATCH_SIZE >= len(recipients):
now = asyncio.get_event_loop().time()
if (
processed - last_progress_count >= _PROGRESS_UPDATE_MESSAGES
or now - last_progress_time >= _PROGRESS_MIN_INTERVAL_SEC
or i + EMAIL_BATCH_SIZE >= len(recipients)
):
await self._update_progress(broadcast_id, sent_count, failed_count)
last_progress_count = processed
last_progress_time = now
# Rate limiting delay between batches (ensure ~8 emails/sec)
# Rate limiting: ~8 emails/sec
await asyncio.sleep(EMAIL_BATCH_SIZE / EMAIL_RATE_LIMIT)
return sent_count, failed_count, False
def _render_template(self, template: str, user) -> str:
"""Render template with user variables."""
@staticmethod
def _render_template(template: str, recipient: _EmailRecipient) -> str:
"""Подставляет переменные в шаблон email."""
if not template:
return template
# Get user name
user_name = getattr(user, 'username', None)
if not user_name:
user_name = getattr(user, 'first_name', None) or ''
if last_name := getattr(user, 'last_name', None):
user_name = f'{user_name} {last_name}'.strip()
if not user_name:
user_name = getattr(user, 'email', '').split('@')[0] if getattr(user, 'email', None) else 'User'
email = getattr(user, 'email', '') or ''
# Replace template variables
result = template.replace('{{user_name}}', user_name)
result = result.replace('{{email}}', email)
result = template.replace('{{user_name}}', recipient.user_name)
result = result.replace('{{email}}', recipient.email)
return result
async def _mark_finished(