Files
remnawave-bedolaga-telegram…/app/database/universal_migration.py
2025-08-26 03:14:26 +03:00

186 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from sqlalchemy import text, inspect
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.database import engine
logger = logging.getLogger(__name__)
async def get_database_type():
"""Определяет тип базы данных"""
return engine.dialect.name
async def check_unique_constraint_exists():
"""Проверяет, существует ли ограничение уникальности на user_id"""
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
result = await conn.execute(text("PRAGMA table_info(subscriptions)"))
columns = result.fetchall()
check_result = await conn.execute(text("""
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
LIMIT 1
"""))
duplicates = check_result.fetchall()
return len(duplicates) == 0
elif db_type == 'postgresql':
result = await conn.execute(text("""
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'subscriptions'
AND constraint_type = 'UNIQUE'
AND constraint_name LIKE '%user_id%'
"""))
constraints = result.fetchall()
return len(constraints) > 0
elif db_type == 'mysql':
result = await conn.execute(text("""
SELECT CONSTRAINT_NAME
FROM information_schema.TABLE_CONSTRAINTS
WHERE TABLE_NAME = 'subscriptions'
AND CONSTRAINT_TYPE = 'UNIQUE'
AND CONSTRAINT_NAME LIKE '%user_id%'
"""))
constraints = result.fetchall()
return len(constraints) > 0
return False
except Exception as e:
logger.error(f"Ошибка проверки ограничения уникальности: {e}")
return False
async def fix_subscription_duplicates_universal():
"""Универсальная функция очистки дубликатов для разных типов БД"""
async with engine.begin() as conn:
db_type = await get_database_type()
logger.info(f"Обнаружен тип базы данных: {db_type}")
try:
result = await conn.execute(text("""
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
"""))
duplicates = result.fetchall()
if not duplicates:
logger.info("Дублирующихся подписок не найдено")
return 0
logger.info(f"Найдено {len(duplicates)} пользователей с дублирующимися подписками")
total_deleted = 0
for user_id_row, count in duplicates:
user_id = user_id_row
if db_type == 'sqlite':
delete_result = await conn.execute(text("""
DELETE FROM subscriptions
WHERE user_id = :user_id AND id NOT IN (
SELECT MAX(id)
FROM subscriptions
WHERE user_id = :user_id
)
"""), {"user_id": user_id})
elif db_type in ['postgresql', 'mysql']:
delete_result = await conn.execute(text("""
DELETE FROM subscriptions
WHERE user_id = :user_id AND id NOT IN (
SELECT max_id FROM (
SELECT MAX(id) as max_id
FROM subscriptions
WHERE user_id = :user_id
) as subquery
)
"""), {"user_id": user_id})
else:
subs_result = await conn.execute(text("""
SELECT id FROM subscriptions
WHERE user_id = :user_id
ORDER BY created_at DESC, id DESC
"""), {"user_id": user_id})
sub_ids = [row[0] for row in subs_result.fetchall()]
if len(sub_ids) > 1:
ids_to_delete = sub_ids[1:]
for sub_id in ids_to_delete:
await conn.execute(text("""
DELETE FROM subscriptions WHERE id = :id
"""), {"id": sub_id})
delete_result = type('Result', (), {'rowcount': len(ids_to_delete)})()
else:
delete_result = type('Result', (), {'rowcount': 0})()
deleted_count = delete_result.rowcount
total_deleted += deleted_count
logger.info(f"Удалено {deleted_count} дублирующихся подписок для пользователя {user_id}")
logger.info(f"Всего удалено дублирующихся подписок: {total_deleted}")
return total_deleted
except Exception as e:
logger.error(f"Ошибка при очистке дублирующихся подписок: {e}")
raise
async def run_universal_migration():
"""Запускает универсальную миграцию"""
logger.info("=== НАЧАЛО УНИВЕРСАЛЬНОЙ МИГРАЦИИ ПОДПИСОК ===")
try:
db_type = await get_database_type()
logger.info(f"Тип базы данных: {db_type}")
async with engine.begin() as conn:
total_subs = await conn.execute(text("SELECT COUNT(*) FROM subscriptions"))
unique_users = await conn.execute(text("SELECT COUNT(DISTINCT user_id) FROM subscriptions"))
total_count = total_subs.fetchone()[0]
unique_count = unique_users.fetchone()[0]
logger.info(f"Всего подписок: {total_count}")
logger.info(f"Уникальных пользователей: {unique_count}")
if total_count == unique_count:
logger.info("База данных уже в корректном состоянии")
return True
deleted_count = await fix_subscription_duplicates_universal()
async with engine.begin() as conn:
final_check = await conn.execute(text("""
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
"""))
remaining_duplicates = final_check.fetchall()
if remaining_duplicates:
logger.warning(f"Остались дубликаты у {len(remaining_duplicates)} пользователей")
return False
else:
logger.info("=== МИГРАЦИЯ ЗАВЕРШЕНА УСПЕШНО ===")
return True
except Exception as e:
logger.error(f"=== ОШИБКА ВЫПОЛНЕНИЯ МИГРАЦИИ: {e} ===")
return False