Files
remnawave-bedolaga-telegram…/app/database/universal_migration.py
2025-09-12 21:27:34 +03:00

777 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from sqlalchemy import text, inspect
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.database import engine
logger = logging.getLogger(__name__)
async def get_database_type():
return engine.dialect.name
async def check_table_exists(table_name: str) -> bool:
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
result = await conn.execute(text(f"""
SELECT name FROM sqlite_master
WHERE type='table' AND name='{table_name}'
"""))
return result.fetchone() is not None
elif db_type == 'postgresql':
result = await conn.execute(text("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = :table_name
"""), {"table_name": table_name})
return result.fetchone() is not None
elif db_type == 'mysql':
result = await conn.execute(text("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = :table_name
"""), {"table_name": table_name})
return result.fetchone() is not None
return False
except Exception as e:
logger.error(f"Ошибка проверки существования таблицы {table_name}: {e}")
return False
async def check_column_exists(table_name: str, column_name: str) -> bool:
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
result = await conn.execute(text(f"PRAGMA table_info({table_name})"))
columns = result.fetchall()
return any(col[1] == column_name for col in columns)
elif db_type == 'postgresql':
result = await conn.execute(text("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = :table_name
AND column_name = :column_name
"""), {"table_name": table_name, "column_name": column_name})
return result.fetchone() is not None
elif db_type == 'mysql':
result = await conn.execute(text("""
SELECT COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_NAME = :table_name
AND COLUMN_NAME = :column_name
"""), {"table_name": table_name, "column_name": column_name})
return result.fetchone() is not None
return False
except Exception as e:
logger.error(f"Ошибка проверки существования колонки {column_name}: {e}")
return False
async def create_cryptobot_payments_table():
table_exists = await check_table_exists('cryptobot_payments')
if table_exists:
logger.info("Таблица cryptobot_payments уже существует")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
create_sql = """
CREATE TABLE cryptobot_payments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
invoice_id VARCHAR(255) UNIQUE NOT NULL,
amount VARCHAR(50) NOT NULL,
asset VARCHAR(10) NOT NULL,
status VARCHAR(50) NOT NULL,
description TEXT NULL,
payload TEXT NULL,
bot_invoice_url TEXT NULL,
mini_app_invoice_url TEXT NULL,
web_app_invoice_url TEXT NULL,
paid_at DATETIME NULL,
transaction_id INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (transaction_id) REFERENCES transactions(id)
);
CREATE INDEX idx_cryptobot_payments_user_id ON cryptobot_payments(user_id);
CREATE INDEX idx_cryptobot_payments_invoice_id ON cryptobot_payments(invoice_id);
CREATE INDEX idx_cryptobot_payments_status ON cryptobot_payments(status);
"""
elif db_type == 'postgresql':
create_sql = """
CREATE TABLE cryptobot_payments (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
invoice_id VARCHAR(255) UNIQUE NOT NULL,
amount VARCHAR(50) NOT NULL,
asset VARCHAR(10) NOT NULL,
status VARCHAR(50) NOT NULL,
description TEXT NULL,
payload TEXT NULL,
bot_invoice_url TEXT NULL,
mini_app_invoice_url TEXT NULL,
web_app_invoice_url TEXT NULL,
paid_at TIMESTAMP NULL,
transaction_id INTEGER NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (transaction_id) REFERENCES transactions(id)
);
CREATE INDEX idx_cryptobot_payments_user_id ON cryptobot_payments(user_id);
CREATE INDEX idx_cryptobot_payments_invoice_id ON cryptobot_payments(invoice_id);
CREATE INDEX idx_cryptobot_payments_status ON cryptobot_payments(status);
"""
elif db_type == 'mysql':
create_sql = """
CREATE TABLE cryptobot_payments (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
invoice_id VARCHAR(255) UNIQUE NOT NULL,
amount VARCHAR(50) NOT NULL,
asset VARCHAR(10) NOT NULL,
status VARCHAR(50) NOT NULL,
description TEXT NULL,
payload TEXT NULL,
bot_invoice_url TEXT NULL,
mini_app_invoice_url TEXT NULL,
web_app_invoice_url TEXT NULL,
paid_at DATETIME NULL,
transaction_id INT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (transaction_id) REFERENCES transactions(id)
);
CREATE INDEX idx_cryptobot_payments_user_id ON cryptobot_payments(user_id);
CREATE INDEX idx_cryptobot_payments_invoice_id ON cryptobot_payments(invoice_id);
CREATE INDEX idx_cryptobot_payments_status ON cryptobot_payments(status);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("Таблица cryptobot_payments успешно создана")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы cryptobot_payments: {e}")
return False
async def create_user_messages_table():
table_exists = await check_table_exists('user_messages')
if table_exists:
logger.info("Таблица user_messages уже существует")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
create_sql = """
CREATE TABLE user_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_text TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
sort_order INTEGER DEFAULT 0,
created_by INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_user_messages_active ON user_messages(is_active);
CREATE INDEX idx_user_messages_sort ON user_messages(sort_order, created_at);
"""
elif db_type == 'postgresql':
create_sql = """
CREATE TABLE user_messages (
id SERIAL PRIMARY KEY,
message_text TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
sort_order INTEGER DEFAULT 0,
created_by INTEGER NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_user_messages_active ON user_messages(is_active);
CREATE INDEX idx_user_messages_sort ON user_messages(sort_order, created_at);
"""
elif db_type == 'mysql':
create_sql = """
CREATE TABLE user_messages (
id INT AUTO_INCREMENT PRIMARY KEY,
message_text TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
sort_order INT DEFAULT 0,
created_by INT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_user_messages_active ON user_messages(is_active);
CREATE INDEX idx_user_messages_sort ON user_messages(sort_order, created_at);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("Таблица user_messages успешно создана")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы user_messages: {e}")
return False
async def add_welcome_text_is_enabled_column():
column_exists = await check_column_exists('welcome_texts', 'is_enabled')
if column_exists:
logger.info("Колонка is_enabled уже существует в таблице welcome_texts")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
alter_sql = "ALTER TABLE welcome_texts ADD COLUMN is_enabled BOOLEAN DEFAULT 1 NOT NULL"
elif db_type == 'postgresql':
alter_sql = "ALTER TABLE welcome_texts ADD COLUMN is_enabled BOOLEAN DEFAULT TRUE NOT NULL"
elif db_type == 'mysql':
alter_sql = "ALTER TABLE welcome_texts ADD COLUMN is_enabled BOOLEAN DEFAULT TRUE NOT NULL"
else:
logger.error(f"Неподдерживаемый тип БД для добавления колонки: {db_type}")
return False
await conn.execute(text(alter_sql))
logger.info("✅ Поле is_enabled добавлено в таблицу welcome_texts")
if db_type == 'sqlite':
update_sql = "UPDATE welcome_texts SET is_enabled = 1 WHERE is_enabled IS NULL"
else:
update_sql = "UPDATE welcome_texts SET is_enabled = TRUE WHERE is_enabled IS NULL"
result = await conn.execute(text(update_sql))
updated_count = result.rowcount
logger.info(f"Обновлено {updated_count} существующих записей welcome_texts")
return True
except Exception as e:
logger.error(f"Ошибка при добавлении поля is_enabled: {e}")
return False
async def create_welcome_texts_table():
table_exists = await check_table_exists('welcome_texts')
if table_exists:
logger.info("Таблица welcome_texts уже существует")
return await add_welcome_text_is_enabled_column()
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
create_sql = """
CREATE TABLE welcome_texts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
text_content TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
is_enabled BOOLEAN DEFAULT 1 NOT NULL,
created_by INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_welcome_texts_active ON welcome_texts(is_active);
CREATE INDEX idx_welcome_texts_enabled ON welcome_texts(is_enabled);
CREATE INDEX idx_welcome_texts_updated ON welcome_texts(updated_at);
"""
elif db_type == 'postgresql':
create_sql = """
CREATE TABLE welcome_texts (
id SERIAL PRIMARY KEY,
text_content TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
is_enabled BOOLEAN DEFAULT TRUE NOT NULL,
created_by INTEGER NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_welcome_texts_active ON welcome_texts(is_active);
CREATE INDEX idx_welcome_texts_enabled ON welcome_texts(is_enabled);
CREATE INDEX idx_welcome_texts_updated ON welcome_texts(updated_at);
"""
elif db_type == 'mysql':
create_sql = """
CREATE TABLE welcome_texts (
id INT AUTO_INCREMENT PRIMARY KEY,
text_content TEXT NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
is_enabled BOOLEAN DEFAULT TRUE NOT NULL,
created_by INT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX idx_welcome_texts_active ON welcome_texts(is_active);
CREATE INDEX idx_welcome_texts_enabled ON welcome_texts(is_enabled);
CREATE INDEX idx_welcome_texts_updated ON welcome_texts(updated_at);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("✅ Таблица welcome_texts успешно создана с полем is_enabled")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы welcome_texts: {e}")
return False
async def fix_foreign_keys_for_user_deletion():
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'postgresql':
try:
await conn.execute(text("""
ALTER TABLE user_messages
DROP CONSTRAINT IF EXISTS user_messages_created_by_fkey;
"""))
await conn.execute(text("""
ALTER TABLE user_messages
ADD CONSTRAINT user_messages_created_by_fkey
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL;
"""))
logger.info("Обновлен внешний ключ user_messages.created_by")
except Exception as e:
logger.warning(f"Ошибка обновления FK user_messages: {e}")
try:
await conn.execute(text("""
ALTER TABLE promocodes
DROP CONSTRAINT IF EXISTS promocodes_created_by_fkey;
"""))
await conn.execute(text("""
ALTER TABLE promocodes
ADD CONSTRAINT promocodes_created_by_fkey
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL;
"""))
logger.info("Обновлен внешний ключ promocodes.created_by")
except Exception as e:
logger.warning(f"Ошибка обновления FK promocodes: {e}")
logger.info("Внешние ключи обновлены для безопасного удаления пользователей")
return True
except Exception as e:
logger.error(f"Ошибка обновления внешних ключей: {e}")
return False
async def add_referral_system_columns():
logger.info("=== МИГРАЦИЯ РЕФЕРАЛЬНОЙ СИСТЕМЫ ===")
try:
async with engine.begin() as conn:
db_type = await get_database_type()
column_exists = await check_column_exists('users', 'has_made_first_topup')
if not column_exists:
logger.info("Добавление колонки has_made_first_topup в таблицу users")
if db_type == 'sqlite':
column_def = 'BOOLEAN DEFAULT 0'
else:
column_def = 'BOOLEAN DEFAULT FALSE'
await conn.execute(text(f"ALTER TABLE users ADD COLUMN has_made_first_topup {column_def}"))
logger.info("Колонка has_made_first_topup успешно добавлена")
logger.info("Обновление существующих пользователей...")
if db_type == 'sqlite':
update_sql = """
UPDATE users
SET has_made_first_topup = 1
WHERE balance_kopeks > 0 OR has_had_paid_subscription = 1
"""
else:
update_sql = """
UPDATE users
SET has_made_first_topup = TRUE
WHERE balance_kopeks > 0 OR has_had_paid_subscription = TRUE
"""
result = await conn.execute(text(update_sql))
updated_count = result.rowcount
logger.info(f"Обновлено {updated_count} пользователей с has_made_first_topup = TRUE")
logger.info("✅ Миграция реферальной системы завершена")
return True
else:
logger.info("Колонка has_made_first_topup уже существует")
return True
except Exception as e:
logger.error(f"Ошибка миграции реферальной системы: {e}")
return False
async def create_subscription_conversions_table():
table_exists = await check_table_exists('subscription_conversions')
if table_exists:
logger.info("Таблица subscription_conversions уже существует")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == 'sqlite':
create_sql = """
CREATE TABLE subscription_conversions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
converted_at DATETIME DEFAULT CURRENT_TIMESTAMP,
trial_duration_days INTEGER NULL,
payment_method VARCHAR(50) NULL,
first_payment_amount_kopeks INTEGER NULL,
first_paid_period_days INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_subscription_conversions_user_id ON subscription_conversions(user_id);
CREATE INDEX idx_subscription_conversions_converted_at ON subscription_conversions(converted_at);
"""
elif db_type == 'postgresql':
create_sql = """
CREATE TABLE subscription_conversions (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
converted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
trial_duration_days INTEGER NULL,
payment_method VARCHAR(50) NULL,
first_payment_amount_kopeks INTEGER NULL,
first_paid_period_days INTEGER NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_subscription_conversions_user_id ON subscription_conversions(user_id);
CREATE INDEX idx_subscription_conversions_converted_at ON subscription_conversions(converted_at);
"""
elif db_type == 'mysql':
create_sql = """
CREATE TABLE subscription_conversions (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
converted_at DATETIME DEFAULT CURRENT_TIMESTAMP,
trial_duration_days INT NULL,
payment_method VARCHAR(50) NULL,
first_payment_amount_kopeks INT NULL,
first_paid_period_days INT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_subscription_conversions_user_id ON subscription_conversions(user_id);
CREATE INDEX idx_subscription_conversions_converted_at ON subscription_conversions(converted_at);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("✅ Таблица subscription_conversions успешно создана")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы subscription_conversions: {e}")
return False
async def fix_subscription_duplicates_universal():
async with engine.begin() as conn:
db_type = await get_database_type()
logger.info(f"Обнаружен тип базы данных: {db_type}")
try:
result = await conn.execute(text("""
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
"""))
duplicates = result.fetchall()
if not duplicates:
logger.info("Дублирующихся подписок не найдено")
return 0
logger.info(f"Найдено {len(duplicates)} пользователей с дублирующимися подписками")
total_deleted = 0
for user_id_row, count in duplicates:
user_id = user_id_row
if db_type == 'sqlite':
delete_result = await conn.execute(text("""
DELETE FROM subscriptions
WHERE user_id = :user_id AND id NOT IN (
SELECT MAX(id)
FROM subscriptions
WHERE user_id = :user_id
)
"""), {"user_id": user_id})
elif db_type in ['postgresql', 'mysql']:
delete_result = await conn.execute(text("""
DELETE FROM subscriptions
WHERE user_id = :user_id AND id NOT IN (
SELECT max_id FROM (
SELECT MAX(id) as max_id
FROM subscriptions
WHERE user_id = :user_id
) as subquery
)
"""), {"user_id": user_id})
else:
subs_result = await conn.execute(text("""
SELECT id FROM subscriptions
WHERE user_id = :user_id
ORDER BY created_at DESC, id DESC
"""), {"user_id": user_id})
sub_ids = [row[0] for row in subs_result.fetchall()]
if len(sub_ids) > 1:
ids_to_delete = sub_ids[1:]
for sub_id in ids_to_delete:
await conn.execute(text("""
DELETE FROM subscriptions WHERE id = :id
"""), {"id": sub_id})
delete_result = type('Result', (), {'rowcount': len(ids_to_delete)})()
else:
delete_result = type('Result', (), {'rowcount': 0})()
deleted_count = delete_result.rowcount
total_deleted += deleted_count
logger.info(f"Удалено {deleted_count} дублирующихся подписок для пользователя {user_id}")
logger.info(f"Всего удалено дублирующихся подписок: {total_deleted}")
return total_deleted
except Exception as e:
logger.error(f"Ошибка при очистке дублирующихся подписок: {e}")
raise
async def run_universal_migration():
logger.info("=== НАЧАЛО УНИВЕРСАЛЬНОЙ МИГРАЦИИ ===")
try:
db_type = await get_database_type()
logger.info(f"Тип базы данных: {db_type}")
referral_migration_success = await add_referral_system_columns()
if not referral_migration_success:
logger.warning("⚠️ Проблемы с миграцией реферальной системы")
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ CRYPTOBOT ===")
cryptobot_created = await create_cryptobot_payments_table()
if cryptobot_created:
logger.info("✅ Таблица CryptoBot payments готова")
else:
logger.warning("⚠️ Проблемы с таблицей CryptoBot payments")
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ USER_MESSAGES ===")
user_messages_created = await create_user_messages_table()
if user_messages_created:
logger.info("✅ Таблица user_messages готова")
else:
logger.warning("⚠️ Проблемы с таблицей user_messages")
logger.info("=== СОЗДАНИЕ/ОБНОВЛЕНИЕ ТАБЛИЦЫ WELCOME_TEXTS ===")
welcome_texts_created = await create_welcome_texts_table()
if welcome_texts_created:
logger.info("✅ Таблица welcome_texts готова с полем is_enabled")
else:
logger.warning("⚠️ Проблемы с таблицей welcome_texts")
logger.info("=== ОБНОВЛЕНИЕ ВНЕШНИХ КЛЮЧЕЙ ===")
fk_updated = await fix_foreign_keys_for_user_deletion()
if fk_updated:
logger.info("✅ Внешние ключи обновлены")
else:
logger.warning("⚠️ Проблемы с обновлением внешних ключей")
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ КОНВЕРСИЙ ПОДПИСОК ===")
conversions_created = await create_subscription_conversions_table()
if conversions_created:
logger.info("✅ Таблица subscription_conversions готова")
else:
logger.warning("⚠️ Проблемы с таблицей subscription_conversions")
async with engine.begin() as conn:
total_subs = await conn.execute(text("SELECT COUNT(*) FROM subscriptions"))
unique_users = await conn.execute(text("SELECT COUNT(DISTINCT user_id) FROM subscriptions"))
total_count = total_subs.fetchone()[0]
unique_count = unique_users.fetchone()[0]
logger.info(f"Всего подписок: {total_count}")
logger.info(f"Уникальных пользователей: {unique_count}")
if total_count == unique_count:
logger.info("База данных уже в корректном состоянии")
logger.info("=== МИГРАЦИЯ ЗАВЕРШЕНА УСПЕШНО ===")
return True
deleted_count = await fix_subscription_duplicates_universal()
async with engine.begin() as conn:
final_check = await conn.execute(text("""
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
"""))
remaining_duplicates = final_check.fetchall()
if remaining_duplicates:
logger.warning(f"Остались дубликаты у {len(remaining_duplicates)} пользователей")
return False
else:
logger.info("=== МИГРАЦИЯ ЗАВЕРШЕНА УСПЕШНО ===")
logger.info("✅ Реферальная система обновлена")
logger.info("✅ CryptoBot таблица готова")
logger.info("✅ Таблица конверсий подписок создана")
logger.info("✅ Таблица welcome_texts с полем is_enabled готова")
logger.info("✅ Дубликаты подписок исправлены")
return True
except Exception as e:
logger.error(f"=== ОШИБКА ВЫПОЛНЕНИЯ МИГРАЦИИ: {e} ===")
return False
async def check_migration_status():
logger.info("=== ПРОВЕРКА СТАТУСА МИГРАЦИЙ ===")
try:
status = {
"has_made_first_topup_column": False,
"cryptobot_table": False,
"user_messages_table": False,
"welcome_texts_table": False,
"welcome_texts_is_enabled_column": False,
"subscription_duplicates": False,
"subscription_conversions_table": False
}
status["has_made_first_topup_column"] = await check_column_exists('users', 'has_made_first_topup')
status["cryptobot_table"] = await check_table_exists('cryptobot_payments')
status["user_messages_table"] = await check_table_exists('user_messages')
status["welcome_texts_table"] = await check_table_exists('welcome_texts')
status["subscription_conversions_table"] = await check_table_exists('subscription_conversions')
status["welcome_texts_is_enabled_column"] = await check_column_exists('welcome_texts', 'is_enabled')
async with engine.begin() as conn:
duplicates_check = await conn.execute(text("""
SELECT COUNT(*) FROM (
SELECT user_id, COUNT(*) as count
FROM subscriptions
GROUP BY user_id
HAVING COUNT(*) > 1
) as dups
"""))
duplicates_count = duplicates_check.fetchone()[0]
status["subscription_duplicates"] = (duplicates_count == 0)
check_names = {
"has_made_first_topup_column": "Колонка реферальной системы",
"cryptobot_table": "Таблица CryptoBot payments",
"user_messages_table": "Таблица пользовательских сообщений",
"welcome_texts_table": "Таблица приветственных текстов",
"welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts",
"subscription_conversions_table": "Таблица конверсий подписок",
"subscription_duplicates": "Отсутствие дубликатов подписок"
}
for check_key, check_status in status.items():
check_name = check_names.get(check_key, check_key)
icon = "" if check_status else ""
logger.info(f"{icon} {check_name}: {'OK' if check_status else 'ТРЕБУЕТ ВНИМАНИЯ'}")
all_good = all(status.values())
if all_good:
logger.info("🎉 Все миграции выполнены успешно!")
try:
async with engine.begin() as conn:
conversions_count = await conn.execute(text("SELECT COUNT(*) FROM subscription_conversions"))
users_count = await conn.execute(text("SELECT COUNT(*) FROM users"))
welcome_texts_count = await conn.execute(text("SELECT COUNT(*) FROM welcome_texts"))
conv_count = conversions_count.fetchone()[0]
usr_count = users_count.fetchone()[0]
welcome_count = welcome_texts_count.fetchone()[0]
logger.info(f"📊 Статистика: {usr_count} пользователей, {conv_count} конверсий, {welcome_count} приветственных текстов")
except Exception as stats_error:
logger.debug(f"Не удалось получить дополнительную статистику: {stats_error}")
else:
logger.warning("⚠️ Некоторые миграции требуют внимания")
missing_migrations = [check_names[k] for k, v in status.items() if not v]
logger.warning(f"Требуют выполнения: {', '.join(missing_migrations)}")
return status
except Exception as e:
logger.error(f"Ошибка проверки статуса миграций: {e}")
return None