def _is_sqlite_url(url: str) -> bool:
    """Return True when *url* points at a SQLite database.

    The decision is made from the URL scheme only, so every driver variant
    (``sqlite://``, ``sqlite+aiosqlite://``, ...) is recognised, while a
    non-SQLite URL that merely contains the text ``:memory:`` (for example
    in a database name or password) is not misclassified.

    Args:
        url: Database URL to inspect. ``None`` and empty strings are
            treated as "not SQLite" instead of raising.

    Returns:
        True for SQLite URLs and for the bare ``":memory:"`` shorthand,
        False otherwise.
    """
    candidate = (url or "").strip()
    # The bare shorthand was accepted by the previous substring check;
    # keep honouring it so existing configuration behaves the same.
    if candidate == ":memory:":
        return True
    # "sqlite+aiosqlite:///db" -> scheme "sqlite+aiosqlite" -> backend "sqlite"
    scheme = candidate.partition(":")[0]
    backend = scheme.split("+", 1)[0]
    return backend.lower() == "sqlite"
def _require_positive_attempts(attempts: int) -> None:
    """Reject retry budgets that would never run the operation at all."""
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts!r}")


def with_db_retry(
    attempts: int = DEFAULT_RETRY_ATTEMPTS,
    delay: float = DEFAULT_RETRY_DELAY,
    backoff: float = 2.0,
) -> Callable:
    """Build a decorator that retries an async callable on connection errors.

    Only exceptions listed in ``RETRYABLE_EXCEPTIONS`` trigger a retry; any
    other exception propagates immediately. After the last attempt the
    original exception is re-raised unchanged.

    Args:
        attempts: Total number of calls to make (must be at least 1).
        delay: Pause before the second attempt, in seconds.
        backoff: Factor the pause is multiplied by after every retry.

    Returns:
        A decorator for coroutine functions.

    Raises:
        ValueError: If ``attempts`` is smaller than 1. Previously such a
            value skipped the call entirely and ended in ``raise None``.
    """
    # Fail at decoration time so a misconfigured budget is caught on import.
    _require_positive_attempts(attempts)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except RETRYABLE_EXCEPTIONS as e:
                    if attempt == attempts:
                        logger.error(
                            "Ошибка БД: все %d попыток исчерпаны. Последняя ошибка: %s",
                            attempts, str(e)
                        )
                        # Bare raise keeps the original traceback intact.
                        raise
                    logger.warning(
                        "Ошибка БД (попытка %d/%d): %s. Повтор через %.1f сек...",
                        attempt, attempts, str(e)[:100], current_delay
                    )
                # Sleep outside the except block so the failed attempt's
                # exception and traceback are released during the pause.
                await asyncio.sleep(current_delay)
                current_delay *= backoff

        return wrapper
    return decorator


async def execute_with_retry(
    session: AsyncSession,
    statement,
    attempts: int = DEFAULT_RETRY_ATTEMPTS,
):
    """Run ``session.execute(statement)``, retrying on connection errors.

    The pause starts at ``DEFAULT_RETRY_DELAY`` seconds and doubles after
    every failed attempt. Exhausting the budget re-raises the last error.

    NOTE(review): the statement is re-executed on the same session without
    a rollback. That presumably only helps when the failure left the
    session's transaction usable; confirm against SQLAlchemy's guidance on
    disconnect handling before relying on it mid-transaction.

    Args:
        session: Session used to execute the statement.
        statement: Any executable accepted by ``AsyncSession.execute``.
        attempts: Total number of executions to try (must be at least 1).

    Returns:
        Whatever ``session.execute`` returns on the first success.

    Raises:
        ValueError: If ``attempts`` is smaller than 1.
    """
    _require_positive_attempts(attempts)
    delay = DEFAULT_RETRY_DELAY

    for attempt in range(1, attempts + 1):
        try:
            return await session.execute(statement)
        except RETRYABLE_EXCEPTIONS as e:
            if attempt == attempts:
                raise
            logger.warning(
                "SQL retry (попытка %d/%d): %s",
                attempt, attempts, str(e)[:100]
            )
        await asyncio.sleep(delay)
        delay *= 2
HEALTH_CHECK_TIMEOUT = 5.0  # seconds


def _redact_database_url(url: str) -> str:
    """Return a log-safe form of *url*: scheme and host only.

    Credentials, path and query string are dropped because any of them may
    carry secrets. The text after the last ``@`` is taken as the host, which
    errs on the side of hiding too much rather than too little.
    """
    scheme, separator, remainder = url.partition("://")
    if not separator:
        return f"<{len(url)} chars, no scheme>"
    has_userinfo = "@" in remainder
    location = remainder.rpartition("@")[2]
    for delimiter in "/?#":
        location = location.split(delimiter, 1)[0]
    prefix = "***@" if has_userinfo else ""
    return f"{scheme}://{prefix}{location}"


def _validate_database_url(url: Optional[str]) -> Optional[str]:
    """Normalise a configured database URL.

    Args:
        url: Raw value from settings; may be ``None`` or blank.

    Returns:
        The stripped URL, or ``None`` when the value is missing, blank, or
        neither contains ``://`` nor starts with ``sqlite``.
    """
    if not url:
        return None
    candidate = url.strip()
    if not candidate:
        return None
    # Cheap format check only; full parsing is left to SQLAlchemy.
    if "://" not in candidate and not candidate.startswith("sqlite"):
        # Never echo the raw value: it may contain credentials.
        logger.warning("Невалидный DATABASE_URL: %s", _redact_database_url(candidate))
        return None
    return candidate


class DatabaseManager:
    """Session and health-check facade over the primary engine and an
    optional read replica."""

    def __init__(self):
        """Bind the primary engine and, if configured, set up the replica.

        Replica setup is best-effort: any failure is logged and the manager
        keeps working against the primary only.
        """
        self.engine = engine
        self.read_replica_engine: Optional[AsyncEngine] = None
        self._read_replica_session_factory: Optional[async_sessionmaker] = None

        replica_url = _validate_database_url(
            getattr(settings, 'DATABASE_READ_REPLICA_URL', None)
        )
        if not replica_url:
            return

        # Choose pooling from the replica's own URL instead of the primary's
        # poolclass: NullPool rejects pool_size/max_overflow, so reusing a
        # SQLite primary's pool class made replica creation fail.
        if _is_sqlite_url(replica_url):
            pool_options = {"poolclass": NullPool}
        else:
            pool_options = {
                "poolclass": AsyncAdaptedQueuePool,
                "pool_size": 30,  # larger pool for read traffic
                "max_overflow": 50,
                "pool_pre_ping": True,
                "pool_recycle": 3600,
            }

        try:
            replica_engine = create_async_engine(
                replica_url,
                echo=False,
                **pool_options,
            )
            # Built once here rather than on every session() call.
            session_factory = async_sessionmaker(
                bind=replica_engine,
                class_=AsyncSession,
                expire_on_commit=False,
                autoflush=False,
            )
        except Exception as e:
            logger.error("Не удалось настроить read replica: %s", e)
            return

        # Publish both attributes together so they can never disagree.
        self.read_replica_engine = replica_engine
        self._read_replica_session_factory = session_factory
        logger.info("Read replica настроена: %s", _redact_database_url(replica_url))

    @asynccontextmanager
    async def session(self, read_only: bool = False):
        """Yield a database session with commit/rollback handling.

        Args:
            read_only: When True the session comes from the read replica if
                one is configured (otherwise from the primary) and nothing
                is committed on exit.

        Yields:
            The open session. On an exception inside the ``with`` body the
            session is rolled back and the exception is re-raised.
        """
        use_replica = read_only and self._read_replica_session_factory is not None
        session_factory = (
            self._read_replica_session_factory if use_replica else AsyncSessionLocal
        )

        async with session_factory() as session:
            try:
                yield session
                if not read_only:
                    await session.commit()
            except Exception:
                await session.rollback()
                raise

    @staticmethod
    async def _ping(session_factory) -> float:
        """Run ``SELECT 1`` on a fresh session; return the latency in ms."""
        async with session_factory() as session:
            # perf_counter is monotonic, so clock adjustments cannot skew it.
            started = time.perf_counter()
            await session.execute(text("SELECT 1"))
            return (time.perf_counter() - started) * 1000

    async def _probe(self, session_factory, pool, timeout, timeout_log, failure_log) -> dict:
        """Ping one database and assemble the health report dictionary."""
        status = "unhealthy"
        latency = None
        try:
            # wait_for is used instead of asyncio.timeout(), which only
            # exists on Python 3.11+; there its AttributeError would be
            # swallowed below and reported as a permanent "unhealthy".
            latency = await asyncio.wait_for(self._ping(session_factory), timeout)
            status = "healthy"
        except asyncio.TimeoutError:
            logger.error(timeout_log, timeout)
            status = "timeout"
        except Exception as e:
            logger.error(failure_log, e)

        return {
            "status": status,
            # Compare with None so a 0.0 ms reading is still reported.
            "latency_ms": round(latency, 2) if latency is not None else None,
            "pool": _collect_health_pool_metrics(pool),
        }

    async def health_check(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> dict:
        """Check the primary database.

        Args:
            timeout: Maximum time to wait for the probe, in seconds.

        Returns:
            Dict with ``status`` ("healthy", "timeout" or "unhealthy"),
            ``latency_ms`` (``None`` unless healthy) and ``pool`` metrics.
        """
        return await self._probe(
            AsyncSessionLocal,
            self.engine.pool,
            timeout,
            "Health check таймаут (%s сек)",
            "Database health check failed: %s",
        )

    async def health_check_replica(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> Optional[dict]:
        """Check the read replica.

        Args:
            timeout: Maximum time to wait for the probe, in seconds.

        Returns:
            ``None`` when no replica is configured, otherwise a dict with
            the same keys as ``health_check``.
        """
        if self.read_replica_engine is None or self._read_replica_session_factory is None:
            return None

        return await self._probe(
            self._read_replica_session_factory,
            self.read_replica_engine.pool,
            timeout,
            "Read replica health check таймаут (%s сек)",
            "Read replica health check failed: %s",
        )