feat: block registration with disposable email addresses

Add DisposableEmailService that fetches ~72k disposable email domains
from github.com/disposable/disposable-email-domains into an in-memory
frozenset with 24h auto-refresh via asyncio background task.

Integrated into three email entry points in cabinet auth routes:
- POST /email/register (link email to Telegram account)
- POST /email/register/standalone (standalone email registration)
- POST /email/change (change existing email)

Controlled by DISPOSABLE_EMAIL_CHECK_ENABLED setting (default: true).
Falls back to allowing all emails if domain list fetch fails.
This commit is contained in:
Fringg
2026-02-07 00:34:11 +03:00
parent 4e7438b9f9
commit 116c8453bb
4 changed files with 142 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ from app.database.crud.user import (
verify_and_apply_email_change,
)
from app.database.models import CabinetRefreshToken, User
from app.services.disposable_email_service import disposable_email_service
from app.services.referral_service import process_referral_registration
from app.utils.timezone import panel_datetime_to_naive_utc
@@ -385,6 +386,13 @@ async def register_email(
Requires valid JWT token from Telegram authentication.
Sends verification email to the provided address.
"""
# Check for disposable email
if disposable_email_service.is_disposable(request.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Disposable email addresses are not allowed',
)
# Check if email already exists
existing_user = await db.execute(select(User).where(User.email == request.email))
if existing_user.scalar_one_or_none():
@@ -478,6 +486,13 @@ async def register_email_standalone(
)
logger.info(f'Test email registration: {request.email}')
# Check for disposable email
if disposable_email_service.is_disposable(request.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Disposable email addresses are not allowed',
)
# Проверить что email не занят
existing = await db.execute(select(User).where(User.email == request.email))
if existing.scalar_one_or_none():
@@ -971,6 +986,13 @@ async def request_email_change(
detail='New email is the same as current email',
)
# Check for disposable email
if disposable_email_service.is_disposable(request.new_email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Disposable email addresses are not allowed',
)
# Check if new email is already taken
if await is_email_taken(db, request.new_email, exclude_user_id=user.id):
raise HTTPException(

View File

@@ -237,6 +237,8 @@ class Settings(BaseSettings):
BLACKLIST_UPDATE_INTERVAL_HOURS: int = 24
BLACKLIST_IGNORE_ADMINS: bool = True
DISPOSABLE_EMAIL_CHECK_ENABLED: bool = True
# Настройки простой покупки
SIMPLE_SUBSCRIPTION_ENABLED: bool = False
SIMPLE_SUBSCRIPTION_PERIOD_DAYS: int = 30

View File

@@ -0,0 +1,109 @@
"""Service for blocking disposable/temporary email domains."""
import asyncio
import logging
from datetime import UTC, datetime
import aiohttp
from app.config import settings
logger = logging.getLogger(__name__)
class DisposableEmailService:
"""
Downloads and caches a list of disposable email domains from GitHub.
Domains are stored in a frozenset for O(1) thread-safe lookups.
The list is refreshed every 24 hours via an asyncio background task.
If the download fails, the service falls back to an empty set (no blocking).
"""
DOMAINS_URL = 'https://raw.githubusercontent.com/disposable/disposable-email-domains/master/domains.txt'
UPDATE_INTERVAL_HOURS = 24
def __init__(self) -> None:
self._domains: frozenset[str] = frozenset()
self._task: asyncio.Task[None] | None = None
self._last_updated: datetime | None = None
self._domain_count: int = 0
async def start(self) -> None:
"""Load domains and start periodic refresh task."""
await self._update_domains()
self._task = asyncio.create_task(self._periodic_loop())
logger.info('DisposableEmailService started (%d domains loaded)', self._domain_count)
async def stop(self) -> None:
"""Cancel periodic refresh task."""
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info('DisposableEmailService stopped')
async def _update_domains(self) -> None:
"""Fetch domains.txt from GitHub and swap the in-memory set."""
try:
async with aiohttp.ClientSession() as session, session.get(self.DOMAINS_URL) as resp:
if resp.status != 200:
logger.error(
'Failed to fetch disposable domains: HTTP %d',
resp.status,
)
return
text = await resp.text()
domains = frozenset(
line.strip().lower() for line in text.splitlines() if line.strip() and not line.startswith('#')
)
self._domains = domains
self._domain_count = len(domains)
self._last_updated = datetime.now(UTC)
logger.info('Disposable email domains updated: %d domains', self._domain_count)
except Exception:
logger.exception('Error updating disposable email domains')
async def _periodic_loop(self) -> None:
"""Sleep then refresh, repeating forever until cancelled."""
while True:
await asyncio.sleep(self.UPDATE_INTERVAL_HOURS * 3600)
await self._update_domains()
def is_disposable(self, email: str) -> bool:
"""Check if the email uses a disposable domain.
Returns False when the feature is disabled via settings.
"""
if not getattr(settings, 'DISPOSABLE_EMAIL_CHECK_ENABLED', True):
return False
if not self._domains:
return False
try:
domain = email.rsplit('@', 1)[1].lower()
except IndexError:
return False
return domain in self._domains
def get_status(self) -> dict:
"""Return service status for monitoring / health checks."""
return {
'enabled': getattr(settings, 'DISPOSABLE_EMAIL_CHECK_ENABLED', True),
'domain_count': self._domain_count,
'last_updated': self._last_updated.isoformat() if self._last_updated else None,
'running': self._task is not None and not self._task.done(),
}
disposable_email_service = DisposableEmailService()

View File

@@ -10,6 +10,7 @@ from fastapi.staticfiles import StaticFiles
from app.cabinet.routes import router as cabinet_router
from app.config import settings
from app.services.disposable_email_service import disposable_email_service
from app.services.payment_service import PaymentService
from app.webapi.app import create_web_api_app
from app.webapi.docs import add_redoc_endpoint
@@ -144,6 +145,14 @@ def create_unified_app(
else:
telegram_processor = None
@app.on_event('startup')
async def start_disposable_email_service() -> None: # pragma: no cover - event hook
await disposable_email_service.start()
@app.on_event('shutdown')
async def stop_disposable_email_service() -> None: # pragma: no cover - event hook
await disposable_email_service.stop()
miniapp_mounted, miniapp_path = _mount_miniapp_static(app)
unified_health_path = '/health/unified' if settings.is_web_api_enabled() else '/health'