From 8c39f5aecf098f1ec4ffcee94d0ef9d0d2bcffd9 Mon Sep 17 00:00:00 2001 From: Egor Date: Wed, 15 Oct 2025 04:51:05 +0300 Subject: [PATCH] Deduplicate Wata payment links before enforcing unique index --- app/database/universal_migration.py | 345 ++++++++++++++++++++++++++-- 1 file changed, 324 insertions(+), 21 deletions(-) diff --git a/app/database/universal_migration.py b/app/database/universal_migration.py index 4d1610a2..8dbcbe05 100644 --- a/app/database/universal_migration.py +++ b/app/database/universal_migration.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import List, Tuple from sqlalchemy import inspect, select, text from sqlalchemy.ext.asyncio import AsyncSession @@ -268,6 +269,210 @@ async def check_index_exists(table_name: str, index_name: str) -> bool: ) return False + +async def fetch_duplicate_payment_links(conn) -> List[Tuple[str, int]]: + result = await conn.execute( + text( + "SELECT payment_link_id, COUNT(*) AS cnt " + "FROM wata_payments " + "WHERE payment_link_id IS NOT NULL AND payment_link_id <> '' " + "GROUP BY payment_link_id " + "HAVING COUNT(*) > 1" + ) + ) + return [(row[0], row[1]) for row in result.fetchall()] + + +def _build_dedup_suffix(base_suffix: str, record_id: int, max_length: int = 64) -> Tuple[str, int]: + suffix = f"{base_suffix}{record_id}" + trimmed_length = max_length - len(suffix) + if trimmed_length < 1: + # Fallback: use the record id only to stay within the limit. + suffix = f"dup-{record_id}" + trimmed_length = max_length - len(suffix) + return suffix, trimmed_length + + +async def resolve_duplicate_payment_links(conn, db_type: str) -> bool: + duplicates = await fetch_duplicate_payment_links(conn) + + if not duplicates: + return True + + logger.warning( + "Найдены дубликаты payment_link_id в wata_payments: %s", + ", ".join(f"{link}×{count}" for link, count in duplicates[:5]), + ) + + for payment_link_id, _ in duplicates: + result = await conn.execute( + text( + "SELECT id, payment_link_id FROM wata_payments " + "WHERE payment_link_id = :payment_link_id " + "ORDER BY id" + ), + {"payment_link_id": payment_link_id}, + ) + + rows = result.fetchall() + + if not rows: + continue + + # Skip the first occurrence to preserve the original link value. + for duplicate_row in rows[1:]: + record_id = duplicate_row[0] + original_link = duplicate_row[1] or "" + suffix, trimmed_length = _build_dedup_suffix("-dup-", record_id) + new_base = original_link[:trimmed_length] if trimmed_length > 0 else "" + new_link = f"{new_base}{suffix}" if new_base else suffix + + await conn.execute( + text( + "UPDATE wata_payments SET payment_link_id = :new_link " + "WHERE id = :record_id" + ), + {"new_link": new_link, "record_id": record_id}, + ) + + remaining_duplicates = await fetch_duplicate_payment_links(conn) + + if remaining_duplicates: + logger.error( + "Не удалось устранить дубликаты payment_link_id: %s", + ", ".join(f"{link}×{count}" for link, count in remaining_duplicates[:5]), + ) + return False + + logger.info("✅ Дубликаты payment_link_id устранены") + return True + + +async def enforce_wata_payment_link_constraints( + conn, + db_type: str, + unique_index_exists: bool, + legacy_index_exists: bool, +) -> Tuple[bool, bool]: + try: + if db_type == "sqlite": + await conn.execute( + text( + "UPDATE wata_payments " + "SET payment_link_id = 'legacy-' || id " + "WHERE payment_link_id IS NULL OR payment_link_id = ''" + ) + ) + + if not await resolve_duplicate_payment_links(conn, db_type): + return unique_index_exists, legacy_index_exists + + if not unique_index_exists: + await conn.execute( + text( + "CREATE UNIQUE INDEX IF NOT EXISTS uq_wata_payment_link " + "ON wata_payments(payment_link_id)" + ) + ) + logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id") + unique_index_exists = True + else: + logger.info("ℹ️ Уникальный индекс для payment_link_id уже существует") + + if legacy_index_exists and unique_index_exists: + await conn.execute(text("DROP INDEX IF EXISTS idx_wata_link_id")) + logger.info("ℹ️ Удалён устаревший индекс idx_wata_link_id") + legacy_index_exists = False + + return unique_index_exists, legacy_index_exists + + if db_type == "postgresql": + await conn.execute( + text( + "UPDATE wata_payments " + "SET payment_link_id = 'legacy-' || id::text " + "WHERE payment_link_id IS NULL OR payment_link_id = ''" + ) + ) + + await conn.execute( + text( + "ALTER TABLE wata_payments " + "ALTER COLUMN payment_link_id SET NOT NULL" + ) + ) + logger.info("✅ Колонка payment_link_id теперь NOT NULL") + + if not await resolve_duplicate_payment_links(conn, db_type): + return unique_index_exists, legacy_index_exists + + if not unique_index_exists: + await conn.execute( + text( + "CREATE UNIQUE INDEX IF NOT EXISTS uq_wata_payment_link " + "ON wata_payments(payment_link_id)" + ) + ) + logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id") + unique_index_exists = True + else: + logger.info("ℹ️ Уникальный индекс для payment_link_id уже существует") + + if legacy_index_exists and unique_index_exists: + await conn.execute(text("DROP INDEX IF EXISTS idx_wata_link_id")) + logger.info("ℹ️ Удалён устаревший индекс idx_wata_link_id") + legacy_index_exists = False + + return unique_index_exists, legacy_index_exists + + if db_type == "mysql": + await conn.execute( + text( + "UPDATE wata_payments " + "SET payment_link_id = CONCAT('legacy-', id) " + "WHERE payment_link_id IS NULL OR payment_link_id = ''" + ) + ) + + await conn.execute( + text( + "ALTER TABLE wata_payments " + "MODIFY COLUMN payment_link_id VARCHAR(64) NOT NULL" + ) + ) + logger.info("✅ Колонка payment_link_id теперь NOT NULL") + + if not await resolve_duplicate_payment_links(conn, db_type): + return unique_index_exists, legacy_index_exists + + if not unique_index_exists: + await conn.execute( + text( + "CREATE UNIQUE INDEX uq_wata_payment_link " + "ON wata_payments(payment_link_id)" + ) + ) + logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id") + unique_index_exists = True + else: + logger.info("ℹ️ Уникальный индекс для payment_link_id уже существует") + + if legacy_index_exists and unique_index_exists: + await conn.execute(text("DROP INDEX idx_wata_link_id ON wata_payments")) + logger.info("ℹ️ Удалён устаревший индекс idx_wata_link_id") + legacy_index_exists = False + + return unique_index_exists, legacy_index_exists + + logger.warning( + "⚠️ Неизвестный тип БД %s — не удалось усилить ограничения payment_link_id", db_type + ) + return unique_index_exists, legacy_index_exists + + except Exception as e: + logger.error(f"Ошибка настройки ограничений payment_link_id: {e}") + return unique_index_exists, legacy_index_exists + async def create_cryptobot_payments_table(): table_exists = await check_table_exists('cryptobot_payments') if table_exists: @@ -745,7 +950,7 @@ async def create_wata_payments_table(): FOREIGN KEY (transaction_id) REFERENCES transactions(id) ); - CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id); + CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id); CREATE INDEX idx_wata_order_id ON wata_payments(order_id); """ @@ -776,7 +981,7 @@ async def create_wata_payments_table(): updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); - CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id); + CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id); CREATE INDEX idx_wata_order_id ON wata_payments(order_id); """ @@ -809,7 +1014,7 @@ async def create_wata_payments_table(): FOREIGN KEY (transaction_id) REFERENCES transactions(id) ); - CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id); + CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id); CREATE INDEX idx_wata_order_id ON wata_payments(order_id); """ @@ -833,35 +1038,133 @@ async def ensure_wata_payment_schema() -> bool: logger.warning("⚠️ Таблица wata_payments отсутствует — создаём заново") return await create_wata_payments_table() - link_index_exists = await check_index_exists("wata_payments", "idx_wata_link_id") + db_type = await get_database_type() + + legacy_link_index_exists = await check_index_exists( + "wata_payments", "idx_wata_link_id" + ) + unique_link_index_exists = await check_index_exists( + "wata_payments", "uq_wata_payment_link" + ) + builtin_unique_index_exists = await check_index_exists( + "wata_payments", "wata_payments_payment_link_id_key" + ) + sqlite_auto_unique_exists = ( + await check_index_exists("wata_payments", "sqlite_autoindex_wata_payments_1") + if db_type == "sqlite" + else False + ) order_index_exists = await check_index_exists("wata_payments", "idx_wata_order_id") - async with engine.begin() as conn: - db_type = await get_database_type() + payment_link_column_exists = await check_column_exists( + "wata_payments", "payment_link_id" + ) + order_id_column_exists = await check_column_exists("wata_payments", "order_id") - if not link_index_exists: - if db_type in {"sqlite", "postgresql"}: + unique_index_exists = ( + unique_link_index_exists + or builtin_unique_index_exists + or sqlite_auto_unique_exists + ) + + async with engine.begin() as conn: + if not payment_link_column_exists: + if db_type == "sqlite": await conn.execute( - text("CREATE INDEX IF NOT EXISTS idx_wata_link_id ON wata_payments(payment_link_id)") + text( + "ALTER TABLE wata_payments " + "ADD COLUMN payment_link_id VARCHAR(64) NOT NULL DEFAULT ''" + ) ) + payment_link_column_exists = True + unique_index_exists = False + elif db_type == "postgresql": + await conn.execute( + text( + "ALTER TABLE wata_payments " + "ADD COLUMN IF NOT EXISTS payment_link_id VARCHAR(64)" + ) + ) + payment_link_column_exists = True elif db_type == "mysql": await conn.execute( - text("CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id)") + text("ALTER TABLE wata_payments ADD COLUMN payment_link_id VARCHAR(64)") ) - logger.info("✅ Создан индекс idx_wata_link_id") - else: - logger.info("ℹ️ Индекс idx_wata_link_id уже существует") + payment_link_column_exists = True + else: + logger.warning( + "⚠️ Неизвестный тип БД %s — пропущено добавление payment_link_id", + db_type, + ) + + if payment_link_column_exists: + logger.info("✅ Добавлена колонка payment_link_id в wata_payments") + + if payment_link_column_exists: + unique_index_exists, legacy_link_index_exists = ( + await enforce_wata_payment_link_constraints( + conn, + db_type, + unique_index_exists, + legacy_link_index_exists, + ) + ) + + if not order_id_column_exists: + if db_type == "sqlite": + await conn.execute( + text("ALTER TABLE wata_payments ADD COLUMN order_id VARCHAR(255)") + ) + order_id_column_exists = True + elif db_type == "postgresql": + await conn.execute( + text( + "ALTER TABLE wata_payments " + "ADD COLUMN IF NOT EXISTS order_id VARCHAR(255)" + ) + ) + order_id_column_exists = True + elif db_type == "mysql": + await conn.execute( + text("ALTER TABLE wata_payments ADD COLUMN order_id VARCHAR(255)") + ) + order_id_column_exists = True + else: + logger.warning( + "⚠️ Неизвестный тип БД %s — пропущено добавление order_id", + db_type, + ) + + if order_id_column_exists: + logger.info("✅ Добавлена колонка order_id в wata_payments") if not order_index_exists: - if db_type in {"sqlite", "postgresql"}: - await conn.execute( - text("CREATE INDEX IF NOT EXISTS idx_wata_order_id ON wata_payments(order_id)") + if not order_id_column_exists: + logger.warning( + "⚠️ Пропущено создание индекса idx_wata_order_id — колонка order_id отсутствует" ) - elif db_type == "mysql": - await conn.execute( - text("CREATE INDEX idx_wata_order_id ON wata_payments(order_id)") - ) - logger.info("✅ Создан индекс idx_wata_order_id") + else: + index_created = False + if db_type in {"sqlite", "postgresql"}: + await conn.execute( + text( + "CREATE INDEX IF NOT EXISTS idx_wata_order_id ON wata_payments(order_id)" + ) + ) + index_created = True + elif db_type == "mysql": + await conn.execute( + text("CREATE INDEX idx_wata_order_id ON wata_payments(order_id)") + ) + index_created = True + else: + logger.warning( + "⚠️ Неизвестный тип БД %s — пропущено создание индекса idx_wata_order_id", + db_type, + ) + + if index_created: + logger.info("✅ Создан индекс idx_wata_order_id") else: logger.info("ℹ️ Индекс idx_wata_order_id уже существует")