diff --git a/app/services/backup_service.py b/app/services/backup_service.py index 08bb4efc..35bdd117 100644 --- a/app/services/backup_service.py +++ b/app/services/backup_service.py @@ -19,7 +19,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.config import settings -from app.database.database import get_db, engine +from app.database.database import AsyncSessionLocal, engine from app.database.models import ( User, Subscription, Transaction, PromoCode, PromoCodeUse, ReferralEarning, Squad, ServiceRule, SystemSetting, MonitoringLog, @@ -475,7 +475,7 @@ class BackupService: backup_data: Dict[str, List[Dict[str, Any]]] = {} total_records = 0 - async for db in get_db(): + async with AsyncSessionLocal() as db: try: for model in models_to_backup: table_name = model.__tablename__ @@ -533,10 +533,6 @@ class BackupService: except Exception as exc: logger.error("Ошибка при экспорте данных: %s", exc) raise exc - finally: - await db.close() - - return backup_data, {}, total_records, len(models_to_backup) async def _collect_files(self, staging_dir: Path, include_logs: bool) -> List[Dict[str, Any]]: files_info: List[Dict[str, Any]] = [] @@ -815,7 +811,7 @@ class BackupService: restored_records = 0 restored_tables = 0 - async for db in get_db(): + async with AsyncSessionLocal() as db: try: if clear_existing: logger.warning("🗑️ Очищаем существующие данные...") @@ -903,14 +899,10 @@ class BackupService: await db.commit() - break - except Exception as exc: await db.rollback() logger.error("Ошибка при восстановлении: %s", exc) raise exc - finally: - await db.close() return restored_tables, restored_records diff --git a/app/services/daily_subscription_service.py b/app/services/daily_subscription_service.py index f0f15063..fcbb5e76 100644 --- a/app/services/daily_subscription_service.py +++ b/app/services/daily_subscription_service.py @@ -13,7 +13,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings -from app.database.database import get_db +from app.database.database import AsyncSessionLocal from app.database.crud.subscription import ( get_daily_subscriptions_for_charge, update_daily_charge_time, @@ -65,25 +65,30 @@ class DailySubscriptionService: } try: - async for db in get_db(): - subscriptions = await get_daily_subscriptions_for_charge(db) - stats["checked"] = len(subscriptions) + async with AsyncSessionLocal() as db: + try: + subscriptions = await get_daily_subscriptions_for_charge(db) + stats["checked"] = len(subscriptions) - for subscription in subscriptions: - try: - result = await self._process_single_charge(db, subscription) - if result == "charged": - stats["charged"] += 1 - elif result == "suspended": - stats["suspended"] += 1 - elif result == "error": + for subscription in subscriptions: + try: + result = await self._process_single_charge(db, subscription) + if result == "charged": + stats["charged"] += 1 + elif result == "suspended": + stats["suspended"] += 1 + elif result == "error": + stats["errors"] += 1 + except Exception as e: + logger.error( + f"Ошибка обработки суточной подписки {subscription.id}: {e}", + exc_info=True + ) stats["errors"] += 1 - except Exception as e: - logger.error( - f"Ошибка обработки суточной подписки {subscription.id}: {e}", - exc_info=True - ) - stats["errors"] += 1 + await db.commit() + except Exception as e: + logger.error(f"Ошибка при обработке подписок: {e}", exc_info=True) + await db.rollback() except Exception as e: logger.error(f"Ошибка при получении подписок для списания: {e}", exc_info=True) @@ -272,35 +277,40 @@ class DailySubscriptionService: from app.database.models import TrafficPurchase try: - async for db in get_db(): - # Находим все истекшие докупки - now = datetime.utcnow() - query = ( - select(TrafficPurchase) - .where(TrafficPurchase.expires_at <= now) - ) - result = await db.execute(query) - expired_purchases = result.scalars().all() - stats["checked"] = len(expired_purchases) + async with AsyncSessionLocal() as db: + try: + # Находим все истекшие докупки + now = datetime.utcnow() + query = ( + select(TrafficPurchase) + .where(TrafficPurchase.expires_at <= now) + ) + result = await db.execute(query) + expired_purchases = result.scalars().all() + stats["checked"] = len(expired_purchases) - # Группируем по подпискам для обновления - subscriptions_to_update = {} - for purchase in expired_purchases: - if purchase.subscription_id not in subscriptions_to_update: - subscriptions_to_update[purchase.subscription_id] = [] - subscriptions_to_update[purchase.subscription_id].append(purchase) + # Группируем по подпискам для обновления + subscriptions_to_update = {} + for purchase in expired_purchases: + if purchase.subscription_id not in subscriptions_to_update: + subscriptions_to_update[purchase.subscription_id] = [] + subscriptions_to_update[purchase.subscription_id].append(purchase) - # Удаляем истекшие докупки и обновляем подписки - for subscription_id, purchases in subscriptions_to_update.items(): - try: - await self._reset_subscription_traffic(db, subscription_id, purchases) - stats["reset"] += len(purchases) - except Exception as e: - logger.error( - f"Ошибка сброса трафика подписки {subscription_id}: {e}", - exc_info=True - ) - stats["errors"] += 1 + # Удаляем истекшие докупки и обновляем подписки + for subscription_id, purchases in subscriptions_to_update.items(): + try: + await self._reset_subscription_traffic(db, subscription_id, purchases) + stats["reset"] += len(purchases) + except Exception as e: + logger.error( + f"Ошибка сброса трафика подписки {subscription_id}: {e}", + exc_info=True + ) + stats["errors"] += 1 + await db.commit() + except Exception as e: + logger.error(f"Ошибка при обработке сброса трафика: {e}", exc_info=True) + await db.rollback() except Exception as e: logger.error(f"Ошибка при получении подписок для сброса трафика: {e}", exc_info=True) diff --git a/app/services/monitoring_service.py b/app/services/monitoring_service.py index fffcfc9a..37f41efe 100644 --- a/app/services/monitoring_service.py +++ b/app/services/monitoring_service.py @@ -12,7 +12,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.config import settings -from app.database.database import get_db +from app.database.database import AsyncSessionLocal from app.database.crud.discount_offer import ( deactivate_expired_offers, get_latest_claimed_offer_for_user, @@ -190,7 +190,7 @@ class MonitoringService: pass async def _monitoring_cycle(self): - async for db in get_db(): + async with AsyncSessionLocal() as db: try: await self._cleanup_notification_cache() @@ -219,23 +219,26 @@ class MonitoringService: await self._process_autopayments(db) await self._cleanup_inactive_users(db) await self._sync_with_remnawave(db) - + await self._log_monitoring_event( - db, "monitoring_cycle_completed", - "Цикл мониторинга успешно завершен", + db, "monitoring_cycle_completed", + "Цикл мониторинга успешно завершен", {"timestamp": datetime.utcnow().isoformat()} ) - + await db.commit() + except Exception as e: logger.error(f"Ошибка в цикле мониторинга: {e}") - await self._log_monitoring_event( - db, "monitoring_cycle_error", - f"Ошибка в цикле мониторинга: {str(e)}", - {"error": str(e)}, - is_success=False - ) - finally: - break + try: + await self._log_monitoring_event( + db, "monitoring_cycle_error", + f"Ошибка в цикле мониторинга: {str(e)}", + {"error": str(e)}, + is_success=False + ) + except Exception: + pass + await db.rollback() async def _cleanup_notification_cache(self): current_time = datetime.utcnow() @@ -1724,11 +1727,13 @@ class MonitoringService: interval_seconds = 60 while self.is_running: try: - async for db in get_db(): + async with AsyncSessionLocal() as db: try: await self._check_ticket_sla(db) - finally: - break + await db.commit() + except Exception as e: + logger.error(f"Ошибка в SLA-проверке: {e}") + await db.rollback() except asyncio.CancelledError: break except Exception as e: