diff --git a/app/database/database.py b/app/database/database.py
index 76026809..f198a944 100644
--- a/app/database/database.py
+++ b/app/database/database.py
@@ -8,7 +8,7 @@ from sqlalchemy.ext.asyncio import (
     AsyncEngine
 )
 from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool
-from sqlalchemy import event, text
+from sqlalchemy import event, text, bindparam
 from sqlalchemy.engine import Engine
 import time
 from app.config import settings
@@ -194,12 +194,65 @@ class BatchOperations:
     @staticmethod
     async def bulk_update(session: AsyncSession, model, data: list[dict], chunk_size: int = 1000):
-        """Массовое обновление с чанками"""
+        """Bulk update in chunks."""
+        if not data:
+            return
+
+        primary_keys = [column.name for column in model.__table__.primary_key.columns]
+        if not primary_keys:
+            raise ValueError("Model must have a primary key for bulk_update")
+
+        updatable_columns = [
+            column.name
+            for column in model.__table__.columns
+            if column.name not in primary_keys
+        ]
+
+        if not updatable_columns:
+            raise ValueError("No columns available for update in bulk_update")
+
+        # The WHERE-clause parameters get a "b_" prefix: in an UPDATE statement
+        # a bindparam() named after a column of the target table is reserved
+        # for the SET clause and SQLAlchemy raises a CompileError.
+        stmt = (
+            model.__table__.update()
+            .where(
+                *[
+                    getattr(model.__table__.c, pk) == bindparam(f"b_{pk}")
+                    for pk in primary_keys
+                ]
+            )
+            .values(
+                **{
+                    # required=False: a column missing from a row dict binds as NULL.
+                    column: bindparam(column, required=False)
+                    for column in updatable_columns
+                }
+            )
+        )
+
         for i in range(0, len(data), chunk_size):
             chunk = data[i:i + chunk_size]
-            await session.execute(
-                model.__table__.update(),
-                chunk
-            )
+            filtered_chunk = []
+            for item in chunk:
+                missing_keys = [pk for pk in primary_keys if pk not in item]
+                if missing_keys:
+                    raise ValueError(
+                        f"Missing primary key values {missing_keys} for bulk_update"
+                    )
+
+                # Remap primary keys to the prefixed WHERE-clause parameters and
+                # drop any keys that are not columns of the table.
+                filtered_item = {f"b_{pk}": item[pk] for pk in primary_keys}
+                filtered_item.update(
+                    {
+                        key: value
+                        for key, value in item.items()
+                        if key in updatable_columns
+                    }
+                )
+                filtered_chunk.append(filtered_item)
+
+            await session.execute(stmt, filtered_chunk)
         await session.commit()
 
 batch_ops = BatchOperations()
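
A minimal usage sketch of the patched bulk_update, assuming a hypothetical User model in app.models with an integer id primary key plus name and email columns; User and rename_users are placeholder names, not taken from this diff. Each row dict carries the primary key and the columns to change:

    from sqlalchemy.ext.asyncio import AsyncSession

    from app.database.database import batch_ops
    from app.models import User  # placeholder model with an integer "id" primary key


    async def rename_users(session: AsyncSession) -> None:
        rows = [
            # Each dict carries the primary key plus the columns being changed.
            {"id": 1, "name": "Alice", "email": "alice@example.com"},
            {"id": 2, "name": "Bob", "email": "bob@example.com"},
        ]
        # Rows are sent to the database in chunks of up to 1000 per executemany
        # call; bulk_update commits the session itself.
        await batch_ops.bulk_update(session, User, rows, chunk_size=1000)

Note that the generated statement lists every non-key column, so a column omitted from a row dict is bound as NULL; pass the current value for columns that should stay unchanged, or trim the updatable set accordingly.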