fix: add promo code anti-abuse protections

- Rate-limit on brute-force: 5 failed attempts per 5 min blocks user
- Daily stacking limit: max 5 promo activations per 24h (in-memory + DB)
- Format validation: only alphanumeric/hyphen/underscore, 3-50 chars
This commit is contained in:
Fringg
2026-02-16 06:52:45 +03:00
parent 61a97220d3
commit 97ec39aa80
5 changed files with 214 additions and 3 deletions

View File

@@ -131,6 +131,17 @@ async def get_promocode_use_by_user_and_code(db: AsyncSession, user_id: int, pro
return result.scalar_one_or_none()
async def count_user_recent_activations(db: AsyncSession, user_id: int, hours: int = 24) -> int:
"""Подсчитывает количество активаций промокодов пользователем за последние N часов."""
from datetime import timedelta
cutoff = datetime.utcnow() - timedelta(hours=hours)
result = await db.execute(
select(func.count(PromoCodeUse.id)).where(and_(PromoCodeUse.user_id == user_id, PromoCodeUse.used_at >= cutoff))
)
return result.scalar() or 0
async def get_user_promocodes(db: AsyncSession, user_id: int) -> list[PromoCodeUse]:
result = await db.execute(
select(PromoCodeUse).where(PromoCodeUse.user_id == user_id).order_by(PromoCodeUse.used_at.desc())

View File

@@ -84,14 +84,52 @@ async def process_promocode(message: types.Message, db_user: User, state: FSMCon
)
return
from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format
# Валидация формата
if not validate_promo_format(code):
await message.answer(texts.PROMOCODE_INVALID, reply_markup=get_back_keyboard(db_user.language))
return
# Rate-limit на перебор
if promo_limiter.is_blocked(message.from_user.id):
cooldown = promo_limiter.get_block_cooldown(message.from_user.id)
await message.answer(
texts.t(
'PROMO_RATE_LIMITED',
'⏳ Слишком много попыток. Попробуйте через {cooldown} сек.',
).format(cooldown=cooldown),
reply_markup=get_back_keyboard(db_user.language),
)
await state.clear()
return
# Лимит на стакинг (макс активаций в день)
if not promo_limiter.can_activate(message.from_user.id):
await message.answer(
texts.t(
'PROMO_DAILY_LIMIT',
'❌ Достигнут лимит активаций промокодов на сегодня. Попробуйте завтра.',
),
reply_markup=get_back_keyboard(db_user.language),
)
await state.clear()
return
result = await activate_promocode_for_registration(db, db_user.id, code, message.bot)
if result['success']:
promo_limiter.record_activation(message.from_user.id)
await message.answer(
texts.PROMOCODE_SUCCESS.format(description=result['description']),
reply_markup=get_back_keyboard(db_user.language),
)
else:
# Записываем неудачную попытку только для not_found (перебор)
if result['error'] == 'not_found':
promo_limiter.record_failed_attempt(message.from_user.id)
promo_limiter.cleanup()
error_messages = {
'not_found': texts.PROMOCODE_INVALID,
'expired': texts.PROMOCODE_EXPIRED,
@@ -104,6 +142,10 @@ async def process_promocode(message: types.Message, db_user: User, state: FSMCon
'PROMOCODE_ACTIVE_DISCOUNT_EXISTS',
'У вас уже есть активная скидка. Используйте её перед активацией новой.',
),
'daily_limit': texts.t(
'PROMO_DAILY_LIMIT',
'❌ Достигнут лимит активаций промокодов на сегодня. Попробуйте завтра.',
),
'server_error': texts.ERROR,
}

View File

@@ -159,10 +159,27 @@ async def handle_potential_referral_code(message: types.Message, state: FSMConte
language = data.get('language') or (getattr(user, 'language', None) if user else None) or DEFAULT_LANGUAGE
texts = get_texts(language)
from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format
potential_code = message.text.strip()
if len(potential_code) < 4 or len(potential_code) > 20:
if len(potential_code) < 3 or len(potential_code) > 50:
return False
# Валидация формата (только буквы, цифры, дефис, подчёркивание)
if not validate_promo_format(potential_code):
return False
# Rate-limit на перебор промокодов
if promo_limiter.is_blocked(message.from_user.id):
cooldown = promo_limiter.get_block_cooldown(message.from_user.id)
await message.answer(
texts.t(
'PROMO_RATE_LIMITED',
'⏳ Слишком много попыток. Попробуйте через {cooldown} сек.',
).format(cooldown=cooldown)
)
return True
# Сначала проверяем реферальный код
referrer = await get_user_by_referral_code(db, potential_code)
if referrer:
@@ -217,7 +234,10 @@ async def handle_potential_referral_code(message: types.Message, state: FSMConte
return True
# Ни реферальный код, ни промокод не найдены
# Ни реферальный код, ни промокод не найдены — записываем неудачную попытку
promo_limiter.record_failed_attempt(message.from_user.id)
promo_limiter.cleanup()
await message.answer(
texts.t(
'REFERRAL_OR_PROMO_CODE_INVALID_HELP',
@@ -969,8 +989,26 @@ async def process_referral_code_input(message: types.Message, state: FSMContext,
language = data.get('language', DEFAULT_LANGUAGE)
texts = get_texts(language)
from app.utils.promo_rate_limiter import promo_limiter, validate_promo_format
code = message.text.strip()
# Валидация формата
if not validate_promo_format(code):
await message.answer(texts.t('REFERRAL_OR_PROMO_CODE_INVALID', '❌ Неверный реферальный код или промокод'))
return
# Rate-limit на перебор
if promo_limiter.is_blocked(message.from_user.id):
cooldown = promo_limiter.get_block_cooldown(message.from_user.id)
await message.answer(
texts.t(
'PROMO_RATE_LIMITED',
'⏳ Слишком много попыток. Попробуйте через {cooldown} сек.',
).format(cooldown=cooldown)
)
return
# Сначала проверяем, является ли это реферальным кодом
referrer = await get_user_by_referral_code(db, code)
if referrer:
@@ -1000,7 +1038,10 @@ async def process_referral_code_input(message: types.Message, state: FSMContext,
await complete_registration(message, state, db)
return
# Ни реферальный код, ни промокод не найдены
# Ни реферальный код, ни промокод не найдены — записываем неудачу
promo_limiter.record_failed_attempt(message.from_user.id)
promo_limiter.cleanup()
await message.answer(texts.t('REFERRAL_OR_PROMO_CODE_INVALID', '❌ Неверный реферальный код или промокод'))
logger.info(f'❌ Неверный код (ни реферальный, ни промокод): {code}')
return

View File

@@ -54,6 +54,18 @@ class PromoCodeService:
if existing_use:
return {'success': False, 'error': 'already_used_by_user'}
# Лимит на количество активаций за день (анти-стакинг)
from app.database.crud.promocode import count_user_recent_activations
recent_count = await count_user_recent_activations(db, user_id, hours=24)
if recent_count >= 5:
logger.warning(
'Promo stacking limit: user %s has %d activations in 24h',
self._format_user_log(user),
recent_count,
)
return {'success': False, 'error': 'daily_limit'}
# Проверка "только для первой покупки"
if getattr(promocode, 'first_purchase_only', False):
if getattr(user, 'has_had_paid_subscription', False):

View File

@@ -0,0 +1,105 @@
import logging
import re
import time
logger = logging.getLogger(__name__)
# Только буквы, цифры, дефис, подчёркивание
PROMO_CODE_PATTERN = re.compile(r'^[A-Za-z0-9_-]+$')
# Лимиты
MAX_FAILED_ATTEMPTS = 5
FAILED_WINDOW_SECONDS = 300 # 5 минут
MAX_ACTIVATIONS_PER_DAY = 5
ACTIVATION_WINDOW_SECONDS = 86400 # 24 часа
class PromoRateLimiter:
"""
In-memory rate limiter для промокодов:
1. Лимит на неудачные попытки (перебор)
2. Лимит на количество активаций за день (стакинг)
"""
def __init__(self):
# user_id → list[timestamp] неудачных попыток
self._failed_attempts: dict[int, list[float]] = {}
# user_id → list[timestamp] успешных активаций
self._activations: dict[int, list[float]] = {}
def record_failed_attempt(self, user_id: int) -> None:
now = time.time()
attempts = self._failed_attempts.get(user_id, [])
attempts = [ts for ts in attempts if now - ts < FAILED_WINDOW_SECONDS]
attempts.append(now)
self._failed_attempts[user_id] = attempts
if len(attempts) >= MAX_FAILED_ATTEMPTS:
logger.warning(
'Promo brute-force: user %s%d failed attempts in %ds',
user_id,
len(attempts),
FAILED_WINDOW_SECONDS,
)
def is_blocked(self, user_id: int) -> bool:
now = time.time()
attempts = self._failed_attempts.get(user_id, [])
attempts = [ts for ts in attempts if now - ts < FAILED_WINDOW_SECONDS]
self._failed_attempts[user_id] = attempts
return len(attempts) >= MAX_FAILED_ATTEMPTS
def get_block_cooldown(self, user_id: int) -> int:
attempts = self._failed_attempts.get(user_id, [])
if not attempts:
return 0
oldest = attempts[0]
remaining = int(FAILED_WINDOW_SECONDS - (time.time() - oldest)) + 1
return max(remaining, 0)
def record_activation(self, user_id: int) -> None:
now = time.time()
activations = self._activations.get(user_id, [])
activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS]
activations.append(now)
self._activations[user_id] = activations
def can_activate(self, user_id: int) -> bool:
now = time.time()
activations = self._activations.get(user_id, [])
activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS]
self._activations[user_id] = activations
return len(activations) < MAX_ACTIVATIONS_PER_DAY
def get_activations_left(self, user_id: int) -> int:
now = time.time()
activations = self._activations.get(user_id, [])
activations = [ts for ts in activations if now - ts < ACTIVATION_WINDOW_SECONDS]
return max(0, MAX_ACTIVATIONS_PER_DAY - len(activations))
def cleanup(self) -> None:
now = time.time()
if len(self._failed_attempts) > 500:
self._failed_attempts = {
uid: [ts for ts in tss if now - ts < FAILED_WINDOW_SECONDS]
for uid, tss in self._failed_attempts.items()
if any(now - ts < FAILED_WINDOW_SECONDS for ts in tss)
}
if len(self._activations) > 500:
self._activations = {
uid: [ts for ts in tss if now - ts < ACTIVATION_WINDOW_SECONDS]
for uid, tss in self._activations.items()
if any(now - ts < ACTIVATION_WINDOW_SECONDS for ts in tss)
}
def validate_promo_format(code: str) -> bool:
"""Проверяет формат промокода: 3-50 символов, только буквы/цифры/дефис/подчёркивание."""
if not code or len(code) < 3 or len(code) > 50:
return False
return bool(PROMO_CODE_PATTERN.match(code))
# Глобальный синглтон
promo_limiter = PromoRateLimiter()