From 97ec39aa803f0e3f03fdcd482df0cbcb86fd1efd Mon Sep 17 00:00:00 2001 From: Fringg Date: Mon, 16 Feb 2026 06:52:45 +0300 Subject: [PATCH] fix: add promo code anti-abuse protections - Rate-limit on brute-force: 5 failed attempts per 5 min blocks user - Daily stacking limit: max 5 promo activations per 24h (in-memory + DB) - Format validation: only alphanumeric/hyphen/underscore, 3-50 chars --- app/database/crud/promocode.py | 11 ++++ app/handlers/promocode.py | 42 ++++++++++++ app/handlers/start.py | 47 ++++++++++++- app/services/promocode_service.py | 12 ++++ app/utils/promo_rate_limiter.py | 105 ++++++++++++++++++++++++++++++ 5 files changed, 214 insertions(+), 3 deletions(-) create mode 100644 app/utils/promo_rate_limiter.py diff --git a/app/database/crud/promocode.py b/app/database/crud/promocode.py index 3ec7102a..6e00c41c 100644 --- a/app/database/crud/promocode.py +++ b/app/database/crud/promocode.py @@ -131,6 +131,17 @@ async def get_promocode_use_by_user_and_code(db: AsyncSession, user_id: int, pro return result.scalar_one_or_none() +async def count_user_recent_activations(db: AsyncSession, user_id: int, hours: int = 24) -> int: + """Подсчитывает количество активаций промокодов пользователем за последние N часов.""" + from datetime import timedelta + + cutoff = datetime.utcnow() - timedelta(hours=hours) + result = await db.execute( + select(func.count(PromoCodeUse.id)).where(and_(PromoCodeUse.user_id == user_id, PromoCodeUse.used_at >= cutoff)) + ) + return result.scalar() or 0 + + async def get_user_promocodes(db: AsyncSession, user_id: int) -> list[PromoCodeUse]: result = await db.execute( select(PromoCodeUse).where(PromoCodeUse.user_id == user_id).order_by(PromoCodeUse.used_at.desc()) diff --git a/app/handlers/promocode.py b/app/handlers/promocode.py index da6fbfbb..3beefc43 100644 --- a/app/handlers/promocode.py +++ b/app/handlers/promocode.py @@ -84,14 +84,52 @@ async def process_promocode(message: types.Message, db_user: User, state: FSMCon ) return + from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format + + # Валидация формата + if not validate_promo_format(code): + await message.answer(texts.PROMOCODE_INVALID, reply_markup=get_back_keyboard(db_user.language)) + return + + # Rate-limit на перебор + if promo_limiter.is_blocked(message.from_user.id): + cooldown = promo_limiter.get_block_cooldown(message.from_user.id) + await message.answer( + texts.t( + 'PROMO_RATE_LIMITED', + '⏳ Слишком много попыток. Попробуйте через {cooldown} сек.', + ).format(cooldown=cooldown), + reply_markup=get_back_keyboard(db_user.language), + ) + await state.clear() + return + + # Лимит на стакинг (макс активаций в день) + if not promo_limiter.can_activate(message.from_user.id): + await message.answer( + texts.t( + 'PROMO_DAILY_LIMIT', + '❌ Достигнут лимит активаций промокодов на сегодня. Попробуйте завтра.', + ), + reply_markup=get_back_keyboard(db_user.language), + ) + await state.clear() + return + result = await activate_promocode_for_registration(db, db_user.id, code, message.bot) if result['success']: + promo_limiter.record_activation(message.from_user.id) await message.answer( texts.PROMOCODE_SUCCESS.format(description=result['description']), reply_markup=get_back_keyboard(db_user.language), ) else: + # Записываем неудачную попытку только для not_found (перебор) + if result['error'] == 'not_found': + promo_limiter.record_failed_attempt(message.from_user.id) + promo_limiter.cleanup() + error_messages = { 'not_found': texts.PROMOCODE_INVALID, 'expired': texts.PROMOCODE_EXPIRED, @@ -104,6 +142,10 @@ async def process_promocode(message: types.Message, db_user: User, state: FSMCon 'PROMOCODE_ACTIVE_DISCOUNT_EXISTS', '❌ У вас уже есть активная скидка. Используйте её перед активацией новой.', ), + 'daily_limit': texts.t( + 'PROMO_DAILY_LIMIT', + '❌ Достигнут лимит активаций промокодов на сегодня. Попробуйте завтра.', + ), 'server_error': texts.ERROR, } diff --git a/app/handlers/start.py b/app/handlers/start.py index 0a80f8b9..73542b45 100644 --- a/app/handlers/start.py +++ b/app/handlers/start.py @@ -159,10 +159,27 @@ async def handle_potential_referral_code(message: types.Message, state: FSMConte language = data.get('language') or (getattr(user, 'language', None) if user else None) or DEFAULT_LANGUAGE texts = get_texts(language) + from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format + potential_code = message.text.strip() - if len(potential_code) < 4 or len(potential_code) > 20: + if len(potential_code) < 3 or len(potential_code) > 50: return False + # Валидация формата (только буквы, цифры, дефис, подчёркивание) + if not validate_promo_format(potential_code): + return False + + # Rate-limit на перебор промокодов + if promo_limiter.is_blocked(message.from_user.id): + cooldown = promo_limiter.get_block_cooldown(message.from_user.id) + await message.answer( + texts.t( + 'PROMO_RATE_LIMITED', + '⏳ Слишком много попыток. Попробуйте через {cooldown} сек.', + ).format(cooldown=cooldown) + ) + return True + # Сначала проверяем реферальный код referrer = await get_user_by_referral_code(db, potential_code) if referrer: @@ -217,7 +234,10 @@ async def handle_potential_referral_code(message: types.Message, state: FSMConte return True - # Ни реферальный код, ни промокод не найдены + # Ни реферальный код, ни промокод не найдены — записываем неудачную попытку + promo_limiter.record_failed_attempt(message.from_user.id) + promo_limiter.cleanup() + await message.answer( texts.t( 'REFERRAL_OR_PROMO_CODE_INVALID_HELP', @@ -969,8 +989,26 @@ async def process_referral_code_input(message: types.Message, state: FSMContext, language = data.get('language', DEFAULT_LANGUAGE) texts = get_texts(language) + from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format + code = message.text.strip() + # Валидация формата + if not validate_promo_format(code): + await message.answer(texts.t('REFERRAL_OR_PROMO_CODE_INVALID', '❌ Неверный реферальный код или промокод')) + return + + # Rate-limit на перебор + if promo_limiter.is_blocked(message.from_user.id): + cooldown = promo_limiter.get_block_cooldown(message.from_user.id) + await message.answer( + texts.t( + 'PROMO_RATE_LIMITED', + '⏳ Слишком много попыток. Попробуйте через {cooldown} сек.', + ).format(cooldown=cooldown) + ) + return + # Сначала проверяем, является ли это реферальным кодом referrer = await get_user_by_referral_code(db, code) if referrer: @@ -1000,7 +1038,10 @@ async def process_referral_code_input(message: types.Message, state: FSMContext, await complete_registration(message, state, db) return - # Ни реферальный код, ни промокод не найдены + # Ни реферальный код, ни промокод не найдены — записываем неудачу + promo_limiter.record_failed_attempt(message.from_user.id) + promo_limiter.cleanup() + await message.answer(texts.t('REFERRAL_OR_PROMO_CODE_INVALID', '❌ Неверный реферальный код или промокод')) logger.info(f'❌ Неверный код (ни реферальный, ни промокод): {code}') return diff --git a/app/services/promocode_service.py b/app/services/promocode_service.py index 85c78bca..fafcd6a3 100644 --- a/app/services/promocode_service.py +++ b/app/services/promocode_service.py @@ -54,6 +54,18 @@ class PromoCodeService: if existing_use: return {'success': False, 'error': 'already_used_by_user'} + # Лимит на количество активаций за день (анти-стакинг) + from app.database.crud.promocode import count_user_recent_activations + + recent_count = await count_user_recent_activations(db, user_id, hours=24) + if recent_count >= 5: + logger.warning( + 'Promo stacking limit: user %s has %d activations in 24h', + self._format_user_log(user), + recent_count, + ) + return {'success': False, 'error': 'daily_limit'} + # Проверка "только для первой покупки" if getattr(promocode, 'first_purchase_only', False): if getattr(user, 'has_had_paid_subscription', False): diff --git a/app/utils/promo_rate_limiter.py b/app/utils/promo_rate_limiter.py new file mode 100644 index 00000000..ff83e1ac --- /dev/null +++ b/app/utils/promo_rate_limiter.py @@ -0,0 +1,105 @@ +import logging +import re +import time + + +logger = logging.getLogger(__name__) + +# Только буквы, цифры, дефис, подчёркивание +PROMO_CODE_PATTERN = re.compile(r'^[A-Za-z0-9_-]+$') + +# Лимиты +MAX_FAILED_ATTEMPTS = 5 +FAILED_WINDOW_SECONDS = 300 # 5 минут +MAX_ACTIVATIONS_PER_DAY = 5 +ACTIVATION_WINDOW_SECONDS = 86400 # 24 часа + + +class PromoRateLimiter: + """ + In-memory rate limiter для промокодов: + 1. Лимит на неудачные попытки (перебор) + 2. Лимит на количество активаций за день (стакинг) + """ + + def __init__(self): + # user_id → list[timestamp] неудачных попыток + self._failed_attempts: dict[int, list[float]] = {} + # user_id → list[timestamp] успешных активаций + self._activations: dict[int, list[float]] = {} + + def record_failed_attempt(self, user_id: int) -> None: + now = time.time() + attempts = self._failed_attempts.get(user_id, []) + attempts = [ts for ts in attempts if now - ts < FAILED_WINDOW_SECONDS] + attempts.append(now) + self._failed_attempts[user_id] = attempts + + if len(attempts) >= MAX_FAILED_ATTEMPTS: + logger.warning( + 'Promo brute-force: user %s — %d failed attempts in %ds', + user_id, + len(attempts), + FAILED_WINDOW_SECONDS, + ) + + def is_blocked(self, user_id: int) -> bool: + now = time.time() + attempts = self._failed_attempts.get(user_id, []) + attempts = [ts for ts in attempts if now - ts < FAILED_WINDOW_SECONDS] + self._failed_attempts[user_id] = attempts + return len(attempts) >= MAX_FAILED_ATTEMPTS + + def get_block_cooldown(self, user_id: int) -> int: + attempts = self._failed_attempts.get(user_id, []) + if not attempts: + return 0 + oldest = attempts[0] + remaining = int(FAILED_WINDOW_SECONDS - (time.time() - oldest)) + 1 + return max(remaining, 0) + + def record_activation(self, user_id: int) -> None: + now = time.time() + activations = self._activations.get(user_id, []) + activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS] + activations.append(now) + self._activations[user_id] = activations + + def can_activate(self, user_id: int) -> bool: + now = time.time() + activations = self._activations.get(user_id, []) + activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS] + self._activations[user_id] = activations + return len(activations) < MAX_ACTIVATIONS_PER_DAY + + def get_activations_left(self, user_id: int) -> int: + now = time.time() + activations = self._activations.get(user_id, []) + activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS] + return max(0, MAX_ACTIVATIONS_PER_DAY - len(activations)) + + def cleanup(self) -> None: + now = time.time() + if len(self._failed_attempts) > 500: + self._failed_attempts = { + uid: [ts for ts in tss if now - ts < FAILED_WINDOW_SECONDS] + for uid, tss in self._failed_attempts.items() + if any(now - ts < FAILED_WINDOW_SECONDS for ts in tss) + } + if len(self._activations) > 500: + self._activations = { + uid: [ts for ts in tss if now - ts < ACTIVATION_WINDOW_SECONDS] + for uid, tss in self._activations.items() + if any(now - ts < ACTIVATION_WINDOW_SECONDS for ts in tss) + } + + +def validate_promo_format(code: str) -> bool: + """Проверяет формат промокода: 3-50 символов, только буквы/цифры/дефис/подчёркивание.""" + if not code or len(code) < 3 or len(code) > 50: + return False + return bool(PROMO_CODE_PATTERN.match(code)) + + +# Глобальный синглтон +promo_limiter = PromoRateLimiter()