fix: expand backup coverage to all 68 models and harden restore

- Add 37 missing models to backup (payment providers, polls, contests,
  wheel, FAQ, promo offers, webhooks, configs, menu buttons, etc.)
- Add tariff_promo_groups and payment_method_promo_groups association tables
- Replace hardcoded association restore with generic handler
- Fix transaction atomicity: flush instead of commit in inner methods,
  remove inner rollback calls, single commit/rollback in outer handler
- Fix composite PK support for UserPromoGroup (was only detecting first PK)
- Fix duplicate insert bug when clear_existing=True and record already exists
- Add cabinet_refresh_tokens to clear list, fix support_audit_logs deletion order
- Add Time column parsing for ReferralContest.daily_summary_time
- Security: tarfile filter='data', path traversal protection in _restore_files
  and delete_backup, os.sep in startswith checks
This commit is contained in:
Fringg
2026-02-11 03:35:16 +03:00
parent 19dabf3851
commit 02e40bd6f7

View File

@@ -7,7 +7,7 @@ import shutil
import tarfile
import tempfile
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from datetime import datetime, time as dt_time, timedelta
from pathlib import Path
from typing import Any
@@ -24,14 +24,41 @@ from app.database.models import (
AdvertisingCampaign,
AdvertisingCampaignRegistration,
BroadcastHistory,
ButtonClickLog,
CloudPaymentsPayment,
ContestAttempt,
ContestRound,
ContestTemplate,
CryptoBotPayment,
DiscountOffer,
FaqPage,
FaqSetting,
FreekassaPayment,
HeleketPayment,
KassaAiPayment,
MainMenuButton,
MenuLayoutHistory,
MonitoringLog,
MulenPayPayment,
Pal24Payment,
PaymentMethodConfig,
PinnedMessage,
PlategaPayment,
Poll,
PollAnswer,
PollOption,
PollQuestion,
PollResponse,
PrivacyPolicy,
PromoCode,
PromoCodeUse,
PromoGroup,
PromoOfferLog,
PromoOfferTemplate,
PublicOffer,
ReferralContest,
ReferralContestEvent,
ReferralContestVirtualParticipant,
ReferralEarning,
SentNotification,
ServerSquad,
@@ -39,19 +66,33 @@ from app.database.models import (
Squad,
Subscription,
SubscriptionConversion,
SubscriptionEvent,
SubscriptionServer,
SubscriptionTemporaryAccess,
SupportAuditLog,
SystemSetting,
Tariff,
Ticket,
TicketMessage,
TicketNotification,
TrafficPurchase,
Transaction,
User,
UserMessage,
UserPromoGroup,
WataPayment,
WebApiToken,
Webhook,
WebhookDelivery,
WelcomeText,
WheelConfig,
WheelPrize,
WheelSpin,
WithdrawalRequest,
YooKassaPayment,
payment_method_promo_groups,
server_squad_promo_groups,
tariff_promo_groups,
)
@@ -122,6 +163,53 @@ class BackupService:
TicketMessage,
SupportAuditLog,
WebApiToken,
# --- Payment providers (FK: users, transactions) ---
HeleketPayment,
WataPayment,
PlategaPayment,
CloudPaymentsPayment,
FreekassaPayment,
KassaAiPayment,
# --- Settings/content ---
PaymentMethodConfig,
PrivacyPolicy,
PublicOffer,
FaqSetting,
FaqPage,
PinnedMessage,
MainMenuButton,
MenuLayoutHistory,
# --- User data (FK: users, promo_groups, subscriptions) ---
UserPromoGroup,
TrafficPurchase,
SubscriptionEvent,
SubscriptionTemporaryAccess,
PromoOfferTemplate,
PromoOfferLog,
# --- Referral/contests (FK: users) ---
WithdrawalRequest,
ReferralContest,
ReferralContestEvent,
ReferralContestVirtualParticipant,
ContestTemplate,
ContestRound,
ContestAttempt,
# --- Polls (FK chain: polls -> questions -> options -> answers) ---
Poll,
PollQuestion,
PollOption,
PollResponse,
PollAnswer,
# --- Webhooks ---
Webhook,
WebhookDelivery,
# --- Wheel (FK chain: configs -> prizes -> spins) ---
WheelConfig,
WheelPrize,
WheelSpin,
# --- Support ---
TicketNotification,
ButtonClickLog,
]
self.backup_models_ordered = self._base_backup_models.copy()
@@ -131,6 +219,8 @@ class BackupService:
self.association_tables = {
'server_squad_promo_groups': server_squad_promo_groups,
'tariff_promo_groups': tariff_promo_groups,
'payment_method_promo_groups': payment_method_promo_groups,
}
def _load_settings(self) -> BackupSettings:
@@ -538,7 +628,7 @@ class BackupService:
except Exception as exc:
logger.error('Ошибка при экспорте данных: %s', exc)
raise exc
raise
async def _collect_files(self, staging_dir: Path, include_logs: bool) -> list[dict[str, Any]]:
files_info: list[dict[str, Any]] = []
@@ -623,7 +713,7 @@ class BackupService:
mode = 'r:gz' if backup_path.suffixes and backup_path.suffixes[-1] == '.gz' else 'r'
with tarfile.open(backup_path, mode) as tar:
tar.extractall(temp_path)
tar.extractall(temp_path, filter='data')
metadata_path = temp_path / 'metadata.json'
if not metadata_path.exists():
@@ -785,20 +875,31 @@ class BackupService:
logger.info('📁 Снимок директории data восстановлен')
async def _restore_files(self, files_info: list[dict[str, Any]], temp_path: Path):
allowed_base = self.data_dir.resolve()
for file_info in files_info:
relative_path = file_info.get('relative_path')
target_path = Path(file_info.get('path', ''))
if not relative_path or not target_path:
continue
source_file = temp_path / relative_path
target_resolved = target_path.resolve()
if not str(target_resolved).startswith(str(allowed_base) + os.sep) and target_resolved != allowed_base:
logger.warning('Заблокирована запись за пределами data_dir: %s', target_path)
continue
source_file = (temp_path / relative_path).resolve()
if not str(source_file).startswith(str(temp_path.resolve()) + os.sep):
logger.warning('Path traversal в relative_path: %s', relative_path)
continue
if not source_file.exists():
logger.warning('Файл %s отсутствует в архиве', relative_path)
continue
target_path.parent.mkdir(parents=True, exist_ok=True)
await asyncio.to_thread(shutil.copy2, source_file, target_path)
logger.info('📁 Файл %s восстановлен', target_path)
target_resolved.parent.mkdir(parents=True, exist_ok=True)
await asyncio.to_thread(shutil.copy2, source_file, target_resolved)
logger.info('📁 Файл %s восстановлен', target_resolved)
async def _restore_database_payload(
self,
@@ -914,7 +1015,7 @@ class BackupService:
except Exception as exc:
await db.rollback()
logger.error('Ошибка при восстановлении: %s', exc)
raise exc
raise
return restored_tables, restored_records
@@ -994,10 +1095,9 @@ class BackupService:
except Exception as e:
logger.error(f'Ошибка при восстановлении пользователя: {e}')
await db.rollback()
raise e
raise
await db.commit()
await db.flush()
logger.info('✅ Пользователи без реферальных связей восстановлены')
async def _update_user_referrals(self, db: AsyncSession, backup_data: dict):
@@ -1031,7 +1131,7 @@ class BackupService:
logger.error(f'Ошибка при обновлении реферальной связи: {e}')
continue
await db.commit()
await db.flush()
logger.info('✅ Реферальные связи обновлены')
def _process_record_data(self, record_data: dict, model, table_name: str) -> dict:
@@ -1058,6 +1158,12 @@ class BackupService:
except (ValueError, TypeError) as e:
logger.warning(f'Не удалось парсить дату {value} для поля {key}: {e}')
processed_data[key] = datetime.utcnow()
elif column_type_str == 'TIME' and isinstance(value, str):
try:
processed_data[key] = dt_time.fromisoformat(value)
except (ValueError, TypeError) as e:
logger.warning(f'Не удалось парсить время {value} для поля {key}: {e}')
processed_data[key] = dt_time(hour=12, minute=0)
elif ('BOOLEAN' in column_type_str or 'BOOL' in column_type_str) and isinstance(value, str):
processed_data[key] = value.lower() in ('true', '1', 'yes', 'on')
elif (
@@ -1089,11 +1195,8 @@ class BackupService:
return processed_data
def _get_primary_key_column(self, model) -> str | None:
for col in model.__table__.columns:
if col.primary_key:
return col.name
return None
def _get_primary_key_columns(self, model) -> list[str]:
return [col.name for col in model.__table__.columns if col.primary_key]
async def _export_association_tables(self, db: AsyncSession) -> dict[str, list[dict[str, Any]]]:
association_data: dict[str, list[dict[str, Any]]] = {}
@@ -1119,63 +1222,60 @@ class BackupService:
restored_tables = 0
restored_records = 0
if 'server_squad_promo_groups' in association_data:
restored = await self._restore_server_squad_promo_groups(
db, association_data['server_squad_promo_groups'], clear_existing
for table_name, table_obj in self.association_tables.items():
if table_name not in association_data:
continue
col_names = [col.name for col in table_obj.columns]
restored = await self._restore_association_table(
db, table_obj, table_name, association_data[table_name], clear_existing, col_names
)
restored_tables += 1
restored_records += restored
return restored_tables, restored_records
async def _restore_server_squad_promo_groups(
self, db: AsyncSession, records: list[dict[str, Any]], clear_existing: bool
async def _restore_association_table(
self,
db: AsyncSession,
table_obj,
table_name: str,
records: list[dict[str, Any]],
clear_existing: bool,
col_names: list[str],
) -> int:
if not records:
return 0
if clear_existing:
await db.execute(server_squad_promo_groups.delete())
await db.execute(table_obj.delete())
restored = 0
for record in records:
server_id = record.get('server_squad_id')
promo_id = record.get('promo_group_id')
values = {col: record.get(col) for col in col_names}
if server_id is None or promo_id is None:
logger.warning('Пропущена некорректная запись server_squad_promo_groups: %s', record)
if any(v is None for v in values.values()):
logger.warning('Пропущена некорректная запись %s: %s', table_name, record)
continue
try:
first_col = col_names[0]
exists_stmt = (
select(server_squad_promo_groups.c.server_squad_id)
.where(
server_squad_promo_groups.c.server_squad_id == server_id,
server_squad_promo_groups.c.promo_group_id == promo_id,
)
select(table_obj.c[first_col])
.where(*[table_obj.c[col] == values[col] for col in col_names])
.limit(1)
)
existing = await db.execute(exists_stmt)
if existing.scalar_one_or_none() is not None:
logger.debug(
'Запись server_squad_promo_groups (%s, %s) уже существует',
server_id,
promo_id,
)
logger.debug('Запись %s %s уже существует', table_name, values)
continue
await db.execute(
server_squad_promo_groups.insert().values(server_squad_id=server_id, promo_group_id=promo_id)
)
await db.execute(table_obj.insert().values(**values))
restored += 1
except Exception as e:
logger.error(
'Ошибка при восстановлении связи server_squad_promo_groups (%s, %s): %s', server_id, promo_id, e
)
await db.rollback()
raise e
logger.error('Ошибка при восстановлении связи %s %s: %s', table_name, values, e)
raise
return restored
@@ -1205,17 +1305,16 @@ class BackupService:
logger.warning(f'⚠️ Тариф {tariff_id} не найден, устанавливаем tariff_id=NULL для подписки')
processed_data['tariff_id'] = None
primary_key_col = self._get_primary_key_column(model)
pk_cols = self._get_primary_key_columns(model)
if primary_key_col and primary_key_col in processed_data:
existing_record = await db.execute(
select(model).where(getattr(model, primary_key_col) == processed_data[primary_key_col])
)
if pk_cols and all(col in processed_data for col in pk_cols):
where_clause = [getattr(model, col) == processed_data[col] for col in pk_cols]
existing_record = await db.execute(select(model).where(*where_clause))
existing = existing_record.scalar_one_or_none()
if existing and not clear_existing:
if existing:
for key, value in processed_data.items():
if key != primary_key_col:
if key not in pk_cols:
setattr(existing, key, value)
else:
instance = model(**processed_data)
@@ -1229,17 +1328,69 @@ class BackupService:
except Exception as e:
logger.error(f'Ошибка восстановления записи в {table_name}: {e}')
logger.error(f'Проблемные данные: {record_data}')
await db.rollback()
raise e
raise
return restored_count
async def _clear_database_tables(self, db: AsyncSession, backup_data: dict[str, Any] | None = None):
tables_order = [
# --- Association tables (no FK deps on them, safe to delete first) ---
'server_squad_promo_groups',
'tariff_promo_groups',
'payment_method_promo_groups',
# --- Polls (child -> parent order) ---
'poll_answers',
'poll_responses',
'poll_options',
'poll_questions',
'polls',
# --- Wheel (child -> parent) ---
'wheel_spins',
'wheel_prizes',
'wheel_configs',
# --- Contests (child -> parent) ---
'contest_attempts',
'contest_rounds',
'contest_templates',
'referral_contest_virtual_participants',
'referral_contest_events',
'referral_contests',
# --- Webhooks ---
'webhook_deliveries',
'webhooks',
# --- Promo offers ---
'promo_offer_logs',
'promo_offer_templates',
'subscription_temporary_access',
# --- User engagement ---
'subscription_events',
'traffic_purchases',
'user_promo_groups',
'withdrawal_requests',
# --- Support extras ---
'ticket_notifications',
'button_click_logs',
# --- Payment providers ---
'heleket_payments',
'wata_payments',
'platega_payments',
'cloudpayments_payments',
'freekassa_payments',
'kassa_ai_payments',
# --- Content/config ---
'pinned_messages',
'main_menu_buttons',
'menu_layout_history',
'faq_pages',
'faq_settings',
'privacy_policies',
'public_offers',
'payment_method_configs',
# --- Original tables (preserved order) ---
'support_audit_logs',
'ticket_messages',
'tickets',
'support_audit_logs',
'cabinet_refresh_tokens',
'advertising_campaign_registrations',
'advertising_campaigns',
'subscription_servers',
@@ -1408,7 +1559,9 @@ class BackupService:
async def delete_backup(self, backup_filename: str) -> tuple[bool, str]:
try:
backup_path = self.backup_dir / backup_filename
backup_path = (self.backup_dir / backup_filename).resolve()
if not str(backup_path).startswith(str(self.backup_dir.resolve()) + os.sep):
return False, '❌ Недопустимое имя файла бекапа'
if not backup_path.exists():
return False, f'❌ Файл бекапа не найден: {backup_filename}'