Update poll_service.py

This commit is contained in:
Egor
2025-11-05 22:03:38 +03:00
committed by GitHub
parent 2a652c562b
commit c00d7ef4d3

View File

@@ -74,6 +74,7 @@ async def send_poll_to_users(
poll_id = poll.id
poll_snapshot = SimpleNamespace(
id=poll.id,
title=poll.title,
description=poll.description,
reward_enabled=poll.reward_enabled,
@@ -89,72 +90,84 @@ async def send_poll_to_users(
for user in users
]
for index, user in enumerate(user_snapshots, start=1):
existing_response = await db.execute(
select(PollResponse.id).where(
and_(
PollResponse.poll_id == poll_id,
PollResponse.user_id == user.id,
# Используем семафор для ограничения одновременных отправок
semaphore = asyncio.Semaphore(20)
async def send_poll_invitation(user_snapshot):
"""Отправляет приглашение к опросу одному пользователю"""
async with semaphore:
try:
existing_response = await db.execute(
select(PollResponse.id).where(
and_(
PollResponse.poll_id == poll_id,
PollResponse.user_id == user_snapshot.id,
)
)
)
)
)
if existing_response.scalar_one_or_none():
skipped += 1
continue
existing_id = existing_response.scalar_one_or_none()
if existing_id:
return "skipped"
response = PollResponse(
poll_id=poll_id,
user_id=user.id,
)
db.add(response)
try:
await db.flush()
text = _build_poll_invitation_text(poll_snapshot, user.language)
keyboard = build_start_keyboard(response.id, user.language)
await bot.send_message(
chat_id=user.telegram_id,
text=text,
reply_markup=keyboard,
parse_mode="HTML",
disable_web_page_preview=True,
)
await db.commit()
sent += 1
if index % 20 == 0:
await asyncio.sleep(1)
except TelegramBadRequest as error:
error_text = str(error).lower()
if "chat not found" in error_text or "bot was blocked by the user" in error_text:
skipped += 1
logger.info(
" Пропуск пользователя %s при отправке опроса %s: %s",
user.telegram_id,
poll_id,
error,
response = PollResponse(
poll_id=poll_id,
user_id=user_snapshot.id,
)
else: # pragma: no cover - unexpected telegram error
failed += 1
db.add(response)
await db.flush()
text = _build_poll_invitation_text(poll, user_snapshot.language)
keyboard = build_start_keyboard(response.id, user_snapshot.language)
await bot.send_message(
chat_id=user_snapshot.telegram_id,
text=text,
reply_markup=keyboard,
parse_mode="HTML",
disable_web_page_preview=True,
)
await db.commit()
return "sent"
except TelegramBadRequest as error:
error_text = str(error).lower()
if "chat not found" in error_text or "bot was blocked by the user" in error_text:
await db.rollback()
return "skipped"
else: # pragma: no cover - unexpected telegram error
await db.rollback()
return "failed"
except Exception as error: # pragma: no cover - defensive logging
await db.rollback()
logger.error(
"❌ Ошибка отправки опроса %s пользователю %s: %s",
poll_id,
user.telegram_id,
user_snapshot.telegram_id,
error,
)
await db.rollback()
except Exception as error: # pragma: no cover - defensive logging
failed += 1
logger.error(
"❌ Ошибка отправки опроса %s пользователю %s: %s",
poll_id,
user.telegram_id,
error,
)
await db.rollback()
return "failed"
# Отправляем приглашения пакетами для повышения производительности
batch_size = 100
for i in range(0, len(user_snapshots), batch_size):
batch = user_snapshots[i:i + batch_size]
tasks = [send_poll_invitation(user_snapshot) for user_snapshot in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, str): # Успешно выполненная задача
if result == "sent":
sent += 1
elif result == "failed":
failed += 1
elif result == "skipped":
skipped += 1
elif isinstance(result, Exception): # Ошибка выполнения задачи
failed += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
return {
"sent": sent,