From c00d7ef4d3a458c27b06bd80eeab6f941f2ea98e Mon Sep 17 00:00:00 2001 From: Egor Date: Wed, 5 Nov 2025 22:03:38 +0300 Subject: [PATCH] Update poll_service.py --- app/services/poll_service.py | 129 +++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 58 deletions(-) diff --git a/app/services/poll_service.py b/app/services/poll_service.py index 75ee3368..e2b02193 100644 --- a/app/services/poll_service.py +++ b/app/services/poll_service.py @@ -74,6 +74,7 @@ async def send_poll_to_users( poll_id = poll.id poll_snapshot = SimpleNamespace( + id=poll.id, title=poll.title, description=poll.description, reward_enabled=poll.reward_enabled, @@ -89,72 +90,84 @@ async def send_poll_to_users( for user in users ] - for index, user in enumerate(user_snapshots, start=1): - existing_response = await db.execute( - select(PollResponse.id).where( - and_( - PollResponse.poll_id == poll_id, - PollResponse.user_id == user.id, + # Используем семафор для ограничения одновременных отправок + semaphore = asyncio.Semaphore(20) + + async def send_poll_invitation(user_snapshot): + """Отправляет приглашение к опросу одному пользователю""" + async with semaphore: + try: + existing_response = await db.execute( + select(PollResponse.id).where( + and_( + PollResponse.poll_id == poll_id, + PollResponse.user_id == user_snapshot.id, + ) + ) ) - ) - ) - if existing_response.scalar_one_or_none(): - skipped += 1 - continue + existing_id = existing_response.scalar_one_or_none() + if existing_id: + return "skipped" - response = PollResponse( - poll_id=poll_id, - user_id=user.id, - ) - db.add(response) - - try: - await db.flush() - - text = _build_poll_invitation_text(poll_snapshot, user.language) - keyboard = build_start_keyboard(response.id, user.language) - - await bot.send_message( - chat_id=user.telegram_id, - text=text, - reply_markup=keyboard, - parse_mode="HTML", - disable_web_page_preview=True, - ) - - await db.commit() - sent += 1 - - if index % 20 == 0: - await asyncio.sleep(1) - except TelegramBadRequest as error: - error_text = str(error).lower() - if "chat not found" in error_text or "bot was blocked by the user" in error_text: - skipped += 1 - logger.info( - "ℹ️ Пропуск пользователя %s при отправке опроса %s: %s", - user.telegram_id, - poll_id, - error, + response = PollResponse( + poll_id=poll_id, + user_id=user_snapshot.id, ) - else: # pragma: no cover - unexpected telegram error - failed += 1 + db.add(response) + + await db.flush() + + text = _build_poll_invitation_text(poll, user_snapshot.language) + keyboard = build_start_keyboard(response.id, user_snapshot.language) + + await bot.send_message( + chat_id=user_snapshot.telegram_id, + text=text, + reply_markup=keyboard, + parse_mode="HTML", + disable_web_page_preview=True, + ) + + await db.commit() + return "sent" + except TelegramBadRequest as error: + error_text = str(error).lower() + if "chat not found" in error_text or "bot was blocked by the user" in error_text: + await db.rollback() + return "skipped" + else: # pragma: no cover - unexpected telegram error + await db.rollback() + return "failed" + except Exception as error: # pragma: no cover - defensive logging + await db.rollback() logger.error( "❌ Ошибка отправки опроса %s пользователю %s: %s", poll_id, - user.telegram_id, + user_snapshot.telegram_id, error, ) - await db.rollback() - except Exception as error: # pragma: no cover - defensive logging - failed += 1 - logger.error( - "❌ Ошибка отправки опроса %s пользователю %s: %s", - poll_id, - user.telegram_id, - error, - ) - await db.rollback() + return "failed" + + # Отправляем приглашения пакетами для повышения производительности + batch_size = 100 + for i in range(0, len(user_snapshots), batch_size): + batch = user_snapshots[i:i + batch_size] + tasks = [send_poll_invitation(user_snapshot) for user_snapshot in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for result in results: + if isinstance(result, str): # Успешно выполненная задача + if result == "sent": + sent += 1 + elif result == "failed": + failed += 1 + elif result == "skipped": + skipped += 1 + elif isinstance(result, Exception): # Ошибка выполнения задачи + failed += 1 + + # Небольшая задержка между пакетами для снижения нагрузки на API + await asyncio.sleep(0.1) return { "sent": sent,