Refactor promo offer sending with asyncio semaphore

Refactor promo offer sending to use asyncio semaphore for concurrent processing and batch sending.
This commit is contained in:
Egor
2025-11-05 21:52:04 +03:00
committed by GitHub
parent 1cc31729eb
commit 2a652c562b

View File

@@ -1899,63 +1899,87 @@ async def _send_offer_to_users(
sent = 0
failed = 0
for user in users:
try:
offer_record = await upsert_discount_offer(
db,
user_id=user.id,
subscription_id=user.subscription.id if user.subscription else None,
notification_type=f"promo_template_{template.id}",
discount_percent=template.discount_percent,
bonus_amount_kopeks=0,
valid_hours=template.valid_hours,
effect_type=effect_type,
extra_data={
"template_id": template.id,
"offer_type": template.offer_type,
"test_duration_hours": template.test_duration_hours,
"test_squad_uuids": template.test_squad_uuids,
"active_discount_hours": template.active_discount_hours,
},
)
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
user_texts = get_texts(user.language or db_user.language)
keyboard_rows: List[List[InlineKeyboardButton]] = [
[
build_miniapp_or_callback_button(
text=template.button_text,
callback_data=f"claim_discount_{offer_record.id}",
)
]
]
keyboard_rows.append([
InlineKeyboardButton(
text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"),
callback_data="promo_offer_close",
async def send_single_offer(user):
"""Отправляет одно предложение с семафором ограничения"""
async with semaphore:
try:
offer_record = await upsert_discount_offer(
db,
user_id=user.id,
subscription_id=getattr(user, "subscription", None).id if getattr(user, "subscription", None) else None,
notification_type=f"promo_template_{template.id}",
discount_percent=template.discount_percent,
bonus_amount_kopeks=0,
valid_hours=template.valid_hours,
effect_type=effect_type,
extra_data={
"template_id": template.id,
"offer_type": template.offer_type,
"test_duration_hours": template.test_duration_hours,
"test_squad_uuids": template.test_squad_uuids,
"active_discount_hours": template.active_discount_hours,
},
)
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
user_texts = get_texts(user.language or db_user.language)
keyboard_rows: List[List[InlineKeyboardButton]] = [
[
build_miniapp_or_callback_button(
text=template.button_text,
callback_data=f"claim_discount_{offer_record.id}",
)
]
]
message_text = _render_template_text(
template,
user.language or db_user.language,
server_name=squad_name,
)
await bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode="HTML",
)
sent += 1
except (TelegramForbiddenError, TelegramBadRequest) as exc:
logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc)
failed += 1
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc)
failed += 1
keyboard_rows.append([
InlineKeyboardButton(
text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"),
callback_data="promo_offer_close",
)
])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
message_text = _render_template_text(
template,
user.language or db_user.language,
server_name=squad_name,
)
await bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode="HTML",
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc)
return False
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc)
return False
# Отправляем предложения пакетами для эффективности
batch_size = 100
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
tasks = [send_single_offer(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, bool): # Успешно или неуспешно
if result:
sent += 1
else:
failed += 1
elif isinstance(result, Exception): # Ошибка выполнения задачи
failed += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
return sent, failed