Merge pull request #1725 from Fr1ngg/dev4

fix
This commit is contained in:
Egor
2025-11-06 03:45:46 +03:00
committed by GitHub
17 changed files with 552 additions and 189 deletions

View File

@@ -163,7 +163,7 @@ class RemnaWaveAPI:
async def __aenter__(self):
conn_type = self._detect_connection_type()
logger.info(f"Подключение к Remnawave: {self.base_url} (тип: {conn_type})")
logger.debug(f"Подключение к Remnawave: {self.base_url} (тип: {conn_type})")
headers = self._prepare_auth_headers()

View File

@@ -449,4 +449,4 @@ def register_handlers(dp: Dispatcher):
dp.message.register(
admin_commands_help,
Command("admin_help")
)
)

View File

@@ -764,48 +764,69 @@ async def confirm_broadcast(
broadcast_keyboard = create_broadcast_keyboard(selected_buttons, db_user.language)
for user in users:
try:
if has_media and media_file_id:
if media_type == "photo":
await callback.bot.send_photo(
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
async def send_single_broadcast(user):
"""Отправляет одно сообщение рассылки с семафором ограничения"""
async with semaphore:
try:
if has_media and media_file_id:
if media_type == "photo":
await callback.bot.send_photo(
chat_id=user.telegram_id,
photo=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "video":
await callback.bot.send_video(
chat_id=user.telegram_id,
video=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "document":
await callback.bot.send_document(
chat_id=user.telegram_id,
document=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
else:
await callback.bot.send_message(
chat_id=user.telegram_id,
photo=media_file_id,
caption=message_text,
text=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "video":
await callback.bot.send_video(
chat_id=user.telegram_id,
video=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
elif media_type == "document":
await callback.bot.send_document(
chat_id=user.telegram_id,
document=media_file_id,
caption=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
else:
await callback.bot.send_message(
chat_id=user.telegram_id,
text=message_text,
parse_mode="HTML",
reply_markup=broadcast_keyboard
)
sent_count += 1
if sent_count % 20 == 0:
await asyncio.sleep(1)
except Exception as e:
failed_count += 1
logger.error(f"Ошибка отправки рассылки пользователю {user.telegram_id}: {e}")
return True, user.telegram_id
except Exception as e:
logger.error(f"Ошибка отправки рассылки пользователю {user.telegram_id}: {e}")
return False, user.telegram_id
# Отправляем сообщения пакетами для эффективности
batch_size = 100
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
tasks = [send_single_broadcast(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, tuple): # (success, telegram_id)
success, _ = result
if success:
sent_count += 1
else:
failed_count += 1
elif isinstance(result, Exception):
failed_count += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count

View File

@@ -1027,6 +1027,20 @@ async def process_notification_value_input(message: Message, state: FSMContext):
language = data.get("settings_language") or message.from_user.language_code or settings.DEFAULT_LANGUAGE
texts = get_texts(language)
# Добавляем дополнительные проверки диапазона значений
if (key == "expired_second_wave" and field == "percent") or (key == "expired_third_wave" and field == "percent"):
if value < 0 or value > 100:
await message.answer("❌ Процент скидки должен быть от 0 до 100.")
return
elif (key == "expired_second_wave" and field == "hours") or (key == "expired_third_wave" and field == "hours"):
if value < 1 or value > 168: # Максимум 168 часов (7 дней)
await message.answer("❌ Количество часов должно быть от 1 до 168.")
return
elif key == "expired_third_wave" and field == "trigger":
if value < 2: # Минимум 2 дня
await message.answer("❌ Количество дней должно быть не менее 2.")
return
success = False
if key == "expired_second_wave" and field == "percent":
success = NotificationSettingsService.set_second_wave_discount_percent(value)
@@ -1075,4 +1089,4 @@ async def process_notification_value_input(message: Message, state: FSMContext):
def register_handlers(dp):
dp.include_router(router)
dp.include_router(router)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import html
import logging
import re
@@ -1896,66 +1897,94 @@ async def _send_offer_to_users(
squad_name: Optional[str],
effect_type: str,
) -> Tuple[int, int]:
from app.database.database import AsyncSessionLocal
sent = 0
failed = 0
for user in users:
try:
offer_record = await upsert_discount_offer(
db,
user_id=user.id,
subscription_id=user.subscription.id if user.subscription else None,
notification_type=f"promo_template_{template.id}",
discount_percent=template.discount_percent,
bonus_amount_kopeks=0,
valid_hours=template.valid_hours,
effect_type=effect_type,
extra_data={
"template_id": template.id,
"offer_type": template.offer_type,
"test_duration_hours": template.test_duration_hours,
"test_squad_uuids": template.test_squad_uuids,
"active_discount_hours": template.active_discount_hours,
},
)
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
user_texts = get_texts(user.language or db_user.language)
keyboard_rows: List[List[InlineKeyboardButton]] = [
[
build_miniapp_or_callback_button(
text=template.button_text,
callback_data=f"claim_discount_{offer_record.id}",
async def send_single_offer(user):
"""Отправляет одно предложение с семафором ограничения"""
async with semaphore:
try:
# Используем отдельную сессию для изоляции транзакции
async with AsyncSessionLocal() as new_db:
offer_record = await upsert_discount_offer(
new_db,
user_id=user.id,
subscription_id=getattr(user, "subscription", None).id if getattr(user, "subscription", None) else None,
notification_type=f"promo_template_{template.id}",
discount_percent=template.discount_percent,
bonus_amount_kopeks=0,
valid_hours=template.valid_hours,
effect_type=effect_type,
extra_data={
"template_id": template.id,
"offer_type": template.offer_type,
"test_duration_hours": template.test_duration_hours,
"test_squad_uuids": template.test_squad_uuids,
"active_discount_hours": template.active_discount_hours,
},
)
]
]
keyboard_rows.append([
InlineKeyboardButton(
text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"),
callback_data="promo_offer_close",
)
])
user_texts = get_texts(user.language or db_user.language)
keyboard_rows: List[List[InlineKeyboardButton]] = [
[
build_miniapp_or_callback_button(
text=template.button_text,
callback_data=f"claim_discount_{offer_record.id}",
)
]
]
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
keyboard_rows.append([
InlineKeyboardButton(
text=user_texts.t("PROMO_OFFER_CLOSE", "❌ Закрыть"),
callback_data="promo_offer_close",
)
])
message_text = _render_template_text(
template,
user.language or db_user.language,
server_name=squad_name,
)
await bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode="HTML",
)
sent += 1
except (TelegramForbiddenError, TelegramBadRequest) as exc:
logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc)
failed += 1
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc)
failed += 1
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
message_text = _render_template_text(
template,
user.language or db_user.language,
server_name=squad_name,
)
await bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode="HTML",
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
logger.warning("Не удалось отправить предложение пользователю %s: %s", user.telegram_id, exc)
return False
except Exception as exc: # pragma: no cover - defensive logging
logger.error("Ошибка рассылки промо предложения пользователю %s: %s", user.telegram_id, exc)
return False
# Отправляем предложения пакетами для эффективности
batch_size = 100
for i in range(0, len(users), batch_size):
batch = users[i:i + batch_size]
tasks = [send_single_offer(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, bool): # Успешно или неуспешно
if result:
sent += 1
else:
failed += 1
elif isinstance(result, Exception): # Ошибка выполнения задачи
failed += 1
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
return sent, failed

View File

@@ -1,4 +1,5 @@
import logging
import re
from aiogram import Dispatcher, types, F
from aiogram.fsm.context import FSMContext
from sqlalchemy.ext.asyncio import AsyncSession
@@ -18,6 +19,92 @@ from app.database.crud.welcome_text import (
logger = logging.getLogger(__name__)
def validate_html_tags(text: str) -> tuple[bool, str]:
"""
Проверяет HTML-теги в тексте на соответствие требованиям Telegram API.
Args:
text: Текст для проверки
Returns:
Кортеж из (валидно ли, сообщение об ошибке или None)
"""
# Поддерживаемые теги в parse_mode="HTML" для Telegram API
allowed_tags = {
'b', 'strong', # жирный
'i', 'em', # курсив
'u', 'ins', # подчеркнуто
's', 'strike', 'del', # зачеркнуто
'code', # моноширинный для коротких фрагментов
'pre', # моноширинный блок кода
'a' # ссылки
}
# Убираем плейсхолдеры из строки перед проверкой тегов
# Плейсхолдеры имеют формат {ключ}, и не являются тегами
placeholder_pattern = r'\{[^{}]+\}'
clean_text = re.sub(placeholder_pattern, '', text)
# Находим все открывающие и закрывающие теги
tag_pattern = r'<(/?)([a-zA-Z]+)(\s[^>]*)?>'
tags_with_pos = [(m.group(1), m.group(2), m.group(3), m.start(), m.end()) for m in re.finditer(tag_pattern, clean_text)]
for closing, tag, attrs, start_pos, end_pos in tags_with_pos:
tag_lower = tag.lower()
# Проверяем, является ли тег поддерживаемым
if tag_lower not in allowed_tags:
return False, f"Неподдерживаемый HTML-тег: <{tag}>. Используйте только теги: {', '.join(sorted(allowed_tags))}"
# Проверяем атрибуты для тега <a>
if tag_lower == 'a':
if closing:
continue # Для закрывающего тега не нужно проверять атрибуты
if not attrs:
return False, "Тег <a> должен содержать атрибут href, например: <a href='URL'>ссылка</a>"
# Проверяем, что есть атрибут href
if 'href=' not in attrs.lower():
return False, "Тег <a> должен содержать атрибут href, например: <a href='URL'>ссылка</a>"
# Проверяем формат URL
href_match = re.search(r'href\s*=\s*[\'"]([^\'"]+)[\'"]', attrs, re.IGNORECASE)
if href_match:
url = href_match.group(1)
# Проверяем, что URL начинается с поддерживаемой схемы
if not re.match(r'^https?://|^tg://', url, re.IGNORECASE):
return False, f"URL в теге <a> должен начинаться с http://, https:// или tg://. Найдено: {url}"
else:
return False, "Не удалось извлечь URL из атрибута href тега <a>"
# Проверяем парность тегов с использованием стека
stack = []
for closing, tag, attrs, start_pos, end_pos in tags_with_pos:
tag_lower = tag.lower()
if tag_lower not in allowed_tags:
continue
if closing:
# Это закрывающий тег
if not stack:
return False, f"Лишний закрывающий тег: </{tag}>"
last_opening_tag = stack.pop()
if last_opening_tag.lower() != tag_lower:
return False, f"Тег </{tag}> не соответствует открывающему тегу <{last_opening_tag}>"
else:
# Это открывающий тег
stack.append(tag)
# Если остались незакрытые теги
if stack:
unclosed_tags = ", ".join([f"<{tag}>" for tag in stack])
return False, f"Незакрытые теги: {unclosed_tags}"
return True, None
def get_telegram_formatting_info() -> str:
return """
📝 <b>Поддерживаемые теги форматирования:</b>
@@ -202,6 +289,12 @@ async def process_welcome_text_edit(
await message.answer("❌ Текст слишком длинный! Максимум 4000 символов.")
return
# Проверяем HTML-теги на валидность
is_valid, error_msg = validate_html_tags(new_text)
if not is_valid:
await message.answer(f"❌ Ошибка в HTML-разметке:\n\n{error_msg}")
return
success = await set_welcome_text(db, new_text, db_user.id)
if success:

View File

@@ -209,25 +209,103 @@ async def handle_balance_history_pagination(
async def show_payment_methods(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
from app.utils.payment_utils import get_payment_methods_text
from app.database.crud.subscription import get_subscription_by_user_id
from app.utils.pricing_utils import calculate_months_from_days, apply_percentage_discount
from app.config import settings
from app.services.subscription_service import SubscriptionService
texts = get_texts(db_user.language)
payment_text = get_payment_methods_text(db_user.language)
# Добавляем информацию о текущем тарифе пользователя
subscription = await get_subscription_by_user_id(db, db_user.id)
tariff_info = ""
if subscription and not subscription.is_trial:
# Рассчитываем приблизительную стоимость продления на 30 дней
duration_days = 30 # Берем для примера 30 дней
current_traffic = subscription.traffic_limit_gb
current_connected_squads = subscription.connected_squads or []
current_device_limit = subscription.device_limit or settings.DEFAULT_DEVICE_LIMIT
try:
# Получаем цены для текущих параметров
from app.config import PERIOD_PRICES
base_price_original = PERIOD_PRICES.get(duration_days, 0)
period_discount_percent = db_user.get_promo_discount("period", duration_days)
base_price, base_discount_total = apply_percentage_discount(
base_price_original,
period_discount_percent,
)
# Рассчитываем стоимость серверов
from app.services.subscription_service import SubscriptionService
subscription_service = SubscriptionService()
servers_price_per_month, per_server_monthly_prices = await subscription_service.get_countries_price_by_uuids(
current_connected_squads,
db,
promo_group_id=db_user.promo_group_id,
)
servers_discount_percent = db_user.get_promo_discount("servers", duration_days)
total_servers_price = 0
for server_price in per_server_monthly_prices:
discounted_per_month, discount_per_month = apply_percentage_discount(
server_price,
servers_discount_percent,
)
total_servers_price += discounted_per_month
# Рассчитываем стоимость трафика
traffic_price_per_month = settings.get_traffic_price(current_traffic)
traffic_discount_percent = db_user.get_promo_discount("traffic", duration_days)
traffic_discounted_per_month, traffic_discount_per_month = apply_percentage_discount(
traffic_price_per_month,
traffic_discount_percent,
)
# Рассчитываем стоимость устройств
additional_devices = max(0, (current_device_limit or 0) - settings.DEFAULT_DEVICE_LIMIT)
devices_price_per_month = additional_devices * settings.PRICE_PER_DEVICE
devices_discount_percent = db_user.get_promo_discount("devices", duration_days)
devices_discounted_per_month, devices_discount_per_month = apply_percentage_discount(
devices_price_per_month,
devices_discount_percent,
)
# Общая стоимость
months_in_period = calculate_months_from_days(duration_days)
total_price = (
base_price +
total_servers_price * months_in_period +
traffic_discounted_per_month * months_in_period +
devices_discounted_per_month * months_in_period
)
current_tariff_desc = f"📱 Подписка: {len(current_connected_squads)} серверов, {current_traffic} ГБ, {current_device_limit} устр."
estimated_price_info = f"💰 Стоимость продления (примерно): {texts.format_price(total_price)} за {duration_days} дней"
tariff_info = f"\n\n📋 <b>Ваш текущий тариф:</b>\n{current_tariff_desc}\n{estimated_price_info}"
except Exception as e:
logger.warning(f"Не удалось рассчитать стоимость текущей подписки для пользователя {db_user.id}: {e}")
tariff_info = ""
full_text = payment_text + tariff_info
keyboard = get_payment_methods_keyboard(0, db_user.language)
try:
await callback.message.edit_text(
payment_text,
full_text,
reply_markup=keyboard,
parse_mode="HTML"
)
except TelegramBadRequest:
try:
await callback.message.edit_caption(
payment_text,
full_text,
reply_markup=keyboard,
parse_mode="HTML"
)
@@ -237,7 +315,7 @@ async def show_payment_methods(
except TelegramBadRequest:
pass
await callback.message.answer(
payment_text,
full_text,
reply_markup=keyboard,
parse_mode="HTML"
)
@@ -825,4 +903,4 @@ def register_balance_handlers(dp: Dispatcher):
dp.callback_query.register(
handle_topup_amount_callback,
F.data.startswith("topup_amount|")
)
)

View File

@@ -253,17 +253,20 @@ async def _prepare_subscription_summary(
else:
traffic_display = f"{summary_data.get('traffic_gb', 0)} ГБ"
if base_discount_total > 0:
details_lines = []
# Добавляем строку базового периода только если цена не равна 0
if base_discount_total > 0 and base_price > 0:
base_line = (
f"- Базовый период: <s>{texts.format_price(base_price_original)}</s> "
f"{texts.format_price(base_price)}"
f" (скидка {period_discount_percent}%:"
f" -{texts.format_price(base_discount_total)})"
)
else:
details_lines.append(base_line)
elif base_price_original > 0:
base_line = f"- Базовый период: {texts.format_price(base_price_original)}"
details_lines = [base_line]
details_lines.append(base_line)
if total_traffic_price > 0:
traffic_line = (

View File

@@ -859,8 +859,10 @@ async def return_to_saved_cart(
period_display = format_period_description(prepared_cart_data['period_days'], db_user.language)
# Проверяем наличие ключа 'countries' в данных корзины
cart_countries = prepared_cart_data.get('countries', [])
for country in countries:
if country['uuid'] in prepared_cart_data['countries']:
if country['uuid'] in cart_countries:
selected_countries_names.append(country['name'])
if settings.is_traffic_fixed():

View File

@@ -1448,6 +1448,7 @@
"ADMIN_POLLS_CREATION_TITLE_PROMPT": "🗳️ <b>Create poll</b>\n\nEnter poll title:",
"ADMIN_POLLS_CREATION_CANCELLED": "❌ Poll creation cancelled.",
"ADMIN_POLLS_CREATION_DESCRIPTION_PROMPT": "Enter poll description. HTML is allowed.\nSend /skip to omit.",
"ADMIN_POLLS_CREATION_DESCRIPTION_SKIPPED": "Description omitted.",
"ADMIN_POLLS_CREATION_INVALID_HTML": "❌ HTML error: {error}",
"ADMIN_POLLS_CREATION_REWARD_PROMPT": "Enter reward amount in RUB. Send 0 to disable reward.",
"ADMIN_POLLS_CREATION_REWARD_INVALID": "❌ Invalid amount. Try again.",

View File

@@ -1468,6 +1468,7 @@
"ADMIN_POLLS_CREATION_TITLE_PROMPT": "🗳️ <b>Создание опроса</b>\n\nВведите заголовок опроса:",
"ADMIN_POLLS_CREATION_CANCELLED": "❌ Создание опроса отменено.",
"ADMIN_POLLS_CREATION_DESCRIPTION_PROMPT": "Введите описание опроса. HTML разрешён.\nОтправьте /skip, чтобы пропустить.",
"ADMIN_POLLS_CREATION_DESCRIPTION_SKIPPED": "Описание пропущено.",
"ADMIN_POLLS_CREATION_INVALID_HTML": "❌ Ошибка в HTML: {error}",
"ADMIN_POLLS_CREATION_REWARD_PROMPT": "Введите сумму награды в рублях. Отправьте 0 чтобы отключить награду.",
"ADMIN_POLLS_CREATION_REWARD_INVALID": "❌ Некорректная сумма. Попробуйте ещё раз.",

View File

@@ -137,30 +137,50 @@ class BroadcastService:
keyboard = self._build_keyboard(config.selected_buttons)
for index, user in enumerate(recipients, start=1):
# Ограничение на количество одновременных отправок
semaphore = asyncio.Semaphore(20)
async def send_single_message(user):
"""Отправляет одно сообщение с семафором ограничения"""
async with semaphore:
if cancel_event.is_set():
return False
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
return False
try:
await self._deliver_message(telegram_id, config, keyboard)
return True
except Exception as exc: # noqa: BLE001
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
return False
# Отправляем сообщения пакетами для эффективности
batch_size = 100
for i in range(0, len(recipients), batch_size):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
failed_count += 1
continue
batch = recipients[i:i + batch_size]
tasks = [send_single_message(user) for user in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
try:
await self._deliver_message(telegram_id, config, keyboard)
sent_count += 1
except Exception as exc: # noqa: BLE001
failed_count += 1
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
for result in results:
if result is True:
sent_count += 1
else:
failed_count += 1
if index % 20 == 0:
await asyncio.sleep(1)
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.1)
await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False)

View File

@@ -1092,6 +1092,56 @@ class MonitoringService:
try:
texts = get_texts(user.language)
# Рассчитываем минимальную цену за подписку с минимальной конфигурацией
from app.config import settings, PERIOD_PRICES
from app.utils.pricing_utils import apply_percentage_discount
# Базовая цена за 30 дней
base_price_original = PERIOD_PRICES.get(30, settings.PRICE_30_DAYS)
# Применяем скидку промогруппы для категории "period"
promo_group_discount = user.get_promo_discount("period", 30) if user else 0
# Применяем пользовательскую промо-скидку (если есть)
user_discount_percent = self._get_user_promo_offer_discount_percent(user)
# Общая скидка - максимальная из промогруппы и пользовательской
total_discount_percent = max(promo_group_discount, user_discount_percent)
base_price, _ = apply_percentage_discount(base_price_original, total_discount_percent)
# Добавляем цену за трафик (если фиксированный трафик включён)
if settings.is_traffic_fixed():
traffic_price = settings.get_traffic_price(settings.get_fixed_traffic_limit())
# Применяем скидки на трафик
traffic_discount = user.get_promo_discount("traffic", 30) if user else 0
traffic_price, _ = apply_percentage_discount(traffic_price, traffic_discount)
else:
traffic_price = 0 # Трафик не фиксирован, цена включена в базовую
# Добавляем цену за серверы (предполагаем минимум 1 сервер по минимальной цене)
# Вместо сложного запроса к БД, используем настройки
# Для минимальной конфигурации - один сервер с минимальной ценой
min_server_price = getattr(settings, 'MIN_SERVER_PRICE', 0) or 0
if min_server_price == 0:
# Если нет явной минимальной цены, используем базовую цену
# В реальных условиях цена сервера будет определяться в ходе оформления подписки
min_server_price = 0
# Добавляем цену за устройства (если больше базового лимита)
# В минимальной конфигурации - базовый лимит, без доп. устройств
device_limit = settings.DEFAULT_DEVICE_LIMIT
additional_devices = max(0, device_limit - settings.DEFAULT_DEVICE_LIMIT)
devices_price = additional_devices * settings.PRICE_PER_DEVICE
# Для простоты и правильной работы без обращения к БД, рассчитываем минимальную цену как:
# базовая цена + минимальная цена за трафик (если есть фиксированный)
min_server_price = 0 # для минимальной конфигурации с 1 сервером используем 0 или минимальную известную
# Попробуем получить минимальную цену сервера из настроек или используем подходящее значение
# Находим минимальную возможную цену из возможных цен серверов
# В упрощенном варианте используем базовую конфигурацию: базовая цена + трафик
min_total_price = base_price + traffic_price
message = f"""
🎁 <b>Тестовая подписка скоро закончится!</b>
@@ -1101,7 +1151,7 @@ class MonitoringService:
Переходите на полную подписку!
🔥 <b>Специальное предложение:</b>
• 30 дней всего за {settings.format_price(settings.PRICE_30_DAYS)}
• 30 дней всего за {settings.format_price(min_total_price)}
• Безлимитный трафик
Все серверы доступны
• Скорость до 1ГБит/сек

View File

@@ -68,17 +68,13 @@ async def send_poll_to_users(
poll: Poll,
users: Iterable[User],
) -> dict:
from app.database.database import AsyncSessionLocal
sent = 0
failed = 0
skipped = 0
poll_id = poll.id
poll_snapshot = SimpleNamespace(
title=poll.title,
description=poll.description,
reward_enabled=poll.reward_enabled,
reward_amount_kopeks=poll.reward_amount_kopeks,
)
user_snapshots = [
SimpleNamespace(
@@ -89,72 +85,108 @@ async def send_poll_to_users(
for user in users
]
for index, user in enumerate(user_snapshots, start=1):
existing_response = await db.execute(
select(PollResponse.id).where(
and_(
PollResponse.poll_id == poll_id,
PollResponse.user_id == user.id,
)
# Получаем список пользователей, которые уже прошли опрос, за один запрос
user_ids = [user_snapshot.id for user_snapshot in user_snapshots]
existing_responses_result = await db.execute(
select(PollResponse.user_id).where(
and_(
PollResponse.poll_id == poll_id,
PollResponse.user_id.in_(user_ids)
)
)
if existing_response.scalar_one_or_none():
skipped += 1
continue
)
existing_user_ids = set(existing_responses_result.scalars().all())
response = PollResponse(
poll_id=poll_id,
user_id=user.id,
)
db.add(response)
# Используем умеренный семафор, чтобы не превышать лимиты подключений к БД
semaphore = asyncio.Semaphore(30) # Баланс между производительностью и нагрузкой на БД
try:
await db.flush()
# Создаем отдельную функцию для создания отдельной сессии для каждой отправки
async def send_poll_invitation(user_snapshot):
"""Отправляет приглашение к опросу одному пользователю"""
async with semaphore:
# Пропускаем пользователей, которые уже прошли опрос
if user_snapshot.id in existing_user_ids:
return "skipped"
# Создаем новую сессию для изоляции транзакции
async with AsyncSessionLocal() as new_db:
try:
# Проверяем еще раз в новой сессии на случай гонки
existing_response = await new_db.execute(
select(PollResponse.id).where(
and_(
PollResponse.poll_id == poll_id,
PollResponse.user_id == user_snapshot.id,
)
)
)
existing_id = existing_response.scalar_one_or_none()
if existing_id:
return "skipped"
text = _build_poll_invitation_text(poll_snapshot, user.language)
keyboard = build_start_keyboard(response.id, user.language)
response = PollResponse(
poll_id=poll_id,
user_id=user_snapshot.id,
)
new_db.add(response)
await bot.send_message(
chat_id=user.telegram_id,
text=text,
reply_markup=keyboard,
parse_mode="HTML",
disable_web_page_preview=True,
)
await new_db.flush()
await db.commit()
sent += 1
text = _build_poll_invitation_text(poll, user_snapshot.language)
keyboard = build_start_keyboard(response.id, user_snapshot.language)
if index % 20 == 0:
await asyncio.sleep(1)
except TelegramBadRequest as error:
error_text = str(error).lower()
if "chat not found" in error_text or "bot was blocked by the user" in error_text:
skipped += 1
logger.info(
" Пропуск пользователя %s при отправке опроса %s: %s",
user.telegram_id,
poll_id,
error,
)
else: # pragma: no cover - unexpected telegram error
await bot.send_message(
chat_id=user_snapshot.telegram_id,
text=text,
reply_markup=keyboard,
parse_mode="HTML",
disable_web_page_preview=True,
)
await new_db.commit()
return "sent"
except TelegramBadRequest as error:
error_text = str(error).lower()
if "chat not found" in error_text or "bot was blocked by the user" in error_text:
await new_db.rollback()
return "skipped"
else: # pragma: no cover - unexpected telegram error
await new_db.rollback()
return "failed"
except Exception as error: # pragma: no cover - defensive logging
await new_db.rollback()
# Проверяем, является ли ошибка связанной с лимитом подключений
if "too many clients" in str(error).lower():
logger.warning(
"⚠️ Ограничение на количество подключений к БД: %s пользователю %s",
poll_id,
user_snapshot.telegram_id,
)
# Уменьшаем вероятность переполнения, делая небольшую задержку
await asyncio.sleep(0.1)
else:
logger.error(
"❌ Ошибка отправки опроса %s пользователю %s: %s",
poll_id,
user_snapshot.telegram_id,
error,
)
return "failed"
# Отправляем все приглашения одновременно без задержек для максимальной скорости
tasks = [send_poll_invitation(user_snapshot) for user_snapshot in user_snapshots]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, str): # Успешно выполненная задача
if result == "sent":
sent += 1
elif result == "failed":
failed += 1
logger.error(
"❌ Ошибка отправки опроса %s пользователю %s: %s",
poll_id,
user.telegram_id,
error,
)
await db.rollback()
except Exception as error: # pragma: no cover - defensive logging
elif result == "skipped":
skipped += 1
elif isinstance(result, Exception): # Ошибка выполнения задачи
failed += 1
logger.error(
"❌ Ошибка отправки опроса %s пользователю %s: %s",
poll_id,
user.telegram_id,
error,
)
await db.rollback()
return {
"sent": sent,

View File

@@ -125,17 +125,22 @@ def format_price_button(
Formatted button text
Examples:
With discount:
With discount and price > 0:
"📅 30 дней - 990₽ ➜ 693₽ (-30%)!"
With final price = 0:
"📅 30 дней"
With emphasis:
"🔥 📅 360 дней - 8990₽ ➜ 6293₽ (-30%)! 🔥"
"🔥 📅 30 дней - 8990₽ ➜ 6293₽ (-30%)! 🔥"
Without discount:
"📅 30 дней - 990₽"
"""
# Build button text with or without discount
if price_info.has_discount:
# Format button text differently if final price is 0
if price_info.final_price == 0:
button_text = f"📅 {period_label}"
elif price_info.has_discount:
exclamation = "!" if add_exclamation else ""
button_text = (
f"📅 {period_label} - "
@@ -176,8 +181,13 @@ def format_price_text(
Without discount:
"📅 30 дней - 990₽"
With zero price:
"📅 30 дней"
"""
if price_info.has_discount:
if price_info.final_price == 0:
return f"📅 {period_label}"
elif price_info.has_discount:
return (
f"📅 {period_label} - "
f"{format_price_func(price_info.base_price)}"

View File

@@ -114,13 +114,13 @@ OPENAPI_TAGS = [
def create_web_api_app() -> FastAPI:
docs_config = settings.get_web_api_docs_config()
# Убираем openapi_tags для предотвращения ошибок при генерации openapi.json
app = FastAPI(
title=settings.WEB_API_TITLE,
version=settings.WEB_API_VERSION,
docs_url=docs_config.get("docs_url"),
redoc_url=docs_config.get("redoc_url"),
openapi_url=docs_config.get("openapi_url"),
openapi_tags=OPENAPI_TAGS,
swagger_ui_parameters={"persistAuthorization": True},
)

View File

@@ -66,6 +66,15 @@ async def main():
handlers=[file_handler, stream_handler],
)
# Установим более высокий уровень логирования для "мусорных" логов
logging.getLogger("aiohttp.access").setLevel(logging.ERROR)
logging.getLogger("aiohttp.client").setLevel(logging.WARNING)
logging.getLogger("aiohttp.internal").setLevel(logging.WARNING)
logging.getLogger("app.external.remnawave_api").setLevel(logging.WARNING)
logging.getLogger("aiogram").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").setLevel(logging.ERROR)
logging.getLogger("uvicorn.error").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
timeline = StartupTimeline(logger, "Bedolaga Remnawave Bot")
timeline.log_banner(