Deduplicate Wata payment links before enforcing unique index

This commit is contained in:
Egor
2025-10-15 04:51:05 +03:00
parent e935d13ccd
commit 8c39f5aecf

View File

@@ -1,5 +1,6 @@
import logging
from datetime import datetime
from typing import List, Tuple
from sqlalchemy import inspect, select, text
from sqlalchemy.ext.asyncio import AsyncSession
@@ -268,6 +269,210 @@ async def check_index_exists(table_name: str, index_name: str) -> bool:
)
return False
async def fetch_duplicate_payment_links(conn) -> List[Tuple[str, int]]:
result = await conn.execute(
text(
"SELECT payment_link_id, COUNT(*) AS cnt "
"FROM wata_payments "
"WHERE payment_link_id IS NOT NULL AND payment_link_id <> '' "
"GROUP BY payment_link_id "
"HAVING COUNT(*) > 1"
)
)
return [(row[0], row[1]) for row in result.fetchall()]
def _build_dedup_suffix(base_suffix: str, record_id: int, max_length: int = 64) -> Tuple[str, int]:
suffix = f"{base_suffix}{record_id}"
trimmed_length = max_length - len(suffix)
if trimmed_length < 1:
# Fallback: use the record id only to stay within the limit.
suffix = f"dup-{record_id}"
trimmed_length = max_length - len(suffix)
return suffix, trimmed_length
async def resolve_duplicate_payment_links(conn, db_type: str) -> bool:
duplicates = await fetch_duplicate_payment_links(conn)
if not duplicates:
return True
logger.warning(
"Найдены дубликаты payment_link_id в wata_payments: %s",
", ".join(f"{link}×{count}" for link, count in duplicates[:5]),
)
for payment_link_id, _ in duplicates:
result = await conn.execute(
text(
"SELECT id, payment_link_id FROM wata_payments "
"WHERE payment_link_id = :payment_link_id "
"ORDER BY id"
),
{"payment_link_id": payment_link_id},
)
rows = result.fetchall()
if not rows:
continue
# Skip the first occurrence to preserve the original link value.
for duplicate_row in rows[1:]:
record_id = duplicate_row[0]
original_link = duplicate_row[1] or ""
suffix, trimmed_length = _build_dedup_suffix("-dup-", record_id)
new_base = original_link[:trimmed_length] if trimmed_length > 0 else ""
new_link = f"{new_base}{suffix}" if new_base else suffix
await conn.execute(
text(
"UPDATE wata_payments SET payment_link_id = :new_link "
"WHERE id = :record_id"
),
{"new_link": new_link, "record_id": record_id},
)
remaining_duplicates = await fetch_duplicate_payment_links(conn)
if remaining_duplicates:
logger.error(
"Не удалось устранить дубликаты payment_link_id: %s",
", ".join(f"{link}×{count}" for link, count in remaining_duplicates[:5]),
)
return False
logger.info("✅ Дубликаты payment_link_id устранены")
return True
async def enforce_wata_payment_link_constraints(
conn,
db_type: str,
unique_index_exists: bool,
legacy_index_exists: bool,
) -> Tuple[bool, bool]:
try:
if db_type == "sqlite":
await conn.execute(
text(
"UPDATE wata_payments "
"SET payment_link_id = 'legacy-' || id "
"WHERE payment_link_id IS NULL OR payment_link_id = ''"
)
)
if not await resolve_duplicate_payment_links(conn, db_type):
return unique_index_exists, legacy_index_exists
if not unique_index_exists:
await conn.execute(
text(
"CREATE UNIQUE INDEX IF NOT EXISTS uq_wata_payment_link "
"ON wata_payments(payment_link_id)"
)
)
logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id")
unique_index_exists = True
else:
logger.info(" Уникальный индекс для payment_link_id уже существует")
if legacy_index_exists and unique_index_exists:
await conn.execute(text("DROP INDEX IF EXISTS idx_wata_link_id"))
logger.info(" Удалён устаревший индекс idx_wata_link_id")
legacy_index_exists = False
return unique_index_exists, legacy_index_exists
if db_type == "postgresql":
await conn.execute(
text(
"UPDATE wata_payments "
"SET payment_link_id = 'legacy-' || id::text "
"WHERE payment_link_id IS NULL OR payment_link_id = ''"
)
)
await conn.execute(
text(
"ALTER TABLE wata_payments "
"ALTER COLUMN payment_link_id SET NOT NULL"
)
)
logger.info("✅ Колонка payment_link_id теперь NOT NULL")
if not await resolve_duplicate_payment_links(conn, db_type):
return unique_index_exists, legacy_index_exists
if not unique_index_exists:
await conn.execute(
text(
"CREATE UNIQUE INDEX IF NOT EXISTS uq_wata_payment_link "
"ON wata_payments(payment_link_id)"
)
)
logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id")
unique_index_exists = True
else:
logger.info(" Уникальный индекс для payment_link_id уже существует")
if legacy_index_exists and unique_index_exists:
await conn.execute(text("DROP INDEX IF EXISTS idx_wata_link_id"))
logger.info(" Удалён устаревший индекс idx_wata_link_id")
legacy_index_exists = False
return unique_index_exists, legacy_index_exists
if db_type == "mysql":
await conn.execute(
text(
"UPDATE wata_payments "
"SET payment_link_id = CONCAT('legacy-', id) "
"WHERE payment_link_id IS NULL OR payment_link_id = ''"
)
)
await conn.execute(
text(
"ALTER TABLE wata_payments "
"MODIFY COLUMN payment_link_id VARCHAR(64) NOT NULL"
)
)
logger.info("✅ Колонка payment_link_id теперь NOT NULL")
if not await resolve_duplicate_payment_links(conn, db_type):
return unique_index_exists, legacy_index_exists
if not unique_index_exists:
await conn.execute(
text(
"CREATE UNIQUE INDEX uq_wata_payment_link "
"ON wata_payments(payment_link_id)"
)
)
logger.info("✅ Создан уникальный индекс uq_wata_payment_link для payment_link_id")
unique_index_exists = True
else:
logger.info(" Уникальный индекс для payment_link_id уже существует")
if legacy_index_exists and unique_index_exists:
await conn.execute(text("DROP INDEX idx_wata_link_id ON wata_payments"))
logger.info(" Удалён устаревший индекс idx_wata_link_id")
legacy_index_exists = False
return unique_index_exists, legacy_index_exists
logger.warning(
"⚠️ Неизвестный тип БД %s — не удалось усилить ограничения payment_link_id", db_type
)
return unique_index_exists, legacy_index_exists
except Exception as e:
logger.error(f"Ошибка настройки ограничений payment_link_id: {e}")
return unique_index_exists, legacy_index_exists
async def create_cryptobot_payments_table():
table_exists = await check_table_exists('cryptobot_payments')
if table_exists:
@@ -745,7 +950,7 @@ async def create_wata_payments_table():
FOREIGN KEY (transaction_id) REFERENCES transactions(id)
);
CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE INDEX idx_wata_order_id ON wata_payments(order_id);
"""
@@ -776,7 +981,7 @@ async def create_wata_payments_table():
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE INDEX idx_wata_order_id ON wata_payments(order_id);
"""
@@ -809,7 +1014,7 @@ async def create_wata_payments_table():
FOREIGN KEY (transaction_id) REFERENCES transactions(id)
);
CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE UNIQUE INDEX idx_wata_link_id ON wata_payments(payment_link_id);
CREATE INDEX idx_wata_order_id ON wata_payments(order_id);
"""
@@ -833,35 +1038,133 @@ async def ensure_wata_payment_schema() -> bool:
logger.warning("⚠️ Таблица wata_payments отсутствует — создаём заново")
return await create_wata_payments_table()
link_index_exists = await check_index_exists("wata_payments", "idx_wata_link_id")
db_type = await get_database_type()
legacy_link_index_exists = await check_index_exists(
"wata_payments", "idx_wata_link_id"
)
unique_link_index_exists = await check_index_exists(
"wata_payments", "uq_wata_payment_link"
)
builtin_unique_index_exists = await check_index_exists(
"wata_payments", "wata_payments_payment_link_id_key"
)
sqlite_auto_unique_exists = (
await check_index_exists("wata_payments", "sqlite_autoindex_wata_payments_1")
if db_type == "sqlite"
else False
)
order_index_exists = await check_index_exists("wata_payments", "idx_wata_order_id")
async with engine.begin() as conn:
db_type = await get_database_type()
payment_link_column_exists = await check_column_exists(
"wata_payments", "payment_link_id"
)
order_id_column_exists = await check_column_exists("wata_payments", "order_id")
if not link_index_exists:
if db_type in {"sqlite", "postgresql"}:
unique_index_exists = (
unique_link_index_exists
or builtin_unique_index_exists
or sqlite_auto_unique_exists
)
async with engine.begin() as conn:
if not payment_link_column_exists:
if db_type == "sqlite":
await conn.execute(
text("CREATE INDEX IF NOT EXISTS idx_wata_link_id ON wata_payments(payment_link_id)")
text(
"ALTER TABLE wata_payments "
"ADD COLUMN payment_link_id VARCHAR(64) NOT NULL DEFAULT ''"
)
)
payment_link_column_exists = True
unique_index_exists = False
elif db_type == "postgresql":
await conn.execute(
text(
"ALTER TABLE wata_payments "
"ADD COLUMN IF NOT EXISTS payment_link_id VARCHAR(64)"
)
)
payment_link_column_exists = True
elif db_type == "mysql":
await conn.execute(
text("CREATE INDEX idx_wata_link_id ON wata_payments(payment_link_id)")
text("ALTER TABLE wata_payments ADD COLUMN payment_link_id VARCHAR(64)")
)
logger.info("✅ Создан индекс idx_wata_link_id")
else:
logger.info(" Индекс idx_wata_link_id уже существует")
payment_link_column_exists = True
else:
logger.warning(
"⚠️ Неизвестный тип БД %s — пропущено добавление payment_link_id",
db_type,
)
if payment_link_column_exists:
logger.info("✅ Добавлена колонка payment_link_id в wata_payments")
if payment_link_column_exists:
unique_index_exists, legacy_link_index_exists = (
await enforce_wata_payment_link_constraints(
conn,
db_type,
unique_index_exists,
legacy_link_index_exists,
)
)
if not order_id_column_exists:
if db_type == "sqlite":
await conn.execute(
text("ALTER TABLE wata_payments ADD COLUMN order_id VARCHAR(255)")
)
order_id_column_exists = True
elif db_type == "postgresql":
await conn.execute(
text(
"ALTER TABLE wata_payments "
"ADD COLUMN IF NOT EXISTS order_id VARCHAR(255)"
)
)
order_id_column_exists = True
elif db_type == "mysql":
await conn.execute(
text("ALTER TABLE wata_payments ADD COLUMN order_id VARCHAR(255)")
)
order_id_column_exists = True
else:
logger.warning(
"⚠️ Неизвестный тип БД %s — пропущено добавление order_id",
db_type,
)
if order_id_column_exists:
logger.info("✅ Добавлена колонка order_id в wata_payments")
if not order_index_exists:
if db_type in {"sqlite", "postgresql"}:
await conn.execute(
text("CREATE INDEX IF NOT EXISTS idx_wata_order_id ON wata_payments(order_id)")
if not order_id_column_exists:
logger.warning(
"⚠️ Пропущено создание индекса idx_wata_order_id — колонка order_id отсутствует"
)
elif db_type == "mysql":
await conn.execute(
text("CREATE INDEX idx_wata_order_id ON wata_payments(order_id)")
)
logger.info("✅ Создан индекс idx_wata_order_id")
else:
index_created = False
if db_type in {"sqlite", "postgresql"}:
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_wata_order_id ON wata_payments(order_id)"
)
)
index_created = True
elif db_type == "mysql":
await conn.execute(
text("CREATE INDEX idx_wata_order_id ON wata_payments(order_id)")
)
index_created = True
else:
logger.warning(
"⚠️ Неизвестный тип БД %s — пропущено создание индекса idx_wata_order_id",
db_type,
)
if index_created:
logger.info("✅ Создан индекс idx_wata_order_id")
else:
logger.info(" Индекс idx_wata_order_id уже существует")