diff --git a/app/handlers/admin/daily_contests.py b/app/handlers/admin/daily_contests.py index 13731163..88d7506b 100644 --- a/app/handlers/admin/daily_contests.py +++ b/app/handlers/admin/daily_contests.py @@ -9,10 +9,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database.crud.contest import ( + clear_attempts, + create_round, get_template_by_id, list_templates, update_template_fields, - create_round, ) from app.database.models import ContestTemplate from app.keyboards.admin import ( @@ -28,7 +29,8 @@ from app.utils.decorators import admin_required, error_handler logger = logging.getLogger(__name__) EDITABLE_FIELDS: Dict[str, Dict] = { - "prize_days": {"type": int, "min": 1, "label": "приз (дни)"}, + "prize_type": {"type": str, "label": "тип приза (days/balance/custom)"}, + "prize_value": {"type": str, "label": "значение приза"}, "max_winners": {"type": int, "min": 1, "label": "макс. победителей"}, "attempts_per_user": {"type": int, "min": 1, "label": "попыток на пользователя"}, "times_per_day": {"type": int, "min": 1, "label": "раундов в день"}, @@ -57,7 +59,8 @@ async def show_daily_contests( else: for tpl in templates: status = "🟢" if tpl.is_enabled else "⚪️" - lines.append(f"{status} {tpl.name} (slug: {tpl.slug}) — приз {tpl.prize_days}д, макс {tpl.max_winners}") + prize_info = f"{tpl.prize_value} ({tpl.prize_type})" if tpl.prize_type else tpl.prize_value + lines.append(f"{status} {tpl.name} (slug: {tpl.slug}) — приз {prize_info}, макс {tpl.max_winners}") keyboard_rows = [] if templates: @@ -101,10 +104,12 @@ async def show_daily_contest( await callback.answer(texts.t("ADMIN_CONTEST_NOT_FOUND", "Конкурс не найден."), show_alert=True) return + prize_display = f"{tpl.prize_value} ({tpl.prize_type})" if tpl.prize_type else tpl.prize_value lines = [ f"🏷 {tpl.name} (slug: {tpl.slug})", f"{texts.t('ADMIN_CONTEST_STATUS_ACTIVE','🟢 Активен') if tpl.is_enabled else texts.t('ADMIN_CONTEST_STATUS_INACTIVE','⚪️ Выключен')}", - f"Приз: {tpl.prize_days} дн. | Макс победителей: {tpl.max_winners}", + f"Тип приза: {tpl.prize_type or 'days'} | Значение: {tpl.prize_value or '1'}", + f"Макс победителей: {tpl.max_winners}", f"Попыток/польз: {tpl.attempts_per_user}", f"Раундов в день: {tpl.times_per_day}", f"Расписание: {tpl.schedule_times or '-'}", diff --git a/app/handlers/contests.py b/app/handlers/contests.py index 28f8355b..dd54de6d 100644 --- a/app/handlers/contests.py +++ b/app/handlers/contests.py @@ -1,59 +1,52 @@ +"""Contest handlers for daily games.""" + import logging -import random -from datetime import datetime, timedelta +from datetime import datetime from typing import Optional + from aiogram import Dispatcher, F, types -from aiogram.fsm.context import FSMContext from aiogram.filters import Command +from aiogram.fsm.context import FSMContext from sqlalchemy.ext.asyncio import AsyncSession -from app.config import settings -from app.database.crud.contest import ( - get_active_rounds, - get_attempt, - create_attempt, - update_attempt, - increment_winner_count, -) -from app.database.database import AsyncSessionLocal -from app.database.models import ContestRound, ContestTemplate, SubscriptionStatus -from app.localization.texts import get_texts -from app.services.contest_rotation_service import ( - GAME_QUEST, - GAME_LOCKS, - GAME_CIPHER, - GAME_SERVER, - GAME_BLITZ, - GAME_EMOJI, - GAME_ANAGRAM, -) +from app.database.crud.contest import get_active_rounds, get_attempt from app.database.crud.subscription import get_subscription_by_user_id -from app.database.crud.subscription import extend_subscription -from app.utils.decorators import auth_required, error_handler +from app.database.database import AsyncSessionLocal +from app.database.models import ContestRound, SubscriptionStatus from app.keyboards.inline import get_back_keyboard +from app.localization.texts import get_texts +from app.services.contests import ( + ContestAttemptService, + GameType, + get_game_strategy, +) from app.states import ContestStates +from app.utils.decorators import auth_required, error_handler logger = logging.getLogger(__name__) -# Rate limiting for contests -_contest_rate_limits = {} +# Rate limiting storage +_rate_limits: dict = {} + +# Service instance +_attempt_service = ContestAttemptService() def _check_rate_limit(user_id: int, action: str, limit: int = 1, window_seconds: int = 5) -> bool: """Check if user exceeds rate limit for contest actions.""" key = f"{user_id}_{action}" now = datetime.utcnow().timestamp() - - if key not in _contest_rate_limits: - _contest_rate_limits[key] = [] - + + if key not in _rate_limits: + _rate_limits[key] = [] + # Clean old entries - _contest_rate_limits[key] = [t for t in _contest_rate_limits[key] if now - t < window_seconds] - - if len(_contest_rate_limits[key]) >= limit: + _rate_limits[key] = [t for t in _rate_limits[key] if now - t < window_seconds] + + if len(_rate_limits[key]) >= limit: return False - - _contest_rate_limits[key].append(now) + + _rate_limits[key].append(now) return True @@ -61,20 +54,20 @@ def _validate_callback_data(data: str) -> Optional[list]: """Validate and parse callback data safely.""" if not data or not isinstance(data, str): return None - + parts = data.split("_") if len(parts) < 2 or parts[0] != "contest": return None - - # Basic validation for parts + for part in parts: - if not part or len(part) > 50: # reasonable limit + if not part or len(part) > 50: return None - + return parts def _user_allowed(subscription) -> bool: + """Check if user has active or trial subscription.""" if not subscription: return False return subscription.status in { @@ -83,38 +76,13 @@ def _user_allowed(subscription) -> bool: } -async def _award_prize(db: AsyncSession, user_id: int, prize_type: str, prize_value: str, language: str) -> str: - from app.database.crud.user import get_user_by_id - user = await get_user_by_id(db, user_id) - if not user: - return "" - - texts = get_texts(language) - - if prize_type == "days": - subscription = await get_subscription_by_user_id(db, user_id) - if not subscription: - return "" - days = int(prize_value) if prize_value.isdigit() else 1 - await extend_subscription(db, subscription, days) - return texts.t("CONTEST_PRIZE_GRANTED", "Бонус {days} дней зачислен!").format(days=days) - - elif prize_type == "balance": - kopeks = int(prize_value) if prize_value.isdigit() else 0 - if kopeks > 0: - user.balance_kopeks += kopeks - return texts.t("CONTEST_BALANCE_GRANTED", "Бонус {amount} зачислен!").format(amount=settings.format_price(kopeks)) - - elif prize_type == "custom": - # For custom prizes, just send a message - return f"🎁 {prize_value}" - - return "" - - async def _reply_not_eligible(callback: types.CallbackQuery, language: str): + """Reply that user is not eligible to play.""" texts = get_texts(language) - await callback.answer(texts.t("CONTEST_NOT_ELIGIBLE", "Игры доступны только с активной или триальной подпиской."), show_alert=True) + await callback.answer( + texts.t("CONTEST_NOT_ELIGIBLE", "Игры доступны только с активной или триальной подпиской."), + show_alert=True, + ) # ---------- Handlers ---------- @@ -123,13 +91,17 @@ async def _reply_not_eligible(callback: types.CallbackQuery, language: str): @auth_required @error_handler async def show_contests_menu(callback: types.CallbackQuery, db_user, db: AsyncSession): + """Show menu with available contest games.""" texts = get_texts(db_user.language) + subscription = await get_subscription_by_user_id(db, db_user.id) if not _user_allowed(subscription): await _reply_not_eligible(callback, db_user.language) return active_rounds = await get_active_rounds(db) + + # Group by template, take one round per template unique_templates = {} for rnd in active_rounds: if not rnd.template or not rnd.template.is_enabled: @@ -141,19 +113,24 @@ async def show_contests_menu(callback: types.CallbackQuery, db_user, db: AsyncSe buttons = [] for tpl_slug, rnd in unique_templates.items(): title = rnd.template.name if rnd.template else tpl_slug - buttons.append( - [ - types.InlineKeyboardButton( - text=f"▶️ {title}", - callback_data=f"contest_play_{tpl_slug}_{rnd.id}", - ) - ] - ) + buttons.append([ + types.InlineKeyboardButton( + text=f"▶️ {title}", + callback_data=f"contest_play_{tpl_slug}_{rnd.id}", + ) + ]) + if not buttons: - buttons.append( - [types.InlineKeyboardButton(text=texts.t("CONTEST_EMPTY", "Сейчас игр нет"), callback_data="noop")] - ) - buttons.append([types.InlineKeyboardButton(text=texts.BACK, callback_data="back_to_menu")]) + buttons.append([ + types.InlineKeyboardButton( + text=texts.t("CONTEST_EMPTY", "Сейчас игр нет"), + callback_data="noop", + ) + ]) + + buttons.append([ + types.InlineKeyboardButton(text=texts.BACK, callback_data="back_to_menu") + ]) await callback.message.edit_text( texts.t("CONTEST_MENU_TITLE", "🎲 Игры/Конкурсы\nВыберите игру:"), @@ -165,7 +142,9 @@ async def show_contests_menu(callback: types.CallbackQuery, db_user, db: AsyncSe @auth_required @error_handler async def play_contest(callback: types.CallbackQuery, state: FSMContext, db_user, db: AsyncSession): + """Start playing a specific contest.""" texts = get_texts(db_user.language) + subscription = await get_subscription_by_user_id(db, db_user.id) if not _user_allowed(subscription): await _reply_not_eligible(callback, db_user.language) @@ -173,7 +152,10 @@ async def play_contest(callback: types.CallbackQuery, state: FSMContext, db_user # Rate limit check if not _check_rate_limit(db_user.id, "contest_play", limit=2, window_seconds=10): - await callback.answer(texts.t("CONTEST_TOO_FAST", "Слишком быстро! Подождите."), show_alert=True) + await callback.answer( + texts.t("CONTEST_TOO_FAST", "Слишком быстро! Подождите."), + show_alert=True, + ) return # Validate callback data @@ -189,332 +171,169 @@ async def play_contest(callback: types.CallbackQuery, state: FSMContext, db_user await callback.answer("Некорректные данные", show_alert=True) return - slug = "_".join(parts[2:-1]) - - # reload round with template + # Get round with template async with AsyncSessionLocal() as db2: active_rounds = await get_active_rounds(db2) round_obj = next((r for r in active_rounds if r.id == round_id), None) + if not round_obj: - await callback.answer(texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён или недоступен."), show_alert=True) + await callback.answer( + texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён или недоступен."), + show_alert=True, + ) return + if not round_obj.template or not round_obj.template.is_enabled: - await callback.answer(texts.t("CONTEST_DISABLED", "Игра отключена."), show_alert=True) + await callback.answer( + texts.t("CONTEST_DISABLED", "Игра отключена."), + show_alert=True, + ) return + + # Check if user already played attempt = await get_attempt(db2, round_id, db_user.id) if attempt: - await callback.answer(texts.t("CONTEST_ALREADY_PLAYED", "У вас уже была попытка в этом раунде."), show_alert=True) + await callback.answer( + texts.t("CONTEST_ALREADY_PLAYED", "У вас уже была попытка в этом раунде."), + show_alert=True, + ) return + # Get game strategy and render tpl = round_obj.template - if tpl.slug == GAME_QUEST: - await _render_quest(callback, db_user, round_obj, tpl) - elif tpl.slug == GAME_LOCKS: - await _render_locks(callback, db_user, round_obj, tpl) - elif tpl.slug == GAME_SERVER: - await _render_server_lottery(callback, db_user, round_obj, tpl) - elif tpl.slug == GAME_CIPHER: - await _render_cipher(callback, db_user, round_obj, tpl, state, db2) - elif tpl.slug == GAME_EMOJI: - await _render_emoji(callback, db_user, round_obj, tpl, state, db2) - elif tpl.slug == GAME_ANAGRAM: - await _render_anagram(callback, db_user, round_obj, tpl, state, db2) - elif tpl.slug == GAME_BLITZ: - await _render_blitz(callback, db_user, round_obj, tpl) - else: - await callback.answer(texts.t("CONTEST_UNKNOWN", "Тип конкурса не поддерживается."), show_alert=True) + strategy = get_game_strategy(tpl.slug) - -async def _render_quest(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate): - texts = get_texts(db_user.language) - rows = round_obj.payload.get("rows", 3) - cols = round_obj.payload.get("cols", 3) - keyboard = [] - for r in range(rows): - row_buttons = [] - for c in range(cols): - idx = r * cols + c - row_buttons.append( - types.InlineKeyboardButton( - text="🎛", - callback_data=f"contest_pick_{round_obj.id}_quest_{idx}" - ) + if not strategy: + await callback.answer( + texts.t("CONTEST_UNKNOWN", "Тип конкурса не поддерживается."), + show_alert=True, ) - keyboard.append(row_buttons) - keyboard.append([types.InlineKeyboardButton(text=texts.BACK, callback_data="contests_menu")]) - await callback.message.edit_text( - texts.t("CONTEST_QUEST_PROMPT", "Выбери один из узлов 3×3:"), - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() + return + render_result = strategy.render( + round_id=round_obj.id, + payload=round_obj.payload or {}, + language=db_user.language, + ) -async def _render_locks(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate): - texts = get_texts(db_user.language) - total = round_obj.payload.get("total", 20) - keyboard = [] - row = [] - for i in range(total): - row.append(types.InlineKeyboardButton(text="🔒", callback_data=f"contest_pick_{round_obj.id}_locks_{i}")) - if len(row) == 5: - keyboard.append(row) - row = [] - if row: - keyboard.append(row) - keyboard.append([types.InlineKeyboardButton(text=texts.BACK, callback_data="contests_menu")]) - await callback.message.edit_text( - texts.t("CONTEST_LOCKS_PROMPT", "Найди взломанную кнопку среди замков:"), - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() + # For text input games, create pending attempt and set FSM state + if render_result.requires_text_input: + await _attempt_service.create_pending_attempt(db2, round_obj.id, db_user.id) + await state.set_state(ContestStates.waiting_for_answer) + await state.update_data(contest_round_id=round_obj.id) - -async def _render_server_lottery(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate): - texts = get_texts(db_user.language) - flags = round_obj.payload.get("flags") or [] - shuffled_flags = flags.copy() - random.shuffle(shuffled_flags) - keyboard = [] - row = [] - for flag in shuffled_flags: - row.append(types.InlineKeyboardButton(text=flag, callback_data=f"contest_pick_{round_obj.id}_{flag}")) - if len(row) == 5: - keyboard.append(row) - row = [] - if row: - keyboard.append(row) - keyboard.append([types.InlineKeyboardButton(text=texts.BACK, callback_data="contests_menu")]) - await callback.message.edit_text( - texts.t("CONTEST_SERVER_PROMPT", "Выбери сервер:"), - reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard), - ) - await callback.answer() - - -async def _render_cipher(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate, state: FSMContext, db: AsyncSession): - texts = get_texts(db_user.language) - question = round_obj.payload.get("question", "") - # Create attempt immediately to block re-entry - await create_attempt(db, round_id=round_obj.id, user_id=db_user.id, answer=None, is_winner=False) - await state.set_state(ContestStates.waiting_for_answer) - await state.update_data(contest_round_id=round_obj.id) - await callback.message.edit_text( - texts.t("CONTEST_CIPHER_PROMPT", "Расшифруй: {q}").format(q=question), - reply_markup=get_back_keyboard(db_user.language), - ) - await callback.answer() - - -async def _render_emoji(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate, state: FSMContext, db: AsyncSession): - texts = get_texts(db_user.language) - question = round_obj.payload.get("question", "🤔") - emoji_list = question.split() - random.shuffle(emoji_list) - shuffled_question = " ".join(emoji_list) - # Create attempt immediately to block re-entry - await create_attempt(db, round_id=round_obj.id, user_id=db_user.id, answer=None, is_winner=False) - await state.set_state(ContestStates.waiting_for_answer) - await state.update_data(contest_round_id=round_obj.id) - await callback.message.edit_text( - texts.t("CONTEST_EMOJI_PROMPT", "Угадай сервис по эмодзи: {q}").format(q=shuffled_question), - reply_markup=get_back_keyboard(db_user.language), - ) - await callback.answer() - - -async def _render_anagram(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate, state: FSMContext, db: AsyncSession): - texts = get_texts(db_user.language) - letters = round_obj.payload.get("letters", "") - # Create attempt immediately to block re-entry - await create_attempt(db, round_id=round_obj.id, user_id=db_user.id, answer=None, is_winner=False) - await state.set_state(ContestStates.waiting_for_answer) - await state.update_data(contest_round_id=round_obj.id) - await callback.message.edit_text( - texts.t("CONTEST_ANAGRAM_PROMPT", "Составь слово: {letters}").format(letters=letters), - reply_markup=get_back_keyboard(db_user.language), - ) - await callback.answer() - - -async def _render_blitz(callback, db_user, round_obj: ContestRound, tpl: ContestTemplate): - texts = get_texts(db_user.language) - keyboard = types.InlineKeyboardMarkup( - inline_keyboard=[ - [types.InlineKeyboardButton(text=texts.t("CONTEST_BLITZ_BUTTON", "Я здесь!"), callback_data=f"contest_pick_{round_obj.id}_blitz")], - [types.InlineKeyboardButton(text=texts.BACK, callback_data="contests_menu")] - ] - ) - await callback.message.edit_text( - texts.t("CONTEST_BLITZ_PROMPT", "⚡️ Блиц! Нажми «Я здесь!»"), - reply_markup=keyboard, - ) - await callback.answer() + await callback.message.edit_text( + render_result.text, + reply_markup=render_result.keyboard, + ) + await callback.answer() @auth_required @error_handler async def handle_pick(callback: types.CallbackQuery, db_user, db: AsyncSession): + """Handle button pick in contest games.""" texts = get_texts(db_user.language) - + # Rate limit check if not _check_rate_limit(db_user.id, "contest_pick", limit=1, window_seconds=3): - await callback.answer(texts.t("CONTEST_TOO_FAST", "Слишком быстро! Подождите."), show_alert=True) + await callback.answer( + texts.t("CONTEST_TOO_FAST", "Слишком быстро! Подождите."), + show_alert=True, + ) return - + # Validate callback data parts = _validate_callback_data(callback.data) - if not parts: + if not parts or len(parts) < 4 or parts[1] != "pick": await callback.answer("Некорректные данные", show_alert=True) return - - if len(parts) < 4 or parts[1] != "pick": - await callback.answer("Некорректные данные", show_alert=True) - return - + round_id_str = parts[2] pick = "_".join(parts[3:]) - + try: round_id = int(round_id_str) except ValueError: await callback.answer("Некорректные данные", show_alert=True) return - - # Re-check authorization + + # Re-check subscription subscription = await get_subscription_by_user_id(db, db_user.id) if not _user_allowed(subscription): - await callback.answer(texts.t("CONTEST_NOT_ELIGIBLE", "Игра недоступна без активной подписки."), show_alert=True) + await callback.answer( + texts.t("CONTEST_NOT_ELIGIBLE", "Игра недоступна без активной подписки."), + show_alert=True, + ) return async with AsyncSessionLocal() as db2: active_rounds = await get_active_rounds(db2) round_obj = next((r for r in active_rounds if r.id == round_id), None) + if not round_obj: - await callback.answer(texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён."), show_alert=True) + await callback.answer( + texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён."), + show_alert=True, + ) return - tpl = round_obj.template - attempt = await get_attempt(db2, round_id, db_user.id) - if attempt: - await callback.answer(texts.t("CONTEST_ALREADY_PLAYED", "У вас уже была попытка."), show_alert=True) - return + # Process attempt using service + result = await _attempt_service.process_button_attempt( + db=db2, + round_obj=round_obj, + user_id=db_user.id, + pick=pick, + language=db_user.language, + ) - secret_idx = round_obj.payload.get("secret_idx") - correct_flag = "" - if tpl.slug == GAME_SERVER: - flags = round_obj.payload.get("flags") or [] - correct_flag = flags[secret_idx] if secret_idx is not None and secret_idx < len(flags) else "" - - is_winner = False - if tpl.slug == GAME_SERVER: - is_winner = pick == correct_flag - elif tpl.slug == GAME_QUEST: - # Format: quest_{idx} - try: - if pick.startswith("quest_"): - idx = int(pick.split("_")[1]) - is_winner = secret_idx is not None and idx == secret_idx - except (ValueError, IndexError): - is_winner = False - elif tpl.slug == GAME_LOCKS: - # Format: locks_{idx} - try: - if pick.startswith("locks_"): - idx = int(pick.split("_")[1]) - is_winner = secret_idx is not None and idx == secret_idx - except (ValueError, IndexError): - is_winner = False - elif tpl.slug == GAME_BLITZ: - is_winner = pick == "blitz" - else: - is_winner = False - - # Log attempt - logger.info(f"Contest attempt: user {db_user.id}, round {round_id}, pick '{pick}', winner {is_winner}") - - # Atomic winner check and increment - from sqlalchemy import select - stmt = select(ContestRound).where(ContestRound.id == round_id).with_for_update() - result = await db2.execute(stmt) - round_obj_locked = result.scalar_one() - - if is_winner and round_obj_locked.winners_count >= round_obj_locked.max_winners: - is_winner = False - - await create_attempt(db2, round_id=round_obj.id, user_id=db_user.id, answer=str(pick), is_winner=is_winner) - - if is_winner: - round_obj_locked.winners_count += 1 - await db2.commit() - prize_text = await _award_prize(db2, db_user.id, tpl.prize_type, tpl.prize_value, db_user.language) - await callback.answer(texts.t("CONTEST_WIN", "🎉 Победа! ") + (prize_text or ""), show_alert=True) - else: - responses = { - GAME_QUEST: ["Пусто", "Ложный сервер", "Найди другой узел"], - GAME_LOCKS: ["Заблокировано", "Попробуй ещё", "Нет доступа"], - GAME_SERVER: ["Сервер перегружен", "Нет ответа", "Попробуй завтра"], - }.get(tpl.slug, ["Неудача"]) - await callback.answer(random.choice(responses), show_alert=True) + await callback.answer(result.message, show_alert=True) @auth_required @error_handler async def handle_text_answer(message: types.Message, state: FSMContext, db_user, db: AsyncSession): + """Handle text answer in contest games.""" texts = get_texts(db_user.language) + data = await state.get_data() round_id = data.get("contest_round_id") if not round_id: + await state.clear() return async with AsyncSessionLocal() as db2: active_rounds = await get_active_rounds(db2) round_obj = next((r for r in active_rounds if r.id == round_id), None) + if not round_obj: - await message.answer(texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён."), reply_markup=get_back_keyboard(db_user.language)) + await message.answer( + texts.t("CONTEST_ROUND_FINISHED", "Раунд завершён."), + reply_markup=get_back_keyboard(db_user.language), + ) await state.clear() return - attempt = await get_attempt(db2, round_obj.id, db_user.id) - if not attempt: - # No attempt found - user didn't start the game properly - await message.answer(texts.t("CONTEST_NOT_STARTED", "Сначала начните игру."), reply_markup=get_back_keyboard(db_user.language)) - await state.clear() - return - - if attempt.answer is not None: - # Already answered - block re-entry - await message.answer(texts.t("CONTEST_ALREADY_PLAYED", "У вас уже была попытка."), reply_markup=get_back_keyboard(db_user.language)) - await state.clear() - return + # Process attempt using service + text_answer = (message.text or "").strip() + result = await _attempt_service.process_text_attempt( + db=db2, + round_obj=round_obj, + user_id=db_user.id, + text_answer=text_answer, + language=db_user.language, + ) - answer = (message.text or "").strip().upper() - tpl = round_obj.template - correct = (round_obj.payload.get("answer") or "").upper() + await message.answer( + result.message, + reply_markup=get_back_keyboard(db_user.language), + ) - is_winner = correct and answer == correct - - # Atomic winner check and increment - from sqlalchemy import select - stmt = select(ContestRound).where(ContestRound.id == round_id).with_for_update() - result = await db2.execute(stmt) - round_obj_locked = result.scalar_one() - - if is_winner and round_obj_locked.winners_count >= round_obj_locked.max_winners: - is_winner = False - - await update_attempt(db2, attempt, answer=answer, is_winner=is_winner) - - if is_winner: - round_obj_locked.winners_count += 1 - await db2.commit() - prize_text = await _award_prize(db2, db_user.id, tpl.prize_type, tpl.prize_value, db_user.language) - await message.answer(texts.t("CONTEST_WIN", "🎉 Победа! ") + (prize_text or ""), reply_markup=get_back_keyboard(db_user.language)) - else: - await message.answer(texts.t("CONTEST_LOSE", "Не верно, попробуй снова в следующем раунде."), reply_markup=get_back_keyboard(db_user.language)) await state.clear() def register_handlers(dp: Dispatcher): + """Register contest handlers.""" dp.callback_query.register(show_contests_menu, F.data == "contests_menu") dp.callback_query.register(play_contest, F.data.startswith("contest_play_")) dp.callback_query.register(handle_pick, F.data.startswith("contest_pick_")) diff --git a/app/services/contest_rotation_service.py b/app/services/contest_rotation_service.py index b5596af0..95ac7500 100644 --- a/app/services/contest_rotation_service.py +++ b/app/services/contest_rotation_service.py @@ -1,6 +1,5 @@ import asyncio import logging -import random from datetime import datetime, timedelta, time, timezone from typing import Dict, List, Optional from zoneinfo import ZoneInfo @@ -18,17 +17,19 @@ from app.database.crud.contest import ( ) from app.database.database import AsyncSessionLocal from app.database.models import ContestTemplate, SubscriptionStatus, User +from app.services.contests.enums import GameType, PrizeType, RoundStatus +from app.services.contests.games import get_game_strategy logger = logging.getLogger(__name__) -# Slugs for games -GAME_QUEST = "quest_buttons" -GAME_LOCKS = "lock_hack" -GAME_CIPHER = "letter_cipher" -GAME_SERVER = "server_lottery" -GAME_BLITZ = "blitz_reaction" -GAME_EMOJI = "emoji_guess" -GAME_ANAGRAM = "anagram" +# Legacy aliases for backward compatibility +GAME_QUEST = GameType.QUEST_BUTTONS.value +GAME_LOCKS = GameType.LOCK_HACK.value +GAME_CIPHER = GameType.LETTER_CIPHER.value +GAME_SERVER = GameType.SERVER_LOTTERY.value +GAME_BLITZ = GameType.BLITZ_REACTION.value +GAME_EMOJI = GameType.EMOJI_GUESS.value +GAME_ANAGRAM = GameType.ANAGRAM.value DEFAULT_TEMPLATES = [ @@ -246,38 +247,12 @@ class ContestRotationService: return ZoneInfo("UTC") def _build_payload_for_template(self, tpl: ContestTemplate) -> Dict: - payload = tpl.payload or {} - if tpl.slug == GAME_QUEST: - rows = payload.get("rows", 3) - cols = payload.get("cols", 3) - total = rows * cols - secret_idx = random.randint(0, total - 1) - return {"rows": rows, "cols": cols, "secret_idx": secret_idx} - if tpl.slug == GAME_LOCKS: - total = payload.get("buttons", 20) - secret_idx = random.randint(0, max(0, total - 1)) - return {"total": total, "secret_idx": secret_idx} - if tpl.slug == GAME_CIPHER: - words = payload.get("words") or ["VPN"] - word = random.choice(words) - codes = [str(ord(ch.upper()) - 64) for ch in word if ch.isalpha()] - return {"question": "-".join(codes), "answer": word.upper()} - if tpl.slug == GAME_SERVER: - flags = payload.get("flags") or ["🇸🇪","🇸🇬","🇺🇸","🇷🇺","🇩🇪","🇯🇵","🇧🇷","🇦🇺","🇨🇦","🇫🇷"] - secret_idx = random.randint(0, len(flags) - 1) - return {"flags": flags, "secret_idx": secret_idx} - if tpl.slug == GAME_BLITZ: - return {"timeout_seconds": payload.get("timeout_seconds", 10)} - if tpl.slug == GAME_EMOJI: - pairs = payload.get("pairs") or [{"question": "🔐📡🌐", "answer": "VPN"}] - pair = random.choice(pairs) - return pair - if tpl.slug == GAME_ANAGRAM: - words = payload.get("words") or ["SERVER"] - word = random.choice(words).upper() - shuffled = "".join(random.sample(word, len(word))) - return {"letters": shuffled, "answer": word} - return payload + """Build round-specific payload using game strategy.""" + strategy = get_game_strategy(tpl.slug) + if strategy: + return strategy.build_payload(tpl.payload or {}) + # Fallback for unknown game types + return tpl.payload or {} async def _announce_round_start( self, @@ -289,22 +264,20 @@ class ContestRotationService: return from app.localization.texts import get_texts - texts = get_texts("ru") # Default to ru for announcements, or detect - + texts = get_texts("ru") # Default to ru for announcements + # Format prize display based on prize_type - prize_display = "" - if hasattr(tpl, 'prize_type') and tpl.prize_type: - if tpl.prize_type == "days": - prize_display = f"{tpl.prize_value} {texts.t('DAYS', 'дн. подписки')}" - elif tpl.prize_type == "balance": - prize_display = f"{tpl.prize_value} коп." - elif tpl.prize_type == "custom": - prize_display = tpl.prize_value - else: - prize_display = tpl.prize_value + prize_type = tpl.prize_type or PrizeType.DAYS.value + prize_value = tpl.prize_value or "1" + + if prize_type == PrizeType.DAYS.value: + prize_display = f"{prize_value} {texts.t('DAYS', 'дн. подписки')}" + elif prize_type == PrizeType.BALANCE.value: + prize_display = f"{prize_value} коп." + elif prize_type == PrizeType.CUSTOM.value: + prize_display = prize_value else: - # Fallback for old templates - prize_display = f"{getattr(tpl, 'prize_days', 1)} {texts.t('DAYS', 'дн. подписки')}" + prize_display = prize_value text = ( f"🎲 {texts.t('CONTEST_START_ANNOUNCEMENT', 'Стартует игра')}: {tpl.name}\n" diff --git a/app/services/contests/__init__.py b/app/services/contests/__init__.py new file mode 100644 index 00000000..831ae551 --- /dev/null +++ b/app/services/contests/__init__.py @@ -0,0 +1,14 @@ +"""Contest services module.""" + +from app.services.contests.enums import GameType, RoundStatus, PrizeType +from app.services.contests.games import get_game_strategy, BaseGameStrategy +from app.services.contests.attempt_service import ContestAttemptService + +__all__ = [ + "GameType", + "RoundStatus", + "PrizeType", + "get_game_strategy", + "BaseGameStrategy", + "ContestAttemptService", +] diff --git a/app/services/contests/attempt_service.py b/app/services/contests/attempt_service.py new file mode 100644 index 00000000..97bc6cfd --- /dev/null +++ b/app/services/contests/attempt_service.py @@ -0,0 +1,316 @@ +"""Service for atomic contest attempt operations.""" + +import logging +from dataclasses import dataclass +from typing import Optional + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.database.crud.contest import create_attempt, get_attempt, update_attempt +from app.database.crud.subscription import extend_subscription, get_subscription_by_user_id +from app.database.crud.user import get_user_by_id +from app.database.models import ContestAttempt, ContestRound, ContestTemplate +from app.services.contests.enums import PrizeType +from app.services.contests.games import get_game_strategy + +logger = logging.getLogger(__name__) + + +@dataclass +class AttemptResult: + """Result of processing a contest attempt.""" + + success: bool + is_winner: bool + message: str + already_played: bool = False + round_finished: bool = False + + +class ContestAttemptService: + """Service for processing contest attempts with atomic operations.""" + + async def process_button_attempt( + self, + db: AsyncSession, + round_obj: ContestRound, + user_id: int, + pick: str, + language: str, + ) -> AttemptResult: + """ + Process a button-based game attempt atomically. + + Args: + db: Database session + round_obj: Contest round + user_id: User ID + pick: User's pick (button callback data) + language: User's language + + Returns: + AttemptResult with outcome details + """ + tpl = round_obj.template + if not tpl: + return AttemptResult( + success=False, + is_winner=False, + message="Конкурс не найден", + ) + + # Check if user already played + existing_attempt = await get_attempt(db, round_obj.id, user_id) + if existing_attempt: + return AttemptResult( + success=False, + is_winner=False, + message="У вас уже была попытка", + already_played=True, + ) + + # Get game strategy and check answer + strategy = get_game_strategy(tpl.slug) + if not strategy: + return AttemptResult( + success=False, + is_winner=False, + message="Тип игры не поддерживается", + ) + + check_result = strategy.check_answer(pick, round_obj.payload or {}, language) + is_winner = check_result.is_correct + + # Atomic winner check with row lock + is_winner = await self._atomic_winner_check(db, round_obj.id, is_winner) + + # Create attempt record + await create_attempt( + db, + round_id=round_obj.id, + user_id=user_id, + answer=str(pick), + is_winner=is_winner, + ) + + logger.info( + "Contest attempt: user %s, round %s, pick '%s', winner %s", + user_id, round_obj.id, pick, is_winner + ) + + if is_winner: + prize_msg = await self._award_prize(db, user_id, tpl, language) + return AttemptResult( + success=True, + is_winner=True, + message=f"🎉 Победа! {prize_msg}" if prize_msg else "🎉 Победа!", + ) + + return AttemptResult( + success=True, + is_winner=False, + message=check_result.response_text or "Неудача", + ) + + async def process_text_attempt( + self, + db: AsyncSession, + round_obj: ContestRound, + user_id: int, + text_answer: str, + language: str, + ) -> AttemptResult: + """ + Process a text-input game attempt atomically. + + Args: + db: Database session + round_obj: Contest round + user_id: User ID + text_answer: User's text answer + language: User's language + + Returns: + AttemptResult with outcome details + """ + tpl = round_obj.template + if not tpl: + return AttemptResult( + success=False, + is_winner=False, + message="Конкурс не найден", + ) + + # For text games, attempt should already exist (created in render phase) + attempt = await get_attempt(db, round_obj.id, user_id) + if not attempt: + return AttemptResult( + success=False, + is_winner=False, + message="Сначала начните игру", + ) + + # Check if already answered + if attempt.answer is not None: + return AttemptResult( + success=False, + is_winner=False, + message="У вас уже была попытка", + already_played=True, + ) + + # Get game strategy and check answer + strategy = get_game_strategy(tpl.slug) + if not strategy: + return AttemptResult( + success=False, + is_winner=False, + message="Тип игры не поддерживается", + ) + + check_result = strategy.check_answer(text_answer, round_obj.payload or {}, language) + is_winner = check_result.is_correct + + # Atomic winner check with row lock + is_winner = await self._atomic_winner_check(db, round_obj.id, is_winner) + + # Update attempt with answer + await update_attempt(db, attempt, answer=text_answer.strip().upper(), is_winner=is_winner) + + logger.info( + "Contest text attempt: user %s, round %s, answer '%s', winner %s", + user_id, round_obj.id, text_answer, is_winner + ) + + if is_winner: + prize_msg = await self._award_prize(db, user_id, tpl, language) + return AttemptResult( + success=True, + is_winner=True, + message=f"🎉 Победа! {prize_msg}" if prize_msg else "🎉 Победа!", + ) + + return AttemptResult( + success=True, + is_winner=False, + message=check_result.response_text or "Неверно, попробуй в следующем раунде", + ) + + async def create_pending_attempt( + self, + db: AsyncSession, + round_id: int, + user_id: int, + ) -> Optional[ContestAttempt]: + """ + Create a pending attempt for text-input games. + This blocks re-entry while user is answering. + + Args: + db: Database session + round_id: Round ID + user_id: User ID + + Returns: + Created attempt or None if already exists + """ + existing = await get_attempt(db, round_id, user_id) + if existing: + return None + + return await create_attempt( + db, + round_id=round_id, + user_id=user_id, + answer=None, + is_winner=False, + ) + + async def _atomic_winner_check( + self, + db: AsyncSession, + round_id: int, + is_winner: bool, + ) -> bool: + """ + Atomically check and increment winner count. + Uses SELECT FOR UPDATE to prevent race conditions. + + Args: + db: Database session + round_id: Round ID + is_winner: Whether user answered correctly + + Returns: + True if user is a winner, False if max winners reached + """ + if not is_winner: + return False + + stmt = select(ContestRound).where(ContestRound.id == round_id).with_for_update() + result = await db.execute(stmt) + round_obj = result.scalar_one() + + if round_obj.winners_count >= round_obj.max_winners: + return False + + round_obj.winners_count += 1 + await db.commit() + return True + + async def _award_prize( + self, + db: AsyncSession, + user_id: int, + template: ContestTemplate, + language: str, + ) -> str: + """ + Award prize to winner. + + Args: + db: Database session + user_id: Winner user ID + template: Contest template with prize info + language: User's language + + Returns: + Prize notification message + """ + from app.localization.texts import get_texts + texts = get_texts(language) + + prize_type = template.prize_type or PrizeType.DAYS.value + prize_value = template.prize_value or "1" + + if prize_type == PrizeType.DAYS.value: + subscription = await get_subscription_by_user_id(db, user_id) + if not subscription: + return "" + days = int(prize_value) if prize_value.isdigit() else 1 + await extend_subscription(db, subscription, days) + return texts.t("CONTEST_PRIZE_GRANTED", "Бонус {days} дней зачислен!").format(days=days) + + elif prize_type == PrizeType.BALANCE.value: + user = await get_user_by_id(db, user_id) + if not user: + return "" + kopeks = int(prize_value) if prize_value.isdigit() else 0 + if kopeks > 0: + user.balance_kopeks += kopeks + await db.commit() + return texts.t( + "CONTEST_BALANCE_GRANTED", + "Бонус {amount} зачислен!" + ).format(amount=settings.format_price(kopeks)) + + elif prize_type == PrizeType.CUSTOM.value: + return f"🎁 {prize_value}" + + return "" + + +# Singleton instance +contest_attempt_service = ContestAttemptService() diff --git a/app/services/contests/enums.py b/app/services/contests/enums.py new file mode 100644 index 00000000..070477a8 --- /dev/null +++ b/app/services/contests/enums.py @@ -0,0 +1,45 @@ +"""Enum classes for contest system.""" + +from enum import Enum + + +class GameType(str, Enum): + """Types of daily contest games.""" + + QUEST_BUTTONS = "quest_buttons" + LOCK_HACK = "lock_hack" + LETTER_CIPHER = "letter_cipher" + SERVER_LOTTERY = "server_lottery" + BLITZ_REACTION = "blitz_reaction" + EMOJI_GUESS = "emoji_guess" + ANAGRAM = "anagram" + + @classmethod + def is_text_input(cls, game_type: "GameType") -> bool: + """Check if game requires text input from user.""" + return game_type in {cls.LETTER_CIPHER, cls.EMOJI_GUESS, cls.ANAGRAM} + + @classmethod + def is_button_pick(cls, game_type: "GameType") -> bool: + """Check if game uses button selection.""" + return game_type in { + cls.QUEST_BUTTONS, + cls.LOCK_HACK, + cls.SERVER_LOTTERY, + cls.BLITZ_REACTION, + } + + +class RoundStatus(str, Enum): + """Contest round status.""" + + ACTIVE = "active" + FINISHED = "finished" + + +class PrizeType(str, Enum): + """Types of prizes for contests.""" + + DAYS = "days" + BALANCE = "balance" + CUSTOM = "custom" diff --git a/app/services/contests/games.py b/app/services/contests/games.py new file mode 100644 index 00000000..81d92ee0 --- /dev/null +++ b/app/services/contests/games.py @@ -0,0 +1,473 @@ +"""Game strategies for different contest types.""" + +import random +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from aiogram import types + +from app.services.contests.enums import GameType + + +@dataclass +class GameRenderResult: + """Result of rendering a game.""" + + text: str + keyboard: types.InlineKeyboardMarkup + requires_text_input: bool = False + + +@dataclass +class AnswerCheckResult: + """Result of checking user's answer.""" + + is_correct: bool + response_text: str + + +class BaseGameStrategy(ABC): + """Base class for game strategies.""" + + game_type: GameType + + @abstractmethod + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + """Build round-specific payload from template config.""" + pass + + @abstractmethod + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + """Render game UI for user.""" + pass + + @abstractmethod + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + """Check if user's answer is correct.""" + pass + + def _get_back_button(self, language: str, callback: str) -> types.InlineKeyboardButton: + from app.localization.texts import get_texts + texts = get_texts(language) + return types.InlineKeyboardButton(text=texts.BACK, callback_data=callback) + + def _get_texts(self, language: str): + from app.localization.texts import get_texts + return get_texts(language) + + +class QuestButtonsStrategy(BaseGameStrategy): + """3x3 grid game - find the secret button.""" + + game_type = GameType.QUEST_BUTTONS + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + rows = template_payload.get("rows", 3) + cols = template_payload.get("cols", 3) + total = rows * cols + secret_idx = random.randint(0, total - 1) + return {"rows": rows, "cols": cols, "secret_idx": secret_idx} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + rows = payload.get("rows", 3) + cols = payload.get("cols", 3) + + keyboard_rows = [] + for r in range(rows): + row_buttons = [] + for c in range(cols): + idx = r * cols + c + row_buttons.append( + types.InlineKeyboardButton( + text="🎛", + callback_data=f"contest_pick_{round_id}_quest_{idx}", + ) + ) + keyboard_rows.append(row_buttons) + keyboard_rows.append([self._get_back_button(language, back_callback)]) + + return GameRenderResult( + text=texts.t("CONTEST_QUEST_PROMPT", "Выбери один из узлов 3×3:"), + keyboard=types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows), + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + secret_idx = payload.get("secret_idx") + try: + if user_answer.startswith("quest_"): + idx = int(user_answer.split("_")[1]) + is_correct = secret_idx is not None and idx == secret_idx + else: + is_correct = False + except (ValueError, IndexError): + is_correct = False + + responses = ["Пусто", "Ложный сервер", "Найди другой узел"] + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else random.choice(responses), + ) + + +class LockHackStrategy(BaseGameStrategy): + """20 locks game - find the hacked one.""" + + game_type = GameType.LOCK_HACK + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + total = template_payload.get("buttons", 20) + secret_idx = random.randint(0, max(0, total - 1)) + return {"total": total, "secret_idx": secret_idx} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + total = payload.get("total", 20) + + keyboard_rows = [] + row = [] + for i in range(total): + row.append( + types.InlineKeyboardButton( + text="🔒", + callback_data=f"contest_pick_{round_id}_locks_{i}", + ) + ) + if len(row) == 5: + keyboard_rows.append(row) + row = [] + if row: + keyboard_rows.append(row) + keyboard_rows.append([self._get_back_button(language, back_callback)]) + + return GameRenderResult( + text=texts.t("CONTEST_LOCKS_PROMPT", "Найди взломанную кнопку среди замков:"), + keyboard=types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows), + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + secret_idx = payload.get("secret_idx") + try: + if user_answer.startswith("locks_"): + idx = int(user_answer.split("_")[1]) + is_correct = secret_idx is not None and idx == secret_idx + else: + is_correct = False + except (ValueError, IndexError): + is_correct = False + + responses = ["Заблокировано", "Попробуй ещё", "Нет доступа"] + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else random.choice(responses), + ) + + +class ServerLotteryStrategy(BaseGameStrategy): + """Flag lottery game - pick the correct server flag.""" + + game_type = GameType.SERVER_LOTTERY + + DEFAULT_FLAGS = ["🇸🇪", "🇸🇬", "🇺🇸", "🇷🇺", "🇩🇪", "🇯🇵", "🇧🇷", "🇦🇺", "🇨🇦", "🇫🇷"] + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + flags = template_payload.get("flags") or self.DEFAULT_FLAGS + secret_idx = random.randint(0, len(flags) - 1) + return {"flags": flags, "secret_idx": secret_idx} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + flags = payload.get("flags") or [] + shuffled_flags = flags.copy() + random.shuffle(shuffled_flags) + + keyboard_rows = [] + row = [] + for flag in shuffled_flags: + row.append( + types.InlineKeyboardButton( + text=flag, + callback_data=f"contest_pick_{round_id}_{flag}", + ) + ) + if len(row) == 5: + keyboard_rows.append(row) + row = [] + if row: + keyboard_rows.append(row) + keyboard_rows.append([self._get_back_button(language, back_callback)]) + + return GameRenderResult( + text=texts.t("CONTEST_SERVER_PROMPT", "Выбери сервер:"), + keyboard=types.InlineKeyboardMarkup(inline_keyboard=keyboard_rows), + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + secret_idx = payload.get("secret_idx") + flags = payload.get("flags") or [] + correct_flag = flags[secret_idx] if secret_idx is not None and secret_idx < len(flags) else "" + is_correct = user_answer == correct_flag + + responses = ["Сервер перегружен", "Нет ответа", "Попробуй завтра"] + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else random.choice(responses), + ) + + +class BlitzReactionStrategy(BaseGameStrategy): + """Blitz reaction game - press button quickly.""" + + game_type = GameType.BLITZ_REACTION + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + return {"timeout_seconds": template_payload.get("timeout_seconds", 10)} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + + keyboard = types.InlineKeyboardMarkup( + inline_keyboard=[ + [ + types.InlineKeyboardButton( + text=texts.t("CONTEST_BLITZ_BUTTON", "Я здесь!"), + callback_data=f"contest_pick_{round_id}_blitz", + ) + ], + [self._get_back_button(language, back_callback)], + ] + ) + + return GameRenderResult( + text=texts.t("CONTEST_BLITZ_PROMPT", "⚡️ Блиц! Нажми «Я здесь!»"), + keyboard=keyboard, + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + is_correct = user_answer == "blitz" + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else "Время вышло", + ) + + +class LetterCipherStrategy(BaseGameStrategy): + """Letter cipher game - decode word from letter codes.""" + + game_type = GameType.LETTER_CIPHER + + DEFAULT_WORDS = ["VPN", "SERVER", "PROXY", "XRAY"] + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + words = template_payload.get("words") or self.DEFAULT_WORDS + word = random.choice(words) + codes = [str(ord(ch.upper()) - 64) for ch in word if ch.isalpha()] + return {"question": "-".join(codes), "answer": word.upper()} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + question = payload.get("question", "") + from app.keyboards.inline import get_back_keyboard + + return GameRenderResult( + text=texts.t("CONTEST_CIPHER_PROMPT", "Расшифруй: {q}").format(q=question), + keyboard=get_back_keyboard(language), + requires_text_input=True, + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + correct = (payload.get("answer") or "").upper() + is_correct = correct and user_answer.strip().upper() == correct + + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else "Неверно, попробуй в следующем раунде", + ) + + +class EmojiGuessStrategy(BaseGameStrategy): + """Emoji guess game - guess service by emoji.""" + + game_type = GameType.EMOJI_GUESS + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + pairs = template_payload.get("pairs") or [{"question": "🔐📡🌐", "answer": "VPN"}] + pair = random.choice(pairs) + return pair + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + question = payload.get("question", "🤔") + emoji_list = question.split() + random.shuffle(emoji_list) + shuffled_question = " ".join(emoji_list) + from app.keyboards.inline import get_back_keyboard + + return GameRenderResult( + text=texts.t("CONTEST_EMOJI_PROMPT", "Угадай сервис по эмодзи: {q}").format( + q=shuffled_question + ), + keyboard=get_back_keyboard(language), + requires_text_input=True, + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + correct = (payload.get("answer") or "").upper() + is_correct = correct and user_answer.strip().upper() == correct + + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else "Неверно, попробуй в следующем раунде", + ) + + +class AnagramStrategy(BaseGameStrategy): + """Anagram game - unscramble letters to form a word.""" + + game_type = GameType.ANAGRAM + + DEFAULT_WORDS = ["SERVER", "XRAY", "VPN"] + + def build_payload(self, template_payload: Dict[str, Any]) -> Dict[str, Any]: + words = template_payload.get("words") or self.DEFAULT_WORDS + word = random.choice(words).upper() + shuffled = "".join(random.sample(word, len(word))) + return {"letters": shuffled, "answer": word} + + def render( + self, + round_id: int, + payload: Dict[str, Any], + language: str, + back_callback: str = "contests_menu", + ) -> GameRenderResult: + texts = self._get_texts(language) + letters = payload.get("letters", "") + from app.keyboards.inline import get_back_keyboard + + return GameRenderResult( + text=texts.t("CONTEST_ANAGRAM_PROMPT", "Составь слово: {letters}").format( + letters=letters + ), + keyboard=get_back_keyboard(language), + requires_text_input=True, + ) + + def check_answer( + self, + user_answer: str, + payload: Dict[str, Any], + language: str, + ) -> AnswerCheckResult: + correct = (payload.get("answer") or "").upper() + is_correct = correct and user_answer.strip().upper() == correct + + return AnswerCheckResult( + is_correct=is_correct, + response_text="" if is_correct else "Неверно, попробуй в следующем раунде", + ) + + +# Registry of game strategies +_GAME_STRATEGIES: Dict[GameType, BaseGameStrategy] = { + GameType.QUEST_BUTTONS: QuestButtonsStrategy(), + GameType.LOCK_HACK: LockHackStrategy(), + GameType.SERVER_LOTTERY: ServerLotteryStrategy(), + GameType.BLITZ_REACTION: BlitzReactionStrategy(), + GameType.LETTER_CIPHER: LetterCipherStrategy(), + GameType.EMOJI_GUESS: EmojiGuessStrategy(), + GameType.ANAGRAM: AnagramStrategy(), +} + + +def get_game_strategy(game_type: GameType | str) -> Optional[BaseGameStrategy]: + """Get game strategy by type.""" + if isinstance(game_type, str): + try: + game_type = GameType(game_type) + except ValueError: + return None + return _GAME_STRATEGIES.get(game_type) + + +def get_all_game_types() -> List[GameType]: + """Get list of all supported game types.""" + return list(_GAME_STRATEGIES.keys())