mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-04-28 16:50:08 +00:00
Refactor backup system with archive dumps
This commit is contained in:
@@ -1,18 +1,21 @@
|
||||
import asyncio
|
||||
import gzip
|
||||
import json as json_lib
|
||||
import logging
|
||||
import gzip
|
||||
import os
|
||||
import shutil
|
||||
import tarfile
|
||||
import tempfile
|
||||
from dataclasses import asdict, dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import aiofiles
|
||||
from aiogram.types import FSInputFile
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, text, inspect
|
||||
from sqlalchemy import inspect, select, text
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.config import settings
|
||||
@@ -59,8 +62,10 @@ class BackupService:
|
||||
|
||||
def __init__(self, bot=None):
|
||||
self.bot = bot
|
||||
self.backup_dir = Path(settings.SQLITE_PATH).parent / "backups"
|
||||
self.backup_dir.mkdir(exist_ok=True)
|
||||
self.backup_dir = Path(settings.BACKUP_LOCATION).expanduser().resolve()
|
||||
self.backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.data_dir = self.backup_dir.parent
|
||||
self.archive_format_version = "2.0"
|
||||
self._auto_backup_task = None
|
||||
self._settings = self._load_settings()
|
||||
|
||||
@@ -161,289 +166,545 @@ class BackupService:
|
||||
return timedelta(hours=hours)
|
||||
|
||||
async def create_backup(
|
||||
self,
|
||||
self,
|
||||
created_by: Optional[int] = None,
|
||||
compress: bool = True,
|
||||
include_logs: bool = None
|
||||
) -> Tuple[bool, str, Optional[str]]:
|
||||
try:
|
||||
logger.info("📄 Начинаем создание бекапа...")
|
||||
|
||||
|
||||
if include_logs is None:
|
||||
include_logs = self._settings.include_logs
|
||||
|
||||
models_to_backup = self.backup_models_ordered.copy()
|
||||
if not include_logs and MonitoringLog in models_to_backup:
|
||||
models_to_backup.remove(MonitoringLog)
|
||||
elif include_logs and MonitoringLog not in models_to_backup:
|
||||
models_to_backup.append(MonitoringLog)
|
||||
|
||||
backup_data = {}
|
||||
association_data = {}
|
||||
total_records = 0
|
||||
|
||||
async for db in get_db():
|
||||
try:
|
||||
for model in models_to_backup:
|
||||
table_name = model.__tablename__
|
||||
logger.info(f"📊 Экспортируем таблицу: {table_name}")
|
||||
|
||||
query = select(model)
|
||||
|
||||
if model == User:
|
||||
query = query.options(selectinload(User.subscription))
|
||||
elif model == Subscription:
|
||||
query = query.options(selectinload(Subscription.user))
|
||||
elif model == Transaction:
|
||||
query = query.options(selectinload(Transaction.user))
|
||||
|
||||
result = await db.execute(query)
|
||||
records = result.scalars().all()
|
||||
|
||||
table_data = []
|
||||
for record in records:
|
||||
record_dict = {}
|
||||
for column in model.__table__.columns:
|
||||
value = getattr(record, column.name)
|
||||
|
||||
if value is None:
|
||||
record_dict[column.name] = None
|
||||
elif isinstance(value, datetime):
|
||||
record_dict[column.name] = value.isoformat()
|
||||
elif isinstance(value, (list, dict)):
|
||||
record_dict[column.name] = json_lib.dumps(value) if value else None
|
||||
elif hasattr(value, '__dict__'):
|
||||
record_dict[column.name] = str(value)
|
||||
else:
|
||||
record_dict[column.name] = value
|
||||
|
||||
table_data.append(record_dict)
|
||||
|
||||
backup_data[table_name] = table_data
|
||||
total_records += len(table_data)
|
||||
|
||||
logger.info(f"✅ Экспортировано {len(table_data)} записей из {table_name}")
|
||||
|
||||
association_data = await self._export_association_tables(db)
|
||||
for records in association_data.values():
|
||||
total_records += len(records)
|
||||
overview = await self._collect_database_overview()
|
||||
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при экспорте данных: {e}")
|
||||
raise e
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
metadata = BackupMetadata(
|
||||
timestamp=datetime.utcnow().isoformat(),
|
||||
database_type="postgresql" if settings.is_postgresql() else "sqlite",
|
||||
backup_type="full",
|
||||
tables_count=len(models_to_backup) + len(association_data),
|
||||
total_records=total_records,
|
||||
compressed=compress,
|
||||
created_by=created_by,
|
||||
file_size_bytes=0
|
||||
)
|
||||
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"backup_{timestamp}.json"
|
||||
if compress:
|
||||
filename += ".gz"
|
||||
|
||||
archive_suffix = ".tar.gz" if compress else ".tar"
|
||||
filename = f"backup_{timestamp}{archive_suffix}"
|
||||
backup_path = self.backup_dir / filename
|
||||
|
||||
file_snapshots = await self._collect_file_snapshots()
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_path = Path(temp_dir)
|
||||
staging_dir = temp_path / "backup"
|
||||
staging_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
backup_structure = {
|
||||
"metadata": asdict(metadata),
|
||||
"data": backup_data,
|
||||
"associations": association_data,
|
||||
"files": file_snapshots,
|
||||
"config": {
|
||||
"backup_settings": asdict(self._settings)
|
||||
database_info = await self._dump_database(staging_dir)
|
||||
database_info.setdefault("tables_count", overview.get("tables_count", 0))
|
||||
database_info.setdefault("total_records", overview.get("total_records", 0))
|
||||
files_info = await self._collect_files(staging_dir, include_logs=include_logs)
|
||||
data_snapshot_info = await self._collect_data_snapshot(staging_dir)
|
||||
|
||||
metadata = {
|
||||
"format_version": self.archive_format_version,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"database_type": "postgresql" if settings.is_postgresql() else "sqlite",
|
||||
"backup_type": "full",
|
||||
"tables_count": overview.get("tables_count", 0),
|
||||
"total_records": overview.get("total_records", 0),
|
||||
"compressed": True,
|
||||
"created_by": created_by,
|
||||
"database": database_info,
|
||||
"files": files_info,
|
||||
"data_snapshot": data_snapshot_info,
|
||||
"settings": asdict(self._settings),
|
||||
}
|
||||
}
|
||||
|
||||
if compress:
|
||||
backup_json_str = json_lib.dumps(backup_structure, ensure_ascii=False, indent=2)
|
||||
async with aiofiles.open(backup_path, 'wb') as f:
|
||||
compressed_data = gzip.compress(backup_json_str.encode('utf-8'))
|
||||
await f.write(compressed_data)
|
||||
else:
|
||||
async with aiofiles.open(backup_path, 'w', encoding='utf-8') as f:
|
||||
await f.write(json_lib.dumps(backup_structure, ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
metadata_path = staging_dir / "metadata.json"
|
||||
async with aiofiles.open(metadata_path, "w", encoding="utf-8") as meta_file:
|
||||
await meta_file.write(json_lib.dumps(metadata, ensure_ascii=False, indent=2))
|
||||
|
||||
mode = "w:gz" if compress else "w"
|
||||
with tarfile.open(backup_path, mode) as tar:
|
||||
for item in staging_dir.iterdir():
|
||||
tar.add(item, arcname=item.name)
|
||||
|
||||
file_size = backup_path.stat().st_size
|
||||
backup_structure["metadata"]["file_size_bytes"] = file_size
|
||||
|
||||
if compress:
|
||||
backup_json_str = json_lib.dumps(backup_structure, ensure_ascii=False, indent=2)
|
||||
async with aiofiles.open(backup_path, 'wb') as f:
|
||||
compressed_data = gzip.compress(backup_json_str.encode('utf-8'))
|
||||
await f.write(compressed_data)
|
||||
else:
|
||||
async with aiofiles.open(backup_path, 'w', encoding='utf-8') as f:
|
||||
await f.write(json_lib.dumps(backup_structure, ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
await self._cleanup_old_backups()
|
||||
|
||||
|
||||
size_mb = file_size / 1024 / 1024
|
||||
message = (f"✅ Бекап успешно создан!\n"
|
||||
f"📁 Файл: {filename}\n"
|
||||
f"📊 Таблиц: {metadata.tables_count}\n"
|
||||
f"📈 Записей: {total_records:,}\n"
|
||||
f"📊 Таблиц: {overview.get('tables_count', 0)}\n"
|
||||
f"📈 Записей: {overview.get('total_records', 0):,}\n"
|
||||
f"💾 Размер: {size_mb:.2f} MB")
|
||||
|
||||
|
||||
logger.info(message)
|
||||
|
||||
|
||||
if self.bot:
|
||||
await self._send_backup_notification(
|
||||
"success", message, str(backup_path)
|
||||
)
|
||||
|
||||
await self._send_backup_file_to_chat(str(backup_path))
|
||||
|
||||
|
||||
return True, message, str(backup_path)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"❌ Ошибка создания бекапа: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
|
||||
|
||||
if self.bot:
|
||||
await self._send_backup_notification("error", error_msg)
|
||||
|
||||
|
||||
return False, error_msg, None
|
||||
|
||||
async def restore_backup(
|
||||
self,
|
||||
self,
|
||||
backup_file_path: str,
|
||||
clear_existing: bool = False
|
||||
) -> Tuple[bool, str]:
|
||||
try:
|
||||
logger.info(f"📄 Начинаем восстановление из {backup_file_path}")
|
||||
|
||||
|
||||
backup_path = Path(backup_file_path)
|
||||
if not backup_path.exists():
|
||||
return False, f"❌ Файл бекапа не найден: {backup_file_path}"
|
||||
|
||||
if backup_path.suffix == '.gz':
|
||||
async with aiofiles.open(backup_path, 'rb') as f:
|
||||
compressed_data = await f.read()
|
||||
uncompressed_data = gzip.decompress(compressed_data).decode('utf-8')
|
||||
backup_structure = json_lib.loads(uncompressed_data)
|
||||
|
||||
if self._is_archive_backup(backup_path):
|
||||
success, message = await self._restore_from_archive(backup_path, clear_existing)
|
||||
else:
|
||||
async with aiofiles.open(backup_path, 'r', encoding='utf-8') as f:
|
||||
file_content = await f.read()
|
||||
backup_structure = json_lib.loads(file_content)
|
||||
|
||||
metadata = backup_structure.get("metadata", {})
|
||||
backup_data = backup_structure.get("data", {})
|
||||
association_data = backup_structure.get("associations", {})
|
||||
file_snapshots = backup_structure.get("files", {})
|
||||
|
||||
if not backup_data:
|
||||
return False, "❌ Файл бекапа не содержит данных"
|
||||
|
||||
logger.info(f"📊 Загружен бекап от {metadata.get('timestamp')}")
|
||||
logger.info(f"📈 Содержит {metadata.get('total_records', 0)} записей")
|
||||
|
||||
restored_records = 0
|
||||
restored_tables = 0
|
||||
|
||||
async for db in get_db():
|
||||
try:
|
||||
if clear_existing:
|
||||
logger.warning("🗑️ Очищаем существующие данные...")
|
||||
await self._clear_database_tables(db)
|
||||
|
||||
models_by_table = {model.__tablename__: model for model in self.backup_models_ordered}
|
||||
success, message = await self._restore_from_legacy(backup_path, clear_existing)
|
||||
|
||||
pre_restore_tables = {"promo_groups"}
|
||||
for table_name in pre_restore_tables:
|
||||
model = models_by_table.get(table_name)
|
||||
if not model:
|
||||
continue
|
||||
|
||||
records = backup_data.get(table_name, [])
|
||||
if not records:
|
||||
continue
|
||||
|
||||
logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)")
|
||||
restored = await self._restore_table_records(db, model, table_name, records, clear_existing)
|
||||
restored_records += restored
|
||||
|
||||
if restored:
|
||||
restored_tables += 1
|
||||
logger.info(f"✅ Таблица {table_name} восстановлена")
|
||||
|
||||
await self._restore_users_without_referrals(db, backup_data, models_by_table)
|
||||
|
||||
for model in self.backup_models_ordered:
|
||||
table_name = model.__tablename__
|
||||
|
||||
if table_name == "users" or table_name in pre_restore_tables:
|
||||
continue
|
||||
|
||||
records = backup_data.get(table_name, [])
|
||||
if not records:
|
||||
continue
|
||||
|
||||
logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)")
|
||||
restored = await self._restore_table_records(db, model, table_name, records, clear_existing)
|
||||
restored_records += restored
|
||||
|
||||
if restored:
|
||||
restored_tables += 1
|
||||
logger.info(f"✅ Таблица {table_name} восстановлена")
|
||||
|
||||
await self._update_user_referrals(db, backup_data)
|
||||
|
||||
assoc_tables, assoc_records = await self._restore_association_tables(
|
||||
db,
|
||||
association_data,
|
||||
clear_existing
|
||||
)
|
||||
restored_tables += assoc_tables
|
||||
restored_records += assoc_records
|
||||
|
||||
await db.commit()
|
||||
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
await db.rollback()
|
||||
logger.error(f"Ошибка при восстановлении: {e}")
|
||||
raise e
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
message = (f"✅ Восстановление завершено!\n"
|
||||
f"📊 Таблиц: {restored_tables}\n"
|
||||
f"📈 Записей: {restored_records:,}\n"
|
||||
f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}")
|
||||
|
||||
logger.info(message)
|
||||
|
||||
if self.bot:
|
||||
if success and self.bot:
|
||||
await self._send_backup_notification("restore_success", message)
|
||||
elif not success and self.bot:
|
||||
await self._send_backup_notification("restore_error", message)
|
||||
|
||||
return success, message
|
||||
|
||||
if file_snapshots:
|
||||
restored_files = await self._restore_file_snapshots(file_snapshots)
|
||||
if restored_files:
|
||||
logger.info(f"📁 Восстановлено файлов конфигурации: {restored_files}")
|
||||
|
||||
return True, message
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"❌ Ошибка восстановления: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
|
||||
|
||||
if self.bot:
|
||||
await self._send_backup_notification("restore_error", error_msg)
|
||||
|
||||
|
||||
return False, error_msg
|
||||
|
||||
async def _collect_database_overview(self) -> Dict[str, Any]:
|
||||
overview: Dict[str, Any] = {
|
||||
"tables_count": 0,
|
||||
"total_records": 0,
|
||||
"tables": [],
|
||||
}
|
||||
|
||||
try:
|
||||
async with engine.begin() as conn:
|
||||
table_names = await conn.run_sync(
|
||||
lambda sync_conn: inspect(sync_conn).get_table_names()
|
||||
)
|
||||
|
||||
for table_name in table_names:
|
||||
try:
|
||||
result = await conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
|
||||
count = result.scalar_one()
|
||||
except Exception:
|
||||
count = 0
|
||||
|
||||
overview["tables"].append({"name": table_name, "rows": count})
|
||||
overview["total_records"] += count
|
||||
|
||||
overview["tables_count"] = len(table_names)
|
||||
except Exception as exc:
|
||||
logger.warning("Не удалось собрать статистику по БД: %s", exc)
|
||||
|
||||
return overview
|
||||
|
||||
async def _dump_database(self, staging_dir: Path) -> Dict[str, Any]:
|
||||
if settings.is_postgresql():
|
||||
dump_path = staging_dir / "database.sql"
|
||||
await self._dump_postgres(dump_path)
|
||||
size = dump_path.stat().st_size if dump_path.exists() else 0
|
||||
return {
|
||||
"type": "postgresql",
|
||||
"path": dump_path.name,
|
||||
"size_bytes": size,
|
||||
}
|
||||
|
||||
dump_path = staging_dir / "database.sqlite"
|
||||
await self._dump_sqlite(dump_path)
|
||||
size = dump_path.stat().st_size if dump_path.exists() else 0
|
||||
return {
|
||||
"type": "sqlite",
|
||||
"path": dump_path.name,
|
||||
"size_bytes": size,
|
||||
}
|
||||
|
||||
async def _dump_postgres(self, dump_path: Path):
|
||||
env = os.environ.copy()
|
||||
env.update({
|
||||
"PGHOST": settings.POSTGRES_HOST,
|
||||
"PGPORT": str(settings.POSTGRES_PORT),
|
||||
"PGUSER": settings.POSTGRES_USER,
|
||||
"PGPASSWORD": settings.POSTGRES_PASSWORD,
|
||||
})
|
||||
|
||||
command = [
|
||||
"pg_dump",
|
||||
"--format=plain",
|
||||
"--no-owner",
|
||||
"--no-privileges",
|
||||
settings.POSTGRES_DB,
|
||||
]
|
||||
|
||||
logger.info("📦 Экспорт PostgreSQL через pg_dump...")
|
||||
dump_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with dump_path.open("wb") as dump_file:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*command,
|
||||
stdout=dump_file,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
_, stderr = await process.communicate()
|
||||
|
||||
if process.returncode != 0:
|
||||
error_text = stderr.decode() if stderr else "pg_dump error"
|
||||
raise RuntimeError(f"pg_dump завершился с ошибкой: {error_text}")
|
||||
|
||||
logger.info("✅ PostgreSQL dump создан (%s)", dump_path)
|
||||
|
||||
async def _dump_sqlite(self, dump_path: Path):
|
||||
sqlite_path = Path(settings.SQLITE_PATH)
|
||||
if not sqlite_path.exists():
|
||||
raise FileNotFoundError(f"SQLite база данных не найдена по пути {sqlite_path}")
|
||||
|
||||
dump_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
await asyncio.to_thread(shutil.copy2, sqlite_path, dump_path)
|
||||
logger.info("✅ SQLite база данных скопирована (%s)", dump_path)
|
||||
|
||||
async def _collect_files(self, staging_dir: Path, include_logs: bool) -> List[Dict[str, Any]]:
|
||||
files_info: List[Dict[str, Any]] = []
|
||||
files_dir = staging_dir / "files"
|
||||
files_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
app_config_path = settings.get_app_config_path()
|
||||
if app_config_path:
|
||||
src = Path(app_config_path)
|
||||
if src.exists():
|
||||
dest = files_dir / src.name
|
||||
await asyncio.to_thread(shutil.copy2, src, dest)
|
||||
files_info.append({
|
||||
"path": str(src),
|
||||
"relative_path": f"files/{src.name}",
|
||||
})
|
||||
|
||||
if include_logs and settings.LOG_FILE:
|
||||
log_path = Path(settings.LOG_FILE)
|
||||
if log_path.exists():
|
||||
dest = files_dir / log_path.name
|
||||
await asyncio.to_thread(shutil.copy2, log_path, dest)
|
||||
files_info.append({
|
||||
"path": str(log_path),
|
||||
"relative_path": f"files/{log_path.name}",
|
||||
})
|
||||
|
||||
if not files_info and files_dir.exists():
|
||||
files_dir.rmdir()
|
||||
|
||||
return files_info
|
||||
|
||||
async def _collect_data_snapshot(self, staging_dir: Path) -> Dict[str, Any]:
|
||||
data_dir = staging_dir / "data"
|
||||
snapshot_info: Dict[str, Any] = {
|
||||
"path": str(self.data_dir),
|
||||
"items": 0,
|
||||
}
|
||||
|
||||
if not self.data_dir.exists():
|
||||
return snapshot_info
|
||||
|
||||
counter = {"items": 0}
|
||||
|
||||
def _copy_data():
|
||||
data_dir.mkdir(parents=True, exist_ok=True)
|
||||
for item in self.data_dir.iterdir():
|
||||
if item.resolve() == self.backup_dir.resolve():
|
||||
continue
|
||||
|
||||
destination = data_dir / item.name
|
||||
if item.is_dir():
|
||||
shutil.copytree(item, destination, dirs_exist_ok=True)
|
||||
else:
|
||||
shutil.copy2(item, destination)
|
||||
counter["items"] += 1
|
||||
|
||||
await asyncio.to_thread(_copy_data)
|
||||
snapshot_info["items"] = counter["items"]
|
||||
return snapshot_info
|
||||
|
||||
def _is_archive_backup(self, backup_path: Path) -> bool:
|
||||
suffixes = backup_path.suffixes
|
||||
if (len(suffixes) >= 2 and suffixes[-2:] == [".tar", ".gz"]) or (suffixes and suffixes[-1] == ".tar"):
|
||||
return True
|
||||
try:
|
||||
return tarfile.is_tarfile(backup_path)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def _restore_from_archive(
|
||||
self,
|
||||
backup_path: Path,
|
||||
clear_existing: bool,
|
||||
) -> Tuple[bool, str]:
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_path = Path(temp_dir)
|
||||
|
||||
mode = "r:gz" if backup_path.suffixes and backup_path.suffixes[-1] == ".gz" else "r"
|
||||
with tarfile.open(backup_path, mode) as tar:
|
||||
tar.extractall(temp_path)
|
||||
|
||||
metadata_path = temp_path / "metadata.json"
|
||||
if not metadata_path.exists():
|
||||
return False, "❌ Метаданные бекапа отсутствуют"
|
||||
|
||||
async with aiofiles.open(metadata_path, "r", encoding="utf-8") as meta_file:
|
||||
metadata = json_lib.loads(await meta_file.read())
|
||||
|
||||
logger.info("📊 Загружен бекап формата %s", metadata.get("format_version", "unknown"))
|
||||
|
||||
database_info = metadata.get("database", {})
|
||||
data_snapshot_info = metadata.get("data_snapshot", {})
|
||||
files_info = metadata.get("files", [])
|
||||
|
||||
if database_info.get("type") == "postgresql":
|
||||
dump_file = temp_path / database_info.get("path", "database.sql")
|
||||
await self._restore_postgres(dump_file, clear_existing)
|
||||
else:
|
||||
dump_file = temp_path / database_info.get("path", "database.sqlite")
|
||||
await self._restore_sqlite(dump_file, clear_existing)
|
||||
|
||||
data_dir = temp_path / "data"
|
||||
if data_dir.exists():
|
||||
await self._restore_data_snapshot(data_dir, clear_existing)
|
||||
|
||||
if files_info:
|
||||
await self._restore_files(files_info, temp_path)
|
||||
|
||||
message = (f"✅ Восстановление завершено!\n"
|
||||
f"📊 Таблиц: {metadata.get('tables_count', 0)}\n"
|
||||
f"📈 Записей: {metadata.get('total_records', 0):,}\n"
|
||||
f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}")
|
||||
|
||||
logger.info(message)
|
||||
return True, message
|
||||
|
||||
async def _restore_postgres(self, dump_path: Path, clear_existing: bool):
|
||||
if not dump_path.exists():
|
||||
raise FileNotFoundError(f"Dump PostgreSQL не найден: {dump_path}")
|
||||
|
||||
env = os.environ.copy()
|
||||
env.update({
|
||||
"PGHOST": settings.POSTGRES_HOST,
|
||||
"PGPORT": str(settings.POSTGRES_PORT),
|
||||
"PGUSER": settings.POSTGRES_USER,
|
||||
"PGPASSWORD": settings.POSTGRES_PASSWORD,
|
||||
})
|
||||
|
||||
if clear_existing:
|
||||
logger.info("🗑️ Полная очистка схемы PostgreSQL перед восстановлением")
|
||||
drop_command = [
|
||||
"psql",
|
||||
settings.POSTGRES_DB,
|
||||
"-c",
|
||||
"DROP SCHEMA public CASCADE; CREATE SCHEMA public; GRANT ALL ON SCHEMA public TO public;",
|
||||
]
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*drop_command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
_, stderr = await proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"Не удалось очистить схему: {stderr.decode()}")
|
||||
|
||||
logger.info("📥 Восстановление PostgreSQL через psql...")
|
||||
restore_command = [
|
||||
"psql",
|
||||
settings.POSTGRES_DB,
|
||||
"-f",
|
||||
str(dump_path),
|
||||
]
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*restore_command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"Ошибка psql: {stderr.decode()}")
|
||||
|
||||
logger.info("✅ PostgreSQL восстановлен (%s)", dump_path)
|
||||
|
||||
async def _restore_sqlite(self, dump_path: Path, clear_existing: bool):
|
||||
if not dump_path.exists():
|
||||
raise FileNotFoundError(f"SQLite файл не найден: {dump_path}")
|
||||
|
||||
target_path = Path(settings.SQLITE_PATH)
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if clear_existing and target_path.exists():
|
||||
target_path.unlink()
|
||||
|
||||
await asyncio.to_thread(shutil.copy2, dump_path, target_path)
|
||||
logger.info("✅ SQLite база восстановлена (%s)", target_path)
|
||||
|
||||
async def _restore_data_snapshot(self, source_dir: Path, clear_existing: bool):
|
||||
if not source_dir.exists():
|
||||
return
|
||||
|
||||
def _restore():
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
for item in source_dir.iterdir():
|
||||
if item.name == self.backup_dir.name:
|
||||
continue
|
||||
|
||||
destination = self.data_dir / item.name
|
||||
if clear_existing and destination.exists():
|
||||
if destination.is_dir():
|
||||
shutil.rmtree(destination)
|
||||
else:
|
||||
destination.unlink()
|
||||
|
||||
if item.is_dir():
|
||||
shutil.copytree(item, destination, dirs_exist_ok=True)
|
||||
else:
|
||||
shutil.copy2(item, destination)
|
||||
|
||||
await asyncio.to_thread(_restore)
|
||||
logger.info("📁 Снимок директории data восстановлен")
|
||||
|
||||
async def _restore_files(self, files_info: List[Dict[str, Any]], temp_path: Path):
|
||||
for file_info in files_info:
|
||||
relative_path = file_info.get("relative_path")
|
||||
target_path = Path(file_info.get("path", ""))
|
||||
if not relative_path or not target_path:
|
||||
continue
|
||||
|
||||
source_file = temp_path / relative_path
|
||||
if not source_file.exists():
|
||||
logger.warning("Файл %s отсутствует в архиве", relative_path)
|
||||
continue
|
||||
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
await asyncio.to_thread(shutil.copy2, source_file, target_path)
|
||||
logger.info("📁 Файл %s восстановлен", target_path)
|
||||
|
||||
async def _restore_from_legacy(
|
||||
self,
|
||||
backup_path: Path,
|
||||
clear_existing: bool,
|
||||
) -> Tuple[bool, str]:
|
||||
if backup_path.suffix == '.gz':
|
||||
async with aiofiles.open(backup_path, 'rb') as f:
|
||||
compressed_data = await f.read()
|
||||
uncompressed_data = gzip.decompress(compressed_data).decode('utf-8')
|
||||
backup_structure = json_lib.loads(uncompressed_data)
|
||||
else:
|
||||
async with aiofiles.open(backup_path, 'r', encoding='utf-8') as f:
|
||||
file_content = await f.read()
|
||||
backup_structure = json_lib.loads(file_content)
|
||||
|
||||
metadata = backup_structure.get("metadata", {})
|
||||
backup_data = backup_structure.get("data", {})
|
||||
association_data = backup_structure.get("associations", {})
|
||||
file_snapshots = backup_structure.get("files", {})
|
||||
|
||||
if not backup_data:
|
||||
return False, "❌ Файл бекапа не содержит данных"
|
||||
|
||||
logger.info(f"📊 Загружен legacy-бекап от {metadata.get('timestamp')}")
|
||||
logger.info(f"📈 Содержит {metadata.get('total_records', 0)} записей")
|
||||
|
||||
restored_records = 0
|
||||
restored_tables = 0
|
||||
|
||||
async for db in get_db():
|
||||
try:
|
||||
if clear_existing:
|
||||
logger.warning("🗑️ Очищаем существующие данные...")
|
||||
await self._clear_database_tables(db)
|
||||
|
||||
models_by_table = {model.__tablename__: model for model in self.backup_models_ordered}
|
||||
|
||||
pre_restore_tables = {"promo_groups"}
|
||||
for table_name in pre_restore_tables:
|
||||
model = models_by_table.get(table_name)
|
||||
if not model:
|
||||
continue
|
||||
|
||||
records = backup_data.get(table_name, [])
|
||||
if not records:
|
||||
continue
|
||||
|
||||
logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)")
|
||||
restored = await self._restore_table_records(db, model, table_name, records, clear_existing)
|
||||
restored_records += restored
|
||||
|
||||
if restored:
|
||||
restored_tables += 1
|
||||
logger.info(f"✅ Таблица {table_name} восстановлена")
|
||||
|
||||
await self._restore_users_without_referrals(db, backup_data, models_by_table)
|
||||
|
||||
for model in self.backup_models_ordered:
|
||||
table_name = model.__tablename__
|
||||
|
||||
if table_name == "users" or table_name in pre_restore_tables:
|
||||
continue
|
||||
|
||||
records = backup_data.get(table_name, [])
|
||||
if not records:
|
||||
continue
|
||||
|
||||
logger.info(f"🔥 Восстанавливаем таблицу {table_name} ({len(records)} записей)")
|
||||
restored = await self._restore_table_records(db, model, table_name, records, clear_existing)
|
||||
restored_records += restored
|
||||
|
||||
if restored:
|
||||
restored_tables += 1
|
||||
logger.info(f"✅ Таблица {table_name} восстановлена")
|
||||
|
||||
await self._update_user_referrals(db, backup_data)
|
||||
|
||||
assoc_tables, assoc_records = await self._restore_association_tables(
|
||||
db,
|
||||
association_data,
|
||||
clear_existing
|
||||
)
|
||||
restored_tables += assoc_tables
|
||||
restored_records += assoc_records
|
||||
|
||||
await db.commit()
|
||||
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
await db.rollback()
|
||||
logger.error(f"Ошибка при восстановлении: {e}")
|
||||
raise e
|
||||
finally:
|
||||
await db.close()
|
||||
|
||||
if file_snapshots:
|
||||
restored_files = await self._restore_file_snapshots(file_snapshots)
|
||||
if restored_files:
|
||||
logger.info(f"📁 Восстановлено файлов конфигурации: {restored_files}")
|
||||
|
||||
message = (f"✅ Восстановление завершено!\n"
|
||||
f"📊 Таблиц: {restored_tables}\n"
|
||||
f"📈 Записей: {restored_records:,}\n"
|
||||
f"📅 Дата бекапа: {metadata.get('timestamp', 'неизвестно')}")
|
||||
|
||||
logger.info(message)
|
||||
return True, message
|
||||
|
||||
async def _restore_users_without_referrals(self, db: AsyncSession, backup_data: dict, models_by_table: dict):
|
||||
users_data = backup_data.get("users", [])
|
||||
if not users_data:
|
||||
@@ -792,34 +1053,49 @@ class BackupService:
|
||||
backups = []
|
||||
|
||||
try:
|
||||
for backup_file in sorted(self.backup_dir.glob("backup_*.json*"), reverse=True):
|
||||
for backup_file in sorted(self.backup_dir.glob("backup_*"), reverse=True):
|
||||
if not backup_file.is_file():
|
||||
continue
|
||||
|
||||
try:
|
||||
if backup_file.suffix == '.gz':
|
||||
with gzip.open(backup_file, 'rt', encoding='utf-8') as f:
|
||||
backup_structure = json_lib.load(f)
|
||||
metadata = {}
|
||||
|
||||
if self._is_archive_backup(backup_file):
|
||||
mode = "r:gz" if backup_file.suffixes and backup_file.suffixes[-1] == ".gz" else "r"
|
||||
with tarfile.open(backup_file, mode) as tar:
|
||||
try:
|
||||
member = tar.getmember("metadata.json")
|
||||
with tar.extractfile(member) as meta_file:
|
||||
metadata = json_lib.load(meta_file)
|
||||
except KeyError:
|
||||
metadata = {}
|
||||
else:
|
||||
with open(backup_file, 'r', encoding='utf-8') as f:
|
||||
backup_structure = json_lib.load(f)
|
||||
|
||||
metadata = backup_structure.get("metadata", {})
|
||||
if backup_file.suffix == '.gz':
|
||||
with gzip.open(backup_file, 'rt', encoding='utf-8') as f:
|
||||
backup_structure = json_lib.load(f)
|
||||
else:
|
||||
with open(backup_file, 'r', encoding='utf-8') as f:
|
||||
backup_structure = json_lib.load(f)
|
||||
metadata = backup_structure.get("metadata", {})
|
||||
|
||||
file_stats = backup_file.stat()
|
||||
|
||||
|
||||
backup_info = {
|
||||
"filename": backup_file.name,
|
||||
"filepath": str(backup_file),
|
||||
"timestamp": metadata.get("timestamp"),
|
||||
"tables_count": metadata.get("tables_count", 0),
|
||||
"total_records": metadata.get("total_records", 0),
|
||||
"compressed": metadata.get("compressed", False),
|
||||
"timestamp": metadata.get("timestamp", datetime.fromtimestamp(file_stats.st_mtime).isoformat()),
|
||||
"tables_count": metadata.get("tables_count", metadata.get("database", {}).get("tables_count", 0)),
|
||||
"total_records": metadata.get("total_records", metadata.get("database", {}).get("total_records", 0)),
|
||||
"compressed": self._is_archive_backup(backup_file) or backup_file.suffix == '.gz',
|
||||
"file_size_bytes": file_stats.st_size,
|
||||
"file_size_mb": round(file_stats.st_size / 1024 / 1024, 2),
|
||||
"created_by": metadata.get("created_by"),
|
||||
"database_type": metadata.get("database_type", "unknown"),
|
||||
"version": metadata.get("version", "1.0")
|
||||
"database_type": metadata.get("database_type", metadata.get("database", {}).get("type", "unknown")),
|
||||
"version": metadata.get("format_version", metadata.get("version", "1.0")),
|
||||
}
|
||||
|
||||
|
||||
backups.append(backup_info)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка чтения метаданных {backup_file}: {e}")
|
||||
file_stats = backup_file.stat()
|
||||
|
||||
Reference in New Issue
Block a user