diff --git a/app/handlers/admin/promo_offers.py b/app/handlers/admin/promo_offers.py index 577c51e2..f5fd938a 100644 --- a/app/handlers/admin/promo_offers.py +++ b/app/handlers/admin/promo_offers.py @@ -1899,63 +1899,87 @@ async def _send_offer_to_users( sent = 0 failed = 0 - for user in users: - try: - offer_record = await upsert_discount_offer( - db, - user_id=user.id, - subscription_id=user.subscription.id if user.subscription else None, - notification_type=f"promo_template_{template.id}", - discount_percent=template.discount_percent, - bonus_amount_kopeks=0, - valid_hours=template.valid_hours, - effect_type=effect_type, - extra_data={ - "template_id": template.id, - "offer_type": template.offer_type, - "test_duration_hours": template.test_duration_hours, - "test_squad_uuids": template.test_squad_uuids, - "active_discount_hours": template.active_discount_hours, - }, - ) + # Ограничение на количество одновременных отправок + semaphore = asyncio.Semaphore(20) - user_texts = get_texts(user.language or db_user.language) - keyboard_rows: List[List[InlineKeyboardButton]] = [ - [ - build_miniapp_or_callback_button( - text=template.button_text, - callback_data=f"claim_discount_{offer_record.id}", - ) - ] - ] - - keyboard_rows.append([ - InlineKeyboardButton( - text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"), - callback_data="promo_offer_close", + async def send_single_offer(user): + """Отправляет одно предложение с семафором ограничения""" + async with semaphore: + try: + offer_record = await upsert_discount_offer( + db, + user_id=user.id, + subscription_id=getattr(user, "subscription", None).id if getattr(user, "subscription", None) else None, + notification_type=f"promo_template_{template.id}", + discount_percent=template.discount_percent, + bonus_amount_kopeks=0, + valid_hours=template.valid_hours, + effect_type=effect_type, + extra_data={ + "template_id": template.id, + "offer_type": template.offer_type, + "test_duration_hours": template.test_duration_hours, + "test_squad_uuids": template.test_squad_uuids, + "active_discount_hours": template.active_discount_hours, + }, ) - ]) - keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows) + user_texts = get_texts(user.language or db_user.language) + keyboard_rows: List[List[InlineKeyboardButton]] = [ + [ + build_miniapp_or_callback_button( + text=template.button_text, + callback_data=f"claim_discount_{offer_record.id}", + ) + ] + ] - message_text = _render_template_text( - template, - user.language or db_user.language, - server_name=squad_name, - ) - await bot.send_message( - chat_id=user.telegram_id, - text=message_text, - reply_markup=keyboard, - parse_mode="HTML", - ) - sent += 1 - except (TelegramForbiddenError, TelegramBadRequest) as exc: - logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc) - failed += 1 - except Exception as exc: # pragma: no cover - defensive logging - logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc) - failed += 1 + keyboard_rows.append([ + InlineKeyboardButton( + text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"), + callback_data="promo_offer_close", + ) + ]) + + keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows) + + message_text = _render_template_text( + template, + user.language or db_user.language, + server_name=squad_name, + ) + await bot.send_message( + chat_id=user.telegram_id, + text=message_text, + reply_markup=keyboard, + parse_mode="HTML", + ) + return True + except (TelegramForbiddenError, TelegramBadRequest) as exc: + logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc) + return False + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc) + return False + + # Отправляем предложения пакетами для эффективности + batch_size = 100 + for i in range(0, len(users), batch_size): + batch = users[i:i + batch_size] + tasks = [send_single_offer(user) for user in batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + + for result in results: + if isinstance(result, bool): # Успешно или неуспешно + if result: + sent += 1 + else: + failed += 1 + elif isinstance(result, Exception): # Ошибка выполнения задачи + failed += 1 + + # Небольшая задержка между пакетами для снижения нагрузки на API + await asyncio.sleep(0.1) return sent, failed