Update database.py

This commit is contained in:
Egor
2025-12-24 14:44:02 +03:00
committed by GitHub
parent 2c6c7056e8
commit dd87b4d408

View File

@@ -1,26 +1,39 @@
import asyncio
import logging
from typing import AsyncGenerator, Optional
from functools import wraps
from typing import AsyncGenerator, Callable, Optional, TypeVar
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
AsyncSession,
create_async_engine,
async_sessionmaker,
AsyncEngine
)
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool
from sqlalchemy import event, text, bindparam, inspect
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError, InterfaceError
import time
from app.config import settings
from app.database.models import Base
logger = logging.getLogger(__name__)
T = TypeVar("T")
# ============================================================================
# PRODUCTION-GRADE CONNECTION POOLING
# ============================================================================
if settings.get_database_url().startswith("sqlite"):
def _is_sqlite_url(url: str) -> bool:
"""Проверка на SQLite URL (поддерживает sqlite:// и sqlite+aiosqlite://)"""
return url.startswith("sqlite") or ":memory:" in url
DATABASE_URL = settings.get_database_url()
IS_SQLITE = _is_sqlite_url(DATABASE_URL)
if IS_SQLITE:
poolclass = NullPool
pool_kwargs = {}
else:
@@ -31,7 +44,7 @@ else:
"pool_timeout": 30,
"pool_recycle": 3600,
"pool_pre_ping": True,
# 🔥 Агрессивная очистка мертвых соединений
# Агрессивная очистка мертвых соединений
"pool_reset_on_return": "rollback",
}
@@ -39,28 +52,30 @@ else:
# ENGINE WITH ADVANCED OPTIMIZATIONS
# ============================================================================
# PostgreSQL-специфичные connect_args
_pg_connect_args = {
"server_settings": {
"application_name": "remnawave_bot",
"jit": "on",
"statement_timeout": "60000", # 60 секунд
"idle_in_transaction_session_timeout": "300000", # 5 минут
},
"command_timeout": 60,
"timeout": 10,
}
engine = create_async_engine(
settings.get_database_url(),
DATABASE_URL,
poolclass=poolclass,
echo=settings.DEBUG,
future=True,
**pool_kwargs,
connect_args={
"server_settings": {
"application_name": "remnawave_bot",
"jit": "on",
"statement_timeout": "60000", # 60 секунд
"idle_in_transaction_session_timeout": "300000", # 5 минут
},
"command_timeout": 60,
"timeout": 10,
} if not settings.get_database_url().startswith("sqlite") else {},
# Кеш скомпилированных запросов (правильное размещение)
query_cache_size=500,
connect_args=_pg_connect_args if not IS_SQLITE else {},
execution_options={
"isolation_level": "READ COMMITTED", # Оптимальный для большинства случаев
"compiled_cache_size": 500, # Кеш скомпилированных запросов
}
"isolation_level": "READ COMMITTED",
},
**pool_kwargs,
)
# ============================================================================
@@ -71,10 +86,86 @@ AsyncSessionLocal = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False, # 🔥 Критично для производительности
autoflush=False, # Критично для производительности
autocommit=False,
)
# ============================================================================
# RETRY LOGIC FOR DATABASE OPERATIONS
# ============================================================================
RETRYABLE_EXCEPTIONS = (OperationalError, InterfaceError, ConnectionRefusedError, OSError)
DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_RETRY_DELAY = 0.5 # секунды
def with_db_retry(
    attempts: int = DEFAULT_RETRY_ATTEMPTS,
    delay: float = DEFAULT_RETRY_DELAY,
    backoff: float = 2.0,
) -> Callable:
    """
    Decorator: automatically retry an async DB call on transient connection errors.

    Only exceptions listed in RETRYABLE_EXCEPTIONS are retried; the coroutine
    sleeps between attempts with exponential backoff.

    Args:
        attempts: Total number of attempts (must be >= 1).
        delay: Initial delay between attempts, in seconds.
        backoff: Multiplier applied to the delay after each failed attempt.

    Raises:
        ValueError: If ``attempts`` is less than 1 (fail fast at decoration time
            instead of silently returning None from the wrapper).
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except RETRYABLE_EXCEPTIONS as e:
                    if attempt < attempts:
                        logger.warning(
                            "Ошибка БД (попытка %d/%d): %s. Повтор через %.1f сек...",
                            attempt, attempts, str(e)[:100], current_delay
                        )
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(
                            "Ошибка БД: все %d попыток исчерпаны. Последняя ошибка: %s",
                            attempts, str(e)
                        )
                        # Bare raise preserves the original exception, traceback
                        # and __context__ chain (no need to stash last_exception).
                        raise
        return wrapper
    return decorator
async def execute_with_retry(
    session: AsyncSession,
    statement,
    attempts: int = DEFAULT_RETRY_ATTEMPTS,
):
    """
    Execute *statement* on *session*, retrying transient connection failures.

    Args:
        session: Active AsyncSession used for execution.
        statement: SQLAlchemy statement (or ``text()`` clause) to execute.
        attempts: Total number of attempts (must be >= 1).

    Returns:
        The result of ``session.execute(statement)``.

    Raises:
        ValueError: If ``attempts`` is less than 1 — previously this path
            executed ``raise None`` (TypeError) because the loop never ran.
        The last retryable exception once all attempts are exhausted.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delay = DEFAULT_RETRY_DELAY
    for attempt in range(1, attempts + 1):
        try:
            return await session.execute(statement)
        except RETRYABLE_EXCEPTIONS as e:
            if attempt < attempts:
                logger.warning(
                    "SQL retry (попытка %d/%d): %s",
                    attempt, attempts, str(e)[:100]
                )
                await asyncio.sleep(delay)
                delay *= 2
            else:
                # Bare raise keeps the original traceback and context intact.
                raise
# ============================================================================
# QUERY PERFORMANCE MONITORING
# ============================================================================
@@ -97,35 +188,68 @@ if settings.DEBUG:
# ADVANCED SESSION MANAGER WITH READ REPLICAS
# ============================================================================
HEALTH_CHECK_TIMEOUT = 5.0 # секунды
def _validate_database_url(url: Optional[str]) -> Optional[str]:
"""Валидация URL базы данных."""
if not url:
return None
url = url.strip()
if not url or url.isspace():
return None
# Простая проверка на валидный формат
if not ("://" in url or url.startswith("sqlite")):
logger.warning("Невалидный DATABASE_URL: %s", url[:20])
return None
return url
class DatabaseManager:
"""Продвинутый менеджер БД с поддержкой реплик и кеширования"""
def __init__(self):
self.engine = engine
self.read_replica_engine: Optional[AsyncEngine] = None
if hasattr(settings, 'DATABASE_READ_REPLICA_URL') and settings.DATABASE_READ_REPLICA_URL:
self.read_replica_engine = create_async_engine(
settings.DATABASE_READ_REPLICA_URL,
poolclass=poolclass,
pool_size=30, # Больше для read операций
max_overflow=50,
pool_pre_ping=True,
echo=False,
)
self._read_replica_session_factory: Optional[async_sessionmaker] = None
# Валидация и создание read replica engine
replica_url = _validate_database_url(
getattr(settings, 'DATABASE_READ_REPLICA_URL', None)
)
if replica_url:
try:
self.read_replica_engine = create_async_engine(
replica_url,
poolclass=poolclass,
pool_size=30, # Больше для read операций
max_overflow=50,
pool_pre_ping=True,
pool_recycle=3600,
echo=False,
)
# Создаём sessionmaker один раз (не при каждом вызове)
self._read_replica_session_factory = async_sessionmaker(
bind=self.read_replica_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
logger.info("Read replica настроена: %s", replica_url[:30] + "...")
except Exception as e:
logger.error("Не удалось настроить read replica: %s", e)
self.read_replica_engine = None
@asynccontextmanager
async def session(self, read_only: bool = False):
target_engine = self.read_replica_engine if (read_only and self.read_replica_engine) else self.engine
async_session = async_sessionmaker(
bind=target_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
async with async_session() as session:
"""Контекстный менеджер для работы с сессией БД."""
# Используем предсозданный sessionmaker вместо создания нового
if read_only and self._read_replica_session_factory:
session_factory = self._read_replica_session_factory
else:
session_factory = AsyncSessionLocal
async with session_factory() as session:
try:
yield session
if not read_only:
@@ -133,20 +257,31 @@ class DatabaseManager:
except Exception:
await session.rollback()
raise
async def health_check(self) -> dict:
async def health_check(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> dict:
"""
Проверка здоровья БД с таймаутом.
Args:
timeout: Максимальное время ожидания (секунды)
"""
pool = self.engine.pool
status = "unhealthy"
latency = None
try:
async with AsyncSessionLocal() as session:
start = time.time()
await session.execute(text("SELECT 1"))
latency = (time.time() - start) * 1000
async with asyncio.timeout(timeout):
async with AsyncSessionLocal() as session:
start = time.time()
await session.execute(text("SELECT 1"))
latency = (time.time() - start) * 1000
status = "healthy"
except asyncio.TimeoutError:
logger.error("Health check таймаут (%s сек)", timeout)
status = "timeout"
except Exception as e:
logger.error(f"Database health check failed: {e}")
logger.error("Database health check failed: %s", e)
status = "unhealthy"
latency = None
return {
"status": status,
@@ -154,6 +289,34 @@ class DatabaseManager:
"pool": _collect_health_pool_metrics(pool),
}
async def health_check_replica(self, timeout: float = HEALTH_CHECK_TIMEOUT) -> Optional[dict]:
    """
    Health-check the read replica, if one is configured.

    Args:
        timeout: Maximum time to wait for the probe query, in seconds.

    Returns:
        None when no replica is configured; otherwise a dict with
        ``status`` ("healthy" / "timeout" / "unhealthy"), ``latency_ms``
        and pool metrics.
    """
    if not self.read_replica_engine:
        return None
    pool = self.read_replica_engine.pool
    status = "unhealthy"
    latency = None
    try:
        async with asyncio.timeout(timeout):
            async with self._read_replica_session_factory() as session:
                start = time.time()
                await session.execute(text("SELECT 1"))
                latency = (time.time() - start) * 1000
        status = "healthy"
    except asyncio.TimeoutError:
        # Log the timeout for parity with the primary health_check.
        logger.error("Health check таймаут (%s сек)", timeout)
        status = "timeout"
    except Exception as e:
        logger.error("Read replica health check failed: %s", e)
    return {
        "status": status,
        # `is not None` so a genuine 0.0 ms latency is not collapsed to None.
        "latency_ms": round(latency, 2) if latency is not None else None,
        "pool": _collect_health_pool_metrics(pool),
    }
db_manager = DatabaseManager()
# ============================================================================
@@ -254,13 +417,13 @@ batch_ops = BatchOperations()
async def init_db():
"""Инициализация БД с оптимизациями"""
logger.info("🚀 Создание таблиц базы данных...")
logger.info("Создание таблиц базы данных...")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
if not settings.get_database_url().startswith("sqlite"):
logger.info("📊 Создание индексов для оптимизации...")
if not IS_SQLITE:
logger.info("Создание индексов для оптимизации...")
async with engine.begin() as conn:
indexes = [
@@ -294,22 +457,23 @@ async def init_db():
await conn.execute(text(index_sql))
except Exception as e:
logger.debug("Index creation skipped for %s: %s", table_name, e)
logger.info("База данных успешно инициализирована")
logger.info("База данных успешно инициализирована")
health = await db_manager.health_check()
logger.info(f"📊 Database health: {health}")
logger.info("Database health: %s", health)
async def close_db():
    """Gracefully dispose of all database connections (primary engine and read replica)."""
    logger.info("Закрытие соединений с БД...")
    await engine.dispose()
    # The replica has its own engine/pool and must be disposed separately.
    if db_manager.read_replica_engine:
        await db_manager.read_replica_engine.dispose()
    logger.info("Все подключения к базе данных закрыты")
# ============================================================================
# CONNECTION POOL METRICS (для мониторинга)