mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-01-20 03:40:26 +00:00
257 lines
8.6 KiB
Python
257 lines
8.6 KiB
Python
import asyncio
|
||
import logging
|
||
from datetime import datetime
|
||
|
||
from aiogram import Dispatcher, F, types
|
||
from aiogram.exceptions import TelegramBadRequest
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.config import settings
|
||
from app.database.crud.poll import (
|
||
get_poll_response_by_id,
|
||
record_poll_answer,
|
||
)
|
||
from app.database.models import PollQuestion, User
|
||
from app.localization.texts import get_texts
|
||
from app.services.poll_service import get_next_question, get_question_option, reward_user_for_poll
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def _delete_message_later(bot, chat_id: int, message_id: int, delay: int = 10) -> None:
|
||
try:
|
||
await asyncio.sleep(delay)
|
||
await bot.delete_message(chat_id, message_id)
|
||
except Exception as error: # pragma: no cover - cleanup best effort
|
||
logger.debug("Не удалось удалить сообщение опроса %s: %s", message_id, error)
|
||
|
||
|
||
async def _render_question_text(
|
||
poll_title: str,
|
||
question: PollQuestion,
|
||
current_index: int,
|
||
total: int,
|
||
language: str,
|
||
) -> str:
|
||
texts = get_texts(language)
|
||
header = texts.t("POLL_QUESTION_HEADER", "<b>Вопрос {current}/{total}</b>").format(
|
||
current=current_index,
|
||
total=total,
|
||
)
|
||
lines = [f"🗳️ <b>{poll_title}</b>", "", header, "", question.text]
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def _update_poll_message(
|
||
message: types.Message,
|
||
text: str,
|
||
*,
|
||
reply_markup: types.InlineKeyboardMarkup | None = None,
|
||
parse_mode: str | None = "HTML",
|
||
) -> bool:
|
||
try:
|
||
await message.edit_text(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
parse_mode=parse_mode,
|
||
)
|
||
return True
|
||
except TelegramBadRequest as error:
|
||
error_text = str(error).lower()
|
||
if "message is not modified" in error_text:
|
||
logger.debug(
|
||
"Опросное сообщение уже актуально, пропускаем обновление: %s",
|
||
error,
|
||
)
|
||
return True
|
||
|
||
logger.warning(
|
||
"Не удалось обновить сообщение опроса %s: %s",
|
||
message.message_id,
|
||
error,
|
||
)
|
||
except Exception as error: # pragma: no cover - defensive logging
|
||
logger.exception(
|
||
"Непредвиденная ошибка при обновлении сообщения опроса %s: %s",
|
||
message.message_id,
|
||
error,
|
||
)
|
||
|
||
return False
|
||
|
||
|
||
def _build_options_keyboard(response_id: int, question: PollQuestion) -> types.InlineKeyboardMarkup:
|
||
buttons: list[list[types.InlineKeyboardButton]] = []
|
||
for option in sorted(question.options, key=lambda o: o.order):
|
||
buttons.append(
|
||
[
|
||
types.InlineKeyboardButton(
|
||
text=option.text,
|
||
callback_data=f"poll_answer:{response_id}:{question.id}:{option.id}",
|
||
)
|
||
]
|
||
)
|
||
return types.InlineKeyboardMarkup(inline_keyboard=buttons)
|
||
|
||
|
||
async def handle_poll_start(
|
||
callback: types.CallbackQuery,
|
||
db_user: User,
|
||
db: AsyncSession,
|
||
):
|
||
try:
|
||
response_id = int(callback.data.split(":")[1])
|
||
except (IndexError, ValueError):
|
||
await callback.answer("❌ Опрос не найден", show_alert=True)
|
||
return
|
||
|
||
response = await get_poll_response_by_id(db, response_id)
|
||
if not response or response.user_id != db_user.id:
|
||
await callback.answer("❌ Опрос не найден", show_alert=True)
|
||
return
|
||
|
||
texts = get_texts(db_user.language)
|
||
|
||
if response.completed_at:
|
||
await callback.answer(texts.t("POLL_ALREADY_COMPLETED", "Вы уже прошли этот опрос."), show_alert=True)
|
||
return
|
||
|
||
if not response.poll or not response.poll.questions:
|
||
await callback.answer(texts.t("POLL_EMPTY", "Опрос пока недоступен."), show_alert=True)
|
||
return
|
||
|
||
if not response.started_at:
|
||
response.started_at = datetime.utcnow()
|
||
await db.commit()
|
||
|
||
index, question = await get_next_question(response)
|
||
if not question:
|
||
await callback.answer(texts.t("POLL_ERROR", "Не удалось загрузить вопросы."), show_alert=True)
|
||
return
|
||
|
||
question_text = await _render_question_text(
|
||
response.poll.title,
|
||
question,
|
||
index,
|
||
len(response.poll.questions),
|
||
db_user.language,
|
||
)
|
||
|
||
if not await _update_poll_message(
|
||
callback.message,
|
||
question_text,
|
||
reply_markup=_build_options_keyboard(response.id, question),
|
||
):
|
||
await callback.answer(texts.t("POLL_ERROR", "Не удалось показать вопрос."), show_alert=True)
|
||
return
|
||
await callback.answer()
|
||
|
||
|
||
async def handle_poll_answer(
|
||
callback: types.CallbackQuery,
|
||
db_user: User,
|
||
db: AsyncSession,
|
||
):
|
||
try:
|
||
_, response_id, question_id, option_id = callback.data.split(":", 3)
|
||
response_id = int(response_id)
|
||
question_id = int(question_id)
|
||
option_id = int(option_id)
|
||
except (ValueError, IndexError):
|
||
await callback.answer("❌ Некорректные данные", show_alert=True)
|
||
return
|
||
|
||
response = await get_poll_response_by_id(db, response_id)
|
||
texts = get_texts(db_user.language)
|
||
|
||
if not response or response.user_id != db_user.id:
|
||
await callback.answer("❌ Опрос не найден", show_alert=True)
|
||
return
|
||
|
||
if not response.poll:
|
||
await callback.answer(texts.t("POLL_ERROR", "Опрос недоступен."), show_alert=True)
|
||
return
|
||
|
||
if response.completed_at:
|
||
await callback.answer(texts.t("POLL_ALREADY_COMPLETED", "Вы уже прошли этот опрос."), show_alert=True)
|
||
return
|
||
|
||
question = next((q for q in response.poll.questions if q.id == question_id), None)
|
||
if not question:
|
||
await callback.answer(texts.t("POLL_ERROR", "Вопрос не найден."), show_alert=True)
|
||
return
|
||
|
||
option = await get_question_option(question, option_id)
|
||
if not option:
|
||
await callback.answer(texts.t("POLL_ERROR", "Вариант ответа не найден."), show_alert=True)
|
||
return
|
||
|
||
await record_poll_answer(
|
||
db,
|
||
response_id=response.id,
|
||
question_id=question.id,
|
||
option_id=option.id,
|
||
)
|
||
|
||
try:
|
||
await db.refresh(response, attribute_names=["answers"])
|
||
except Exception as error: # pragma: no cover - defensive cache busting
|
||
logger.debug(
|
||
"Не удалось обновить локальные ответы опроса %s: %s",
|
||
response.id,
|
||
error,
|
||
)
|
||
response = await get_poll_response_by_id(db, response.id)
|
||
if not response:
|
||
await callback.answer(texts.t("POLL_ERROR", "Опрос недоступен."), show_alert=True)
|
||
return
|
||
index, next_question = await get_next_question(response)
|
||
|
||
if next_question:
|
||
question_text = await _render_question_text(
|
||
response.poll.title,
|
||
next_question,
|
||
index,
|
||
len(response.poll.questions),
|
||
db_user.language,
|
||
)
|
||
if not await _update_poll_message(
|
||
callback.message,
|
||
question_text,
|
||
reply_markup=_build_options_keyboard(response.id, next_question),
|
||
):
|
||
await callback.answer(texts.t("POLL_ERROR", "Не удалось показать вопрос."), show_alert=True)
|
||
return
|
||
await callback.answer()
|
||
return
|
||
|
||
response.completed_at = datetime.utcnow()
|
||
await db.commit()
|
||
|
||
reward_amount = await reward_user_for_poll(db, response)
|
||
|
||
thanks_lines = [texts.t("POLL_COMPLETED", "🙏 Спасибо за участие в опросе!")]
|
||
if reward_amount:
|
||
thanks_lines.append(
|
||
texts.t(
|
||
"POLL_REWARD_GRANTED",
|
||
"Награда {amount} зачислена на ваш баланс.",
|
||
).format(amount=settings.format_price(reward_amount))
|
||
)
|
||
|
||
if not await _update_poll_message(
|
||
callback.message,
|
||
"\n\n".join(thanks_lines),
|
||
):
|
||
await callback.answer(texts.t("POLL_COMPLETED", "🙏 Спасибо за участие в опросе!"))
|
||
return
|
||
asyncio.create_task(
|
||
_delete_message_later(callback.bot, callback.message.chat.id, callback.message.message_id)
|
||
)
|
||
await callback.answer()
|
||
|
||
|
||
def register_handlers(dp: Dispatcher):
|
||
dp.callback_query.register(handle_poll_start, F.data.startswith("poll_start:"))
|
||
dp.callback_query.register(handle_poll_answer, F.data.startswith("poll_answer:"))
|