From 4d943471793a55e44ce037c34c93129d5329fda4 Mon Sep 17 00:00:00 2001 From: Egor Date: Sun, 9 Nov 2025 07:30:31 +0300 Subject: [PATCH] Revert "Handle PostgreSQL backups without pg_dump" --- app/services/backup_service.py | 1030 ++++++++------------------------ 1 file changed, 253 insertions(+), 777 deletions(-) diff --git a/app/services/backup_service.py b/app/services/backup_service.py index 9857e0d0..abfde7e5 100644 --- a/app/services/backup_service.py +++ b/app/services/backup_service.py @@ -1,21 +1,18 @@ import asyncio -import gzip import json as json_lib import logging +import gzip import os -import shutil -import tarfile import tempfile -from dataclasses import asdict, dataclass from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple - +from typing import Dict, Any, Optional, List, Tuple +from dataclasses import dataclass, asdict import aiofiles from aiogram.types import FSInputFile -from sqlalchemy import inspect, select, text -from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, text, inspect +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import selectinload from app.config import settings @@ -62,14 +59,12 @@ class BackupService: def __init__(self, bot=None): self.bot = bot - self.backup_dir = Path(settings.BACKUP_LOCATION).expanduser().resolve() - self.backup_dir.mkdir(parents=True, exist_ok=True) - self.data_dir = self.backup_dir.parent - self.archive_format_version = "2.0" + self.backup_dir = Path(settings.SQLITE_PATH).parent / "backups" + self.backup_dir.mkdir(exist_ok=True) self._auto_backup_task = None self._settings = self._load_settings() - - self._base_backup_models = [ + + self.backup_models_ordered = [ SystemSetting, ServiceRule, Squad, @@ -100,8 +95,6 @@ class BackupService: WebApiToken, ] - self.backup_models_ordered = self._base_backup_models.copy() - if self._settings.include_logs: self.backup_models_ordered.append(MonitoringLog) @@ -167,792 +160,290 @@ class BackupService: return timedelta(hours=hours) - def _get_models_for_backup(self, include_logs: bool) -> List[Any]: - models = self._base_backup_models.copy() - - if include_logs: - if MonitoringLog not in models: - models.append(MonitoringLog) - else: - models = [model for model in models if model is not MonitoringLog] - - return models - - def _resolve_command_path(self, command: str, env_var: str) -> Optional[str]: - override = os.getenv(env_var) - if override: - override_path = Path(override) - if override_path.exists() and os.access(override_path, os.X_OK): - return str(override_path) - logger.warning( - "Путь %s из %s недоступен или не является исполняемым", - override, - env_var, - ) - - resolved = shutil.which(command) - if resolved: - return resolved - - return None - async def create_backup( - self, + self, created_by: Optional[int] = None, compress: bool = True, include_logs: bool = None ) -> Tuple[bool, str, Optional[str]]: try: logger.info("📄 Начинаем создание бекапа...") - + if include_logs is None: include_logs = self._settings.include_logs + + models_to_backup = self.backup_models_ordered.copy() + if not include_logs and MonitoringLog in models_to_backup: + models_to_backup.remove(MonitoringLog) + elif include_logs and MonitoringLog not in models_to_backup: + models_to_backup.append(MonitoringLog) + + backup_data = {} + association_data = {} + total_records = 0 + + async for db in get_db(): + try: + for model in models_to_backup: + table_name = 
model.__tablename__ + logger.info(f"📊 Экспортируем таблицу: {table_name}") + + query = select(model) + + if model == User: + query = query.options(selectinload(User.subscription)) + elif model == Subscription: + query = query.options(selectinload(Subscription.user)) + elif model == Transaction: + query = query.options(selectinload(Transaction.user)) + + result = await db.execute(query) + records = result.scalars().all() + + table_data = [] + for record in records: + record_dict = {} + for column in model.__table__.columns: + value = getattr(record, column.name) + + if value is None: + record_dict[column.name] = None + elif isinstance(value, datetime): + record_dict[column.name] = value.isoformat() + elif isinstance(value, (list, dict)): + record_dict[column.name] = json_lib.dumps(value) if value else None + elif hasattr(value, '__dict__'): + record_dict[column.name] = str(value) + else: + record_dict[column.name] = value + + table_data.append(record_dict) + + backup_data[table_name] = table_data + total_records += len(table_data) + + logger.info(f"✅ Экспортировано {len(table_data)} записей из {table_name}") - overview = await self._collect_database_overview() + association_data = await self._export_association_tables(db) + for records in association_data.values(): + total_records += len(records) + break + except Exception as e: + logger.error(f"Ошибка при экспорте данных: {e}") + raise e + finally: + await db.close() + + metadata = BackupMetadata( + timestamp=datetime.utcnow().isoformat(), + database_type="postgresql" if settings.is_postgresql() else "sqlite", + backup_type="full", + tables_count=len(models_to_backup) + len(association_data), + total_records=total_records, + compressed=compress, + created_by=created_by, + file_size_bytes=0 + ) + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") - archive_suffix = ".tar.gz" if compress else ".tar" - filename = f"backup_{timestamp}{archive_suffix}" + filename = f"backup_{timestamp}.json" + if compress: + filename += ".gz" + backup_path = self.backup_dir / filename - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - staging_dir = temp_path / "backup" - staging_dir.mkdir(parents=True, exist_ok=True) + file_snapshots = await self._collect_file_snapshots() - database_info = await self._dump_database( - staging_dir, - include_logs=include_logs - ) - database_info.setdefault("tables_count", overview.get("tables_count", 0)) - database_info.setdefault("total_records", overview.get("total_records", 0)) - files_info = await self._collect_files(staging_dir, include_logs=include_logs) - data_snapshot_info = await self._collect_data_snapshot(staging_dir) - - metadata = { - "format_version": self.archive_format_version, - "timestamp": datetime.utcnow().isoformat(), - "database_type": "postgresql" if settings.is_postgresql() else "sqlite", - "backup_type": "full", - "tables_count": overview.get("tables_count", 0), - "total_records": overview.get("total_records", 0), - "compressed": True, - "created_by": created_by, - "database": database_info, - "files": files_info, - "data_snapshot": data_snapshot_info, - "settings": asdict(self._settings), + backup_structure = { + "metadata": asdict(metadata), + "data": backup_data, + "associations": association_data, + "files": file_snapshots, + "config": { + "backup_settings": asdict(self._settings) } - - metadata_path = staging_dir / "metadata.json" - async with aiofiles.open(metadata_path, "w", encoding="utf-8") as meta_file: - await meta_file.write(json_lib.dumps(metadata, 
ensure_ascii=False, indent=2)) - - mode = "w:gz" if compress else "w" - with tarfile.open(backup_path, mode) as tar: - for item in staging_dir.iterdir(): - tar.add(item, arcname=item.name) - + } + + if compress: + backup_json_str = json_lib.dumps(backup_structure, ensure_ascii=False, indent=2) + async with aiofiles.open(backup_path, 'wb') as f: + compressed_data = gzip.compress(backup_json_str.encode('utf-8')) + await f.write(compressed_data) + else: + async with aiofiles.open(backup_path, 'w', encoding='utf-8') as f: + await f.write(json_lib.dumps(backup_structure, ensure_ascii=False, indent=2)) + file_size = backup_path.stat().st_size - + backup_structure["metadata"]["file_size_bytes"] = file_size + + if compress: + backup_json_str = json_lib.dumps(backup_structure, ensure_ascii=False, indent=2) + async with aiofiles.open(backup_path, 'wb') as f: + compressed_data = gzip.compress(backup_json_str.encode('utf-8')) + await f.write(compressed_data) + else: + async with aiofiles.open(backup_path, 'w', encoding='utf-8') as f: + await f.write(json_lib.dumps(backup_structure, ensure_ascii=False, indent=2)) + await self._cleanup_old_backups() - + size_mb = file_size / 1024 / 1024 message = (f"✅ Бекап успешно создан!\n" f"📁 Файл: {filename}\n" - f"📊 Таблиц: {overview.get('tables_count', 0)}\n" - f"📈 Записей: {overview.get('total_records', 0):,}\n" + f"📊 Таблиц: {metadata.tables_count}\n" + f"📈 Записей: {total_records:,}\n" f"💾 Размер: {size_mb:.2f} MB") - + logger.info(message) - + if self.bot: await self._send_backup_notification( "success", message, str(backup_path) ) await self._send_backup_file_to_chat(str(backup_path)) - + return True, message, str(backup_path) - + except Exception as e: error_msg = f"❌ Ошибка создания бекапа: {str(e)}" logger.error(error_msg, exc_info=True) - + if self.bot: await self._send_backup_notification("error", error_msg) - + return False, error_msg, None async def restore_backup( - self, + self, backup_file_path: str, clear_existing: bool = False ) -> Tuple[bool, str]: try: logger.info(f"📄 Начинаем восстановление из {backup_file_path}") - + backup_path = Path(backup_file_path) if not backup_path.exists(): return False, f"❌ Файл бекапа не найден: {backup_file_path}" - - if self._is_archive_backup(backup_path): - success, message = await self._restore_from_archive(backup_path, clear_existing) + + if backup_path.suffix == '.gz': + async with aiofiles.open(backup_path, 'rb') as f: + compressed_data = await f.read() + uncompressed_data = gzip.decompress(compressed_data).decode('utf-8') + backup_structure = json_lib.loads(uncompressed_data) else: - success, message = await self._restore_from_legacy(backup_path, clear_existing) + async with aiofiles.open(backup_path, 'r', encoding='utf-8') as f: + file_content = await f.read() + backup_structure = json_lib.loads(file_content) + + metadata = backup_structure.get("metadata", {}) + backup_data = backup_structure.get("data", {}) + association_data = backup_structure.get("associations", {}) + file_snapshots = backup_structure.get("files", {}) + + if not backup_data: + return False, "❌ Файл бекапа не содержит данных" + + logger.info(f"📊 Загружен бекап от {metadata.get('timestamp')}") + logger.info(f"📈 Содержит {metadata.get('total_records', 0)} записей") + + restored_records = 0 + restored_tables = 0 + + async for db in get_db(): + try: + if clear_existing: + logger.warning("🗑️ Очищаем существующие данные...") + await self._clear_database_tables(db) + + models_by_table = {model.__tablename__: model for model in 
self.backup_models_ordered} - if success and self.bot: + pre_restore_tables = {"promo_groups"} + for table_name in pre_restore_tables: + model = models_by_table.get(table_name) + if not model: + continue + + records = backup_data.get(table_name, []) + if not records: + continue + + logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)") + restored = await self._restore_table_records(db, model, table_name, records, clear_existing) + restored_records += restored + + if restored: + restored_tables += 1 + logger.info(f"✅ Таблица {table_name} восстановлена") + + await self._restore_users_without_referrals(db, backup_data, models_by_table) + + for model in self.backup_models_ordered: + table_name = model.__tablename__ + + if table_name == "users" or table_name in pre_restore_tables: + continue + + records = backup_data.get(table_name, []) + if not records: + continue + + logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)") + restored = await self._restore_table_records(db, model, table_name, records, clear_existing) + restored_records += restored + + if restored: + restored_tables += 1 + logger.info(f"✅ Таблица {table_name} восстановлена") + + await self._update_user_referrals(db, backup_data) + + assoc_tables, assoc_records = await self._restore_association_tables( + db, + association_data, + clear_existing + ) + restored_tables += assoc_tables + restored_records += assoc_records + + await db.commit() + + break + + except Exception as e: + await db.rollback() + logger.error(f"Ошибка при восстановлении: {e}") + raise e + finally: + await db.close() + + message = (f"✅ Восстановление завершено!\n" + f"📊 Таблиц: {restored_tables}\n" + f"📈 Записей: {restored_records:,}\n" + f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}") + + logger.info(message) + + if self.bot: await self._send_backup_notification("restore_success", message) - elif not success and self.bot: - await self._send_backup_notification("restore_error", message) - - return success, message + if file_snapshots: + restored_files = await self._restore_file_snapshots(file_snapshots) + if restored_files: + logger.info(f"📁 Восстановлено файлов конфигурации: {restored_files}") + + return True, message + except Exception as e: error_msg = f"❌ Ошибка восстановления: {str(e)}" logger.error(error_msg, exc_info=True) - + if self.bot: await self._send_backup_notification("restore_error", error_msg) - + return False, error_msg - async def _collect_database_overview(self) -> Dict[str, Any]: - overview: Dict[str, Any] = { - "tables_count": 0, - "total_records": 0, - "tables": [], - } - - try: - async with engine.begin() as conn: - table_names = await conn.run_sync( - lambda sync_conn: inspect(sync_conn).get_table_names() - ) - - for table_name in table_names: - try: - result = await conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")) - count = result.scalar_one() - except Exception: - count = 0 - - overview["tables"].append({"name": table_name, "rows": count}) - overview["total_records"] += count - - overview["tables_count"] = len(table_names) - except Exception as exc: - logger.warning("Не удалось собрать статистику по БД: %s", exc) - - return overview - - async def _dump_database(self, staging_dir: Path, include_logs: bool) -> Dict[str, Any]: - if settings.is_postgresql(): - pg_dump_path = self._resolve_command_path("pg_dump", "PG_DUMP_PATH") - - if pg_dump_path: - dump_path = staging_dir / "database.sql" - await self._dump_postgres(dump_path, pg_dump_path) - size = 
dump_path.stat().st_size if dump_path.exists() else 0 - return { - "type": "postgresql", - "path": dump_path.name, - "size_bytes": size, - "format": "sql", - "tool": pg_dump_path, - } - - logger.warning( - "pg_dump не найден в PATH. Используется ORM-дамп в формате JSON" - ) - json_info = await self._dump_postgres_json(staging_dir, include_logs) - return json_info - - dump_path = staging_dir / "database.sqlite" - await self._dump_sqlite(dump_path) - size = dump_path.stat().st_size if dump_path.exists() else 0 - return { - "type": "sqlite", - "path": dump_path.name, - "size_bytes": size, - "format": "file", - } - - async def _dump_postgres(self, dump_path: Path, pg_dump_path: str): - env = os.environ.copy() - env.update({ - "PGHOST": settings.POSTGRES_HOST, - "PGPORT": str(settings.POSTGRES_PORT), - "PGUSER": settings.POSTGRES_USER, - "PGPASSWORD": settings.POSTGRES_PASSWORD, - }) - - command = [ - pg_dump_path, - "--format=plain", - "--no-owner", - "--no-privileges", - settings.POSTGRES_DB, - ] - - logger.info("📦 Экспорт PostgreSQL через pg_dump (%s)...", pg_dump_path) - dump_path.parent.mkdir(parents=True, exist_ok=True) - - with dump_path.open("wb") as dump_file: - process = await asyncio.create_subprocess_exec( - *command, - stdout=dump_file, - stderr=asyncio.subprocess.PIPE, - env=env, - ) - _, stderr = await process.communicate() - - if process.returncode != 0: - error_text = stderr.decode() if stderr else "pg_dump error" - raise RuntimeError(f"pg_dump завершился с ошибкой: {error_text}") - - logger.info("✅ PostgreSQL dump создан (%s)", dump_path) - - async def _dump_postgres_json(self, staging_dir: Path, include_logs: bool) -> Dict[str, Any]: - models_to_backup = self._get_models_for_backup(include_logs) - ( - backup_data, - association_data, - total_records, - tables_count, - ) = await self._export_database_via_orm(models_to_backup) - - dump_path = staging_dir / "database.json" - dump_structure = { - "metadata": { - "timestamp": datetime.utcnow().isoformat(), - "version": "orm-1.0", - "database_type": "postgresql", - "tables_count": tables_count, - "total_records": total_records, - }, - "data": backup_data, - "associations": association_data, - } - - async with aiofiles.open(dump_path, "w", encoding="utf-8") as dump_file: - await dump_file.write( - json_lib.dumps(dump_structure, ensure_ascii=False, indent=2) - ) - - size = dump_path.stat().st_size if dump_path.exists() else 0 - - logger.info( - "✅ PostgreSQL экспортирован через ORM в JSON (%s)", - dump_path, - ) - - return { - "type": "postgresql", - "path": dump_path.name, - "size_bytes": size, - "format": "json", - "tool": "orm", - "format_version": "orm-1.0", - "tables_count": tables_count, - "total_records": total_records, - } - - async def _dump_sqlite(self, dump_path: Path): - sqlite_path = Path(settings.SQLITE_PATH) - if not sqlite_path.exists(): - raise FileNotFoundError(f"SQLite база данных не найдена по пути {sqlite_path}") - - dump_path.parent.mkdir(parents=True, exist_ok=True) - await asyncio.to_thread(shutil.copy2, sqlite_path, dump_path) - logger.info("✅ SQLite база данных скопирована (%s)", dump_path) - - async def _export_database_via_orm( - self, - models_to_backup: List[Any], - ) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]], int, int]: - backup_data: Dict[str, List[Dict[str, Any]]] = {} - total_records = 0 - - async for db in get_db(): - try: - for model in models_to_backup: - table_name = model.__tablename__ - logger.info("📊 Экспортируем таблицу: %s", table_name) - - query = 
select(model) - - if model == User: - query = query.options(selectinload(User.subscription)) - elif model == Subscription: - query = query.options(selectinload(Subscription.user)) - elif model == Transaction: - query = query.options(selectinload(Transaction.user)) - - result = await db.execute(query) - records = result.scalars().all() - - table_data: List[Dict[str, Any]] = [] - for record in records: - record_dict: Dict[str, Any] = {} - for column in model.__table__.columns: - value = getattr(record, column.name) - - if value is None: - record_dict[column.name] = None - elif isinstance(value, datetime): - record_dict[column.name] = value.isoformat() - elif isinstance(value, (list, dict)): - record_dict[column.name] = ( - json_lib.dumps(value) if value else None - ) - elif hasattr(value, "__dict__"): - record_dict[column.name] = str(value) - else: - record_dict[column.name] = value - - table_data.append(record_dict) - - backup_data[table_name] = table_data - total_records += len(table_data) - - logger.info( - "✅ Экспортировано %s записей из %s", - len(table_data), - table_name, - ) - - association_data = await self._export_association_tables(db) - for records in association_data.values(): - total_records += len(records) - - tables_count = len(models_to_backup) + len(association_data) - return backup_data, association_data, total_records, tables_count - - except Exception as exc: - logger.error("Ошибка при экспорте данных: %s", exc) - raise exc - finally: - await db.close() - - return backup_data, {}, total_records, len(models_to_backup) - - async def _collect_files(self, staging_dir: Path, include_logs: bool) -> List[Dict[str, Any]]: - files_info: List[Dict[str, Any]] = [] - files_dir = staging_dir / "files" - files_dir.mkdir(parents=True, exist_ok=True) - - app_config_path = settings.get_app_config_path() - if app_config_path: - src = Path(app_config_path) - if src.exists(): - dest = files_dir / src.name - await asyncio.to_thread(shutil.copy2, src, dest) - files_info.append({ - "path": str(src), - "relative_path": f"files/{src.name}", - }) - - if include_logs and settings.LOG_FILE: - log_path = Path(settings.LOG_FILE) - if log_path.exists(): - dest = files_dir / log_path.name - await asyncio.to_thread(shutil.copy2, log_path, dest) - files_info.append({ - "path": str(log_path), - "relative_path": f"files/{log_path.name}", - }) - - if not files_info and files_dir.exists(): - files_dir.rmdir() - - return files_info - - async def _collect_data_snapshot(self, staging_dir: Path) -> Dict[str, Any]: - data_dir = staging_dir / "data" - snapshot_info: Dict[str, Any] = { - "path": str(self.data_dir), - "items": 0, - } - - if not self.data_dir.exists(): - return snapshot_info - - counter = {"items": 0} - - def _copy_data(): - data_dir.mkdir(parents=True, exist_ok=True) - for item in self.data_dir.iterdir(): - if item.resolve() == self.backup_dir.resolve(): - continue - - destination = data_dir / item.name - if item.is_dir(): - shutil.copytree(item, destination, dirs_exist_ok=True) - else: - shutil.copy2(item, destination) - counter["items"] += 1 - - await asyncio.to_thread(_copy_data) - snapshot_info["items"] = counter["items"] - return snapshot_info - - def _is_archive_backup(self, backup_path: Path) -> bool: - suffixes = backup_path.suffixes - if (len(suffixes) >= 2 and suffixes[-2:] == [".tar", ".gz"]) or (suffixes and suffixes[-1] == ".tar"): - return True - try: - return tarfile.is_tarfile(backup_path) - except Exception: - return False - - async def _restore_from_archive( - self, - backup_path: 
Path, - clear_existing: bool, - ) -> Tuple[bool, str]: - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - - mode = "r:gz" if backup_path.suffixes and backup_path.suffixes[-1] == ".gz" else "r" - with tarfile.open(backup_path, mode) as tar: - tar.extractall(temp_path) - - metadata_path = temp_path / "metadata.json" - if not metadata_path.exists(): - return False, "❌ Метаданные бекапа отсутствуют" - - async with aiofiles.open(metadata_path, "r", encoding="utf-8") as meta_file: - metadata = json_lib.loads(await meta_file.read()) - - logger.info("📊 Загружен бекап формата %s", metadata.get("format_version", "unknown")) - - database_info = metadata.get("database", {}) - data_snapshot_info = metadata.get("data_snapshot", {}) - files_info = metadata.get("files", []) - - if database_info.get("type") == "postgresql": - db_format = database_info.get("format", "sql") - default_name = "database.json" if db_format == "json" else "database.sql" - dump_file = temp_path / database_info.get("path", default_name) - - if db_format == "json": - await self._restore_postgres_json(dump_file, clear_existing) - else: - await self._restore_postgres(dump_file, clear_existing) - else: - dump_file = temp_path / database_info.get("path", "database.sqlite") - await self._restore_sqlite(dump_file, clear_existing) - - data_dir = temp_path / "data" - if data_dir.exists(): - await self._restore_data_snapshot(data_dir, clear_existing) - - if files_info: - await self._restore_files(files_info, temp_path) - - message = (f"✅ Восстановление завершено!\n" - f"📊 Таблиц: {metadata.get('tables_count', 0)}\n" - f"📈 Записей: {metadata.get('total_records', 0):,}\n" - f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}") - - logger.info(message) - return True, message - - async def _restore_postgres(self, dump_path: Path, clear_existing: bool): - if not dump_path.exists(): - raise FileNotFoundError(f"Dump PostgreSQL не найден: {dump_path}") - - psql_path = self._resolve_command_path("psql", "PSQL_PATH") - if not psql_path: - raise FileNotFoundError( - "psql не найден в PATH. 
Установите клиент PostgreSQL или выполните восстановление из JSON дампа" - ) - - env = os.environ.copy() - env.update({ - "PGHOST": settings.POSTGRES_HOST, - "PGPORT": str(settings.POSTGRES_PORT), - "PGUSER": settings.POSTGRES_USER, - "PGPASSWORD": settings.POSTGRES_PASSWORD, - }) - - if clear_existing: - logger.info("🗑️ Полная очистка схемы PostgreSQL перед восстановлением") - drop_command = [ - psql_path, - settings.POSTGRES_DB, - "-c", - "DROP SCHEMA public CASCADE; CREATE SCHEMA public; GRANT ALL ON SCHEMA public TO public;", - ] - proc = await asyncio.create_subprocess_exec( - *drop_command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=env, - ) - _, stderr = await proc.communicate() - if proc.returncode != 0: - raise RuntimeError(f"Не удалось очистить схему: {stderr.decode()}") - - logger.info("📥 Восстановление PostgreSQL через psql (%s)...", psql_path) - restore_command = [ - psql_path, - settings.POSTGRES_DB, - "-f", - str(dump_path), - ] - proc = await asyncio.create_subprocess_exec( - *restore_command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=env, - ) - stdout, stderr = await proc.communicate() - - if proc.returncode != 0: - raise RuntimeError(f"Ошибка psql: {stderr.decode()}") - - logger.info("✅ PostgreSQL восстановлен (%s)", dump_path) - - async def _restore_postgres_json(self, dump_path: Path, clear_existing: bool): - if not dump_path.exists(): - raise FileNotFoundError(f"JSON дамп PostgreSQL не найден: {dump_path}") - - async with aiofiles.open(dump_path, "r", encoding="utf-8") as dump_file: - dump_data = json_lib.loads(await dump_file.read()) - - metadata = dump_data.get("metadata", {}) - backup_data = dump_data.get("data", {}) - association_data = dump_data.get("associations", {}) - - await self._restore_database_payload( - backup_data, - association_data, - metadata, - clear_existing, - ) - - logger.info("✅ PostgreSQL восстановлен из ORM JSON (%s)", dump_path) - - async def _restore_sqlite(self, dump_path: Path, clear_existing: bool): - if not dump_path.exists(): - raise FileNotFoundError(f"SQLite файл не найден: {dump_path}") - - target_path = Path(settings.SQLITE_PATH) - target_path.parent.mkdir(parents=True, exist_ok=True) - - if clear_existing and target_path.exists(): - target_path.unlink() - - await asyncio.to_thread(shutil.copy2, dump_path, target_path) - logger.info("✅ SQLite база восстановлена (%s)", target_path) - - async def _restore_data_snapshot(self, source_dir: Path, clear_existing: bool): - if not source_dir.exists(): - return - - def _restore(): - self.data_dir.mkdir(parents=True, exist_ok=True) - for item in source_dir.iterdir(): - if item.name == self.backup_dir.name: - continue - - destination = self.data_dir / item.name - if clear_existing and destination.exists(): - if destination.is_dir(): - shutil.rmtree(destination) - else: - destination.unlink() - - if item.is_dir(): - shutil.copytree(item, destination, dirs_exist_ok=True) - else: - shutil.copy2(item, destination) - - await asyncio.to_thread(_restore) - logger.info("📁 Снимок директории data восстановлен") - - async def _restore_files(self, files_info: List[Dict[str, Any]], temp_path: Path): - for file_info in files_info: - relative_path = file_info.get("relative_path") - target_path = Path(file_info.get("path", "")) - if not relative_path or not target_path: - continue - - source_file = temp_path / relative_path - if not source_file.exists(): - logger.warning("Файл %s отсутствует в архиве", relative_path) - continue - - 
target_path.parent.mkdir(parents=True, exist_ok=True) - await asyncio.to_thread(shutil.copy2, source_file, target_path) - logger.info("📁 Файл %s восстановлен", target_path) - - async def _restore_database_payload( - self, - backup_data: Dict[str, List[Dict[str, Any]]], - association_data: Dict[str, List[Dict[str, Any]]], - metadata: Dict[str, Any], - clear_existing: bool, - ) -> Tuple[int, int]: - if not backup_data: - raise ValueError("❌ Файл бекапа не содержит данных") - - logger.info( - "📊 Загружен дамп: %s", - metadata.get("timestamp", "неизвестная дата"), - ) - - estimated_records = metadata.get("total_records") - if estimated_records is None: - estimated_records = sum(len(records) for records in backup_data.values()) - estimated_records += sum(len(records) for records in association_data.values()) - - logger.info("📈 Содержит %s записей", estimated_records) - - restored_records = 0 - restored_tables = 0 - - async for db in get_db(): - try: - if clear_existing: - logger.warning("🗑️ Очищаем существующие данные...") - await self._clear_database_tables(db) - - models_for_restore = self._get_models_for_backup(True) - models_by_table = { - model.__tablename__: model for model in models_for_restore - } - - pre_restore_tables = {"promo_groups"} - for table_name in pre_restore_tables: - model = models_by_table.get(table_name) - if not model: - continue - - records = backup_data.get(table_name, []) - if not records: - continue - - logger.info( - "🔥 Восстанавливаем таблицу %s (%s записей)", - table_name, - len(records), - ) - restored = await self._restore_table_records( - db, - model, - table_name, - records, - clear_existing, - ) - restored_records += restored - - if restored: - restored_tables += 1 - logger.info("✅ Таблица %s восстановлена", table_name) - - await self._restore_users_without_referrals( - db, - backup_data, - models_by_table, - ) - - for model in models_for_restore: - table_name = model.__tablename__ - - if table_name == "users" or table_name in pre_restore_tables: - continue - - records = backup_data.get(table_name, []) - if not records: - continue - - logger.info( - "🔥 Восстанавливаем таблицу %s (%s записей)", - table_name, - len(records), - ) - restored = await self._restore_table_records( - db, - model, - table_name, - records, - clear_existing, - ) - restored_records += restored - - if restored: - restored_tables += 1 - logger.info("✅ Таблица %s восстановлена", table_name) - - await self._update_user_referrals(db, backup_data) - - assoc_tables, assoc_records = await self._restore_association_tables( - db, - association_data, - clear_existing, - ) - restored_tables += assoc_tables - restored_records += assoc_records - - await db.commit() - - break - - except Exception as exc: - await db.rollback() - logger.error("Ошибка при восстановлении: %s", exc) - raise exc - finally: - await db.close() - - return restored_tables, restored_records - - async def _restore_from_legacy( - self, - backup_path: Path, - clear_existing: bool, - ) -> Tuple[bool, str]: - if backup_path.suffix == '.gz': - async with aiofiles.open(backup_path, 'rb') as f: - compressed_data = await f.read() - uncompressed_data = gzip.decompress(compressed_data).decode('utf-8') - backup_structure = json_lib.loads(uncompressed_data) - else: - async with aiofiles.open(backup_path, 'r', encoding='utf-8') as f: - file_content = await f.read() - backup_structure = json_lib.loads(file_content) - - metadata = backup_structure.get("metadata", {}) - backup_data = backup_structure.get("data", {}) - association_data = 
backup_structure.get("associations", {}) - file_snapshots = backup_structure.get("files", {}) - - try: - restored_tables, restored_records = await self._restore_database_payload( - backup_data, - association_data, - metadata, - clear_existing, - ) - except ValueError as exc: - return False, str(exc) - - if file_snapshots: - restored_files = await self._restore_file_snapshots(file_snapshots) - if restored_files: - logger.info(f"📁 Восстановлено файлов конфигурации: {restored_files}") - - message = (f"✅ Восстановление завершено!\n" - f"📊 Таблиц: {restored_tables}\n" - f"📈 Записей: {restored_records:,}\n" - f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}") - - logger.info(message) - return True, message - async def _restore_users_without_referrals(self, db: AsyncSession, backup_data: dict, models_by_table: dict): users_data = backup_data.get("users", []) if not users_data: @@ -1301,49 +792,34 @@ class BackupService: backups = [] try: - for backup_file in sorted(self.backup_dir.glob("backup_*"), reverse=True): - if not backup_file.is_file(): - continue - + for backup_file in sorted(self.backup_dir.glob("backup_*.json*"), reverse=True): try: - metadata = {} - - if self._is_archive_backup(backup_file): - mode = "r:gz" if backup_file.suffixes and backup_file.suffixes[-1] == ".gz" else "r" - with tarfile.open(backup_file, mode) as tar: - try: - member = tar.getmember("metadata.json") - with tar.extractfile(member) as meta_file: - metadata = json_lib.load(meta_file) - except KeyError: - metadata = {} + if backup_file.suffix == '.gz': + with gzip.open(backup_file, 'rt', encoding='utf-8') as f: + backup_structure = json_lib.load(f) else: - if backup_file.suffix == '.gz': - with gzip.open(backup_file, 'rt', encoding='utf-8') as f: - backup_structure = json_lib.load(f) - else: - with open(backup_file, 'r', encoding='utf-8') as f: - backup_structure = json_lib.load(f) - metadata = backup_structure.get("metadata", {}) - + with open(backup_file, 'r', encoding='utf-8') as f: + backup_structure = json_lib.load(f) + + metadata = backup_structure.get("metadata", {}) file_stats = backup_file.stat() - + backup_info = { "filename": backup_file.name, "filepath": str(backup_file), - "timestamp": metadata.get("timestamp", datetime.fromtimestamp(file_stats.st_mtime).isoformat()), - "tables_count": metadata.get("tables_count", metadata.get("database", {}).get("tables_count", 0)), - "total_records": metadata.get("total_records", metadata.get("database", {}).get("total_records", 0)), - "compressed": self._is_archive_backup(backup_file) or backup_file.suffix == '.gz', + "timestamp": metadata.get("timestamp"), + "tables_count": metadata.get("tables_count", 0), + "total_records": metadata.get("total_records", 0), + "compressed": metadata.get("compressed", False), "file_size_bytes": file_stats.st_size, "file_size_mb": round(file_stats.st_size / 1024 / 1024, 2), "created_by": metadata.get("created_by"), - "database_type": metadata.get("database_type", metadata.get("database", {}).get("type", "unknown")), - "version": metadata.get("format_version", metadata.get("version", "1.0")), + "database_type": metadata.get("database_type", "unknown"), + "version": metadata.get("version", "1.0") } - + backups.append(backup_info) - + except Exception as e: logger.error(f"Ошибка чтения метаданных {backup_file}: {e}") file_stats = backup_file.stat()
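Reviewer notes below are illustrative sketches and are not part of the applied diff.

The format this revert restores stores the whole backup as one JSON document, gzip-compressed when compression is requested; the restored implementation also writes the file twice so that file_size_bytes can be embedded back into the metadata. A minimal, self-contained sketch of that round trip (the file name and payload shape are placeholders, not taken from the service):

import gzip
import json
from pathlib import Path

def write_backup(path: Path, payload: dict, compress: bool = True) -> int:
    # Serialize once, then either gzip the bytes or write plain UTF-8 JSON.
    raw = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")
    path.write_bytes(gzip.compress(raw) if compress else raw)
    return path.stat().st_size

def read_backup(path: Path) -> dict:
    # Mirror of the restore path: .gz files are decompressed before parsing.
    data = path.read_bytes()
    if path.suffix == ".gz":
        data = gzip.decompress(data)
    return json.loads(data.decode("utf-8"))

if __name__ == "__main__":
    backup = {"metadata": {"timestamp": "2025-11-09T07:30:31"}, "data": {"users": []}}
    print(write_backup(Path("backup_demo.json.gz"), backup, compress=True))
    print(read_backup(Path("backup_demo.json.gz"))["metadata"]["timestamp"])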
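Both the removed and the restored code export ORM rows column by column, turning datetime values into ISO strings and list/dict columns into JSON text before they go into the backup payload. A simplified sketch of that conversion, assuming SQLAlchemy 2.0-style declarative models; the Item model and the in-memory SQLite engine are stand-ins, not the project's real models:

import json
from datetime import datetime

from sqlalchemy import JSON, DateTime, Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class Item(Base):  # stand-in for the real backup models
    __tablename__ = "items"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(64))
    created_at: Mapped[datetime] = mapped_column(DateTime)
    tags: Mapped[dict] = mapped_column(JSON)

def export_table(session: Session, model) -> list[dict]:
    # Walk the mapped columns and coerce each value into something JSON-serializable.
    rows = session.execute(select(model)).scalars().all()
    exported = []
    for row in rows:
        record = {}
        for column in model.__table__.columns:
            value = getattr(row, column.name)
            if value is None:
                record[column.name] = None
            elif isinstance(value, datetime):
                record[column.name] = value.isoformat()   # datetimes as ISO strings
            elif isinstance(value, (list, dict)):
                record[column.name] = json.dumps(value)   # JSON columns as text
            else:
                record[column.name] = value
        exported.append(record)
    return exported

if __name__ == "__main__":
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Item(name="demo", created_at=datetime.utcnow(), tags={"k": "v"}))
        session.commit()
        print(export_table(session, Item))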
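The branch being removed shelled out to pg_dump, passing credentials through PG* environment variables and streaming stdout straight into the dump file. A hedged, standalone sketch of that subprocess pattern; the connection parameters and the default pg_dump path below are placeholders:

import asyncio
import os
from pathlib import Path

async def dump_postgres(dump_path: Path, database: str, host: str, port: int,
                        user: str, password: str, pg_dump_path: str = "pg_dump") -> None:
    # Credentials go through the environment so they never appear in the argv list.
    env = os.environ.copy()
    env.update({"PGHOST": host, "PGPORT": str(port), "PGUSER": user, "PGPASSWORD": password})

    command = [pg_dump_path, "--format=plain", "--no-owner", "--no-privileges", database]

    dump_path.parent.mkdir(parents=True, exist_ok=True)
    with dump_path.open("wb") as dump_file:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=dump_file,               # stream the dump straight to disk
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
        _, stderr = await process.communicate()

    if process.returncode != 0:
        raise RuntimeError(f"pg_dump failed: {stderr.decode() if stderr else 'unknown error'}")

# Example (placeholder connection details):
# asyncio.run(dump_postgres(Path("database.sql"), "mydb", "localhost", 5432, "postgres", "secret"))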
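The archive layout dropped by this revert staged metadata.json plus the database and file dumps in a temporary directory and packed them with tarfile, using .tar.gz when compression is on. A minimal sketch of the packing side under those assumptions; all paths here are illustrative:

import tarfile
import tempfile
from pathlib import Path

def pack_backup(staging_dir: Path, backup_path: Path, compress: bool = True) -> None:
    # Top-level entries of the staging directory become top-level members of the archive.
    mode = "w:gz" if compress else "w"
    with tarfile.open(backup_path, mode) as tar:
        for item in staging_dir.iterdir():
            tar.add(item, arcname=item.name)

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        staging = Path(tmp) / "backup"
        staging.mkdir()
        (staging / "metadata.json").write_text("{}", encoding="utf-8")
        archive = Path(tmp) / "backup_demo.tar.gz"
        pack_backup(staging, archive)
        with tarfile.open(archive, "r:gz") as tar:
            print(tar.getnames())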