Merge pull request #2301 from BEDOLAGA-DEV/dev5

fuck db sessions
This commit is contained in:
Egor
2026-01-17 01:33:07 +03:00
committed by GitHub
12 changed files with 217 additions and 203 deletions

View File

@@ -6,7 +6,7 @@ from typing import Any, Dict, Optional
from aiohttp import web
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.external.heleket import HeleketService
from app.services.payment_service import PaymentService
@@ -33,8 +33,14 @@ class HeleketWebhookHandler:
return web.json_response({"status": "error", "reason": "invalid_signature"}, status=401)
processed: Optional[bool] = None
async for db in get_db():
processed = await self.payment_service.process_heleket_webhook(db, payload)
async with AsyncSessionLocal() as db:
try:
processed = await self.payment_service.process_heleket_webhook(db, payload)
await db.commit()
except Exception as e:
logger.error(f"Ошибка обработки Heleket webhook: {e}")
await db.rollback()
return web.json_response({"status": "error", "reason": "internal_error"}, status=500)
if processed:
return web.json_response({"status": "ok"}, status=200)

View File

@@ -14,7 +14,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.services.payment_service import PaymentService
logger = logging.getLogger(__name__)
@@ -170,11 +170,14 @@ class WataWebhookHandler:
)
processed: Optional[bool] = None
async for db in get_db():
processed = await self.payment_service.process_wata_webhook(db, payload)
# Allow the generator to finish naturally so it can commit/rollback.
# get_db() yields only once, so exiting the loop body without breaking
# triggers the generator cleanup logic on the next iteration attempt.
async with AsyncSessionLocal() as db:
try:
processed = await self.payment_service.process_wata_webhook(db, payload)
await db.commit()
except Exception as e:
logger.error(f"Ошибка обработки WATA webhook: {e}")
await db.rollback()
return web.json_response({"status": "error", "reason": "internal_error"}, status=500)
if processed is None:
logger.error("Не удалось обработать WATA webhook: нет сессии БД")

View File

@@ -15,7 +15,7 @@ from typing import Iterable, Optional, Dict, Any, List, Union, Tuple, TYPE_CHECK
from aiohttp import web
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
if TYPE_CHECKING:
from app.services.payment_service import PaymentService
@@ -273,7 +273,7 @@ class YooKassaWebhookHandler:
logger.info(f" Игнорируем событие YooKassa: {event_type}")
return web.Response(status=200, text="OK")
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
# Проверяем, не обрабатывается ли этот платеж уже (защита от дублирования)
from app.database.models import PaymentMethod
@@ -285,18 +285,22 @@ class YooKassaWebhookHandler:
if existing_transaction and event_type == "payment.succeeded":
logger.info(f" Платеж YooKassa {yookassa_payment_id} уже был обработан. Пропускаем дублирующий вебхук.")
return web.Response(status=200, text="OK")
success = await self.payment_service.process_yookassa_webhook(db, webhook_data)
if success:
await db.commit()
logger.info(f"✅ Успешно обработан webhook YooKassa: {event_type} для платежа {yookassa_payment_id}")
return web.Response(status=200, text="OK")
else:
await db.rollback()
logger.error(f"❌ Ошибка обработки webhook YooKassa: {event_type} для платежа {yookassa_payment_id}")
return web.Response(status=500, text="Processing error")
finally:
await db.close()
except Exception as e:
await db.rollback()
logger.error(f"❌ Ошибка обработки webhook YooKassa: {e}", exc_info=True)
return web.Response(status=500, text="Processing error")
except Exception as e:
logger.error(f"❌ Критическая ошибка обработки webhook YooKassa: {e}", exc_info=True)

View File

@@ -8,7 +8,7 @@ from aiogram.fsm.context import FSMContext
from aiogram.exceptions import TelegramBadRequest
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.services.monitoring_service import monitoring_service
from app.services.nalogo_queue_service import nalogo_queue_service
from app.services.traffic_monitoring_service import (
@@ -383,12 +383,12 @@ async def _render_notification_settings_for_state(
@admin_required
async def admin_monitoring_menu(callback: CallbackQuery):
try:
async for db in get_db():
async with AsyncSessionLocal() as db:
status = await monitoring_service.get_monitoring_status(db)
running_status = "🟢 Работает" if status['is_running'] else "🔴 Остановлен"
last_update = status['last_update'].strftime('%H:%M:%S') if status['last_update'] else "Никогда"
text = f"""
🔍 <b>Система мониторинга</b>
@@ -404,12 +404,11 @@ async def admin_monitoring_menu(callback: CallbackQuery):
🔧 Выберите действие:
"""
language = callback.from_user.language_code or settings.DEFAULT_LANGUAGE
keyboard = get_monitoring_keyboard(language)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка в админ меню мониторинга: {e}")
await callback.answer("❌ Ошибка получения данных", show_alert=True)
@@ -742,7 +741,7 @@ async def force_check_callback(callback: CallbackQuery):
try:
await callback.answer("⏳ Выполняем проверку подписок...")
async for db in get_db():
async with AsyncSessionLocal() as db:
results = await monitoring_service.force_check_subscriptions(db)
text = f"""
@@ -764,7 +763,6 @@ async def force_check_callback(callback: CallbackQuery):
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка принудительной проверки: {e}")
@@ -843,46 +841,45 @@ async def monitoring_logs_callback(callback: CallbackQuery):
if "_page_" in callback.data:
page = int(callback.data.split("_page_")[1])
async for db in get_db():
async with AsyncSessionLocal() as db:
all_logs = await monitoring_service.get_monitoring_logs(db, limit=1000)
if not all_logs:
text = "📋 <b>Логи мониторинга пусты</b>\n\nСистема еще не выполнила проверки."
keyboard = get_monitoring_logs_back_keyboard()
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
return
per_page = 8
paginated_logs = paginate_list(all_logs, page=page, per_page=per_page)
text = f"📋 <b>Логи мониторинга</b> (стр. {page}/{paginated_logs.total_pages})\n\n"
for log in paginated_logs.items:
icon = "" if log['is_success'] else ""
time_str = log['created_at'].strftime('%m-%d %H:%M')
event_type = log['event_type'].replace('_', ' ').title()
message = log['message']
if len(message) > 45:
message = message[:45] + "..."
text += f"{icon} <code>{time_str}</code> {event_type}\n"
text += f" 📄 {message}\n\n"
total_success = sum(1 for log in all_logs if log['is_success'])
total_failed = len(all_logs) - total_success
success_rate = round(total_success / len(all_logs) * 100, 1) if all_logs else 0
text += f"📊 <b>Общая статистика:</b>\n"
text += f"Всего событий: {len(all_logs)}\n"
text += f"• Успешных: {total_success}\n"
text += f"• Ошибок: {total_failed}\n"
text += f"• Успешность: {success_rate}%"
keyboard = get_monitoring_logs_keyboard(page, paginated_logs.total_pages)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка получения логов: {e}")
await callback.answer("❌ Ошибка получения логов", show_alert=True)
@@ -892,17 +889,17 @@ async def monitoring_logs_callback(callback: CallbackQuery):
@admin_required
async def clear_logs_callback(callback: CallbackQuery):
try:
async for db in get_db():
deleted_count = await monitoring_service.cleanup_old_logs(db, days=0)
async with AsyncSessionLocal() as db:
deleted_count = await monitoring_service.cleanup_old_logs(db, days=0)
await db.commit()
if deleted_count > 0:
await callback.answer(f"🗑️ Удалено {deleted_count} записей логов")
else:
await callback.answer(" Логи уже пусты")
await monitoring_logs_callback(callback)
break
except Exception as e:
logger.error(f"Ошибка очистки логов: {e}")
await callback.answer(f"❌ Ошибка очистки: {str(e)}", show_alert=True)
@@ -942,19 +939,19 @@ async def test_notifications_callback(callback: CallbackQuery):
@admin_required
async def monitoring_statistics_callback(callback: CallbackQuery):
try:
async for db in get_db():
async with AsyncSessionLocal() as db:
from app.database.crud.subscription import get_subscriptions_statistics
sub_stats = await get_subscriptions_statistics(db)
mon_status = await monitoring_service.get_monitoring_status(db)
week_ago = datetime.now() - timedelta(days=7)
week_logs = await monitoring_service.get_monitoring_logs(db, limit=1000)
week_logs = [log for log in week_logs if log['created_at'] >= week_ago]
week_success = sum(1 for log in week_logs if log['is_success'])
week_errors = len(week_logs) - week_success
text = f"""
📊 <b>Статистика мониторинга</b>
@@ -1028,8 +1025,7 @@ async def monitoring_statistics_callback(callback: CallbackQuery):
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка получения статистики: {e}")
await callback.answer(f"❌ Ошибка получения статистики: {str(e)}", show_alert=True)
@@ -1068,7 +1064,7 @@ async def nalogo_force_process_callback(callback: CallbackQuery):
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
# Перезагружаем статистику
async for db in get_db():
async with AsyncSessionLocal() as db:
from app.database.crud.subscription import get_subscriptions_statistics
sub_stats = await get_subscriptions_statistics(db)
mon_status = await monitoring_service.get_monitoring_status(db)
@@ -1139,7 +1135,6 @@ async def nalogo_force_process_callback(callback: CallbackQuery):
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
await callback.message.edit_text(stats_text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка принудительной обработки чеков: {e}")
@@ -1309,7 +1304,7 @@ async def receipts_link_old_callback(callback: CallbackQuery):
TRACKING_START_DATE = datetime(2024, 12, 29, 0, 0, 0)
async for db in get_db():
async with AsyncSessionLocal() as db:
# Получаем старые транзакции без чеков
query = select(Transaction).where(
and_(
@@ -1387,7 +1382,6 @@ async def receipts_link_old_callback(callback: CallbackQuery):
])
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=keyboard)
break
except Exception as e:
logger.error(f"Ошибка привязки старых чеков: {e}", exc_info=True)
@@ -1695,11 +1689,11 @@ def get_monitoring_logs_back_keyboard():
@admin_required
async def monitoring_command(message: Message):
try:
async for db in get_db():
async with AsyncSessionLocal() as db:
status = await monitoring_service.get_monitoring_status(db)
running_status = "🟢 Работает" if status['is_running'] else "🔴 Остановлен"
text = f"""
🔍 <b>Быстрый статус мониторинга</b>
@@ -1709,10 +1703,9 @@ async def monitoring_command(message: Message):
Для подробного управления используйте админ-панель.
"""
await message.answer(text, parse_mode="HTML")
break
except Exception as e:
logger.error(f"Ошибка команды /monitoring: {e}")
await message.answer(f"❌ Ошибка: {str(e)}")

View File

@@ -262,9 +262,9 @@ async def handle_pre_checkout_query(query: types.PreCheckoutQuery):
return
try:
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
async for db in get_db():
async with AsyncSessionLocal() as db:
user = await get_user_by_telegram_id(db, query.from_user.id)
if not user:
logger.warning(f"Пользователь {query.from_user.id} не найден в БД")
@@ -277,7 +277,6 @@ async def handle_pre_checkout_query(query: types.PreCheckoutQuery):
)
return
texts = get_texts(user.language or DEFAULT_LANGUAGE)
break
except Exception as db_error:
logger.error(f"Ошибка подключения к БД в pre_checkout_query: {db_error}")
await query.answer(

View File

@@ -4,7 +4,7 @@ from aiohttp import web
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.user import get_user_by_id, add_user_balance
from app.database.crud.transaction import create_transaction, get_transaction_by_external_id
from app.database.models import TransactionType, PaymentMethod
@@ -31,25 +31,25 @@ async def tribute_webhook(request):
logger.error("Ошибка обработки Tribute webhook")
return web.Response(status=400, text="Invalid webhook data")
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
existing_transaction = await get_transaction_by_external_id(
db, processed_data['payment_id'], PaymentMethod.TRIBUTE
)
if existing_transaction:
logger.info(f"Платеж {processed_data['payment_id']} уже обработан")
return web.Response(status=200, text="Already processed")
if processed_data['status'] == 'completed':
user = await get_user_by_id(db, processed_data['user_id'])
if user:
await add_user_balance(
db, user, processed_data['amount_kopeks'],
f"Пополнение через Tribute: {processed_data['payment_id']}"
)
await create_transaction(
db=db,
user_id=user.id,
@@ -59,17 +59,16 @@ async def tribute_webhook(request):
payment_method=PaymentMethod.TRIBUTE,
external_id=processed_data['payment_id']
)
logger.info(f"✅ Обработан Tribute платеж: {processed_data['payment_id']}")
await db.commit()
return web.Response(status=200, text="OK")
except Exception as e:
logger.error(f"Ошибка обработки Tribute webhook: {e}")
await db.rollback()
return web.Response(status=500, text="Internal error")
finally:
break
except Exception as e:
logger.error(f"Ошибка в Tribute webhook: {e}")
@@ -85,24 +84,24 @@ async def handle_successful_payment(message: types.Message):
user_id = int(payload_parts[1])
amount_kopeks = int(payload_parts[2])
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
existing_transaction = await get_transaction_by_external_id(
db, payment.telegram_payment_charge_id, PaymentMethod.TELEGRAM_STARS
)
if existing_transaction:
logger.info(f"Stars платеж {payment.telegram_payment_charge_id} уже обработан")
return
user = await get_user_by_id(db, user_id)
if user:
await add_user_balance(
db, user, amount_kopeks,
f"Пополнение через Telegram Stars"
)
await create_transaction(
db=db,
user_id=user.id,
@@ -112,7 +111,7 @@ async def handle_successful_payment(message: types.Message):
payment_method=PaymentMethod.TELEGRAM_STARS,
external_id=payment.telegram_payment_charge_id
)
await message.answer(
f"✅ Баланс успешно пополнен на {settings.format_price(amount_kopeks)}!\n\n"
"⚠️ <b>Важно:</b> Пополнение баланса не активирует подписку автоматически. "
@@ -120,14 +119,14 @@ async def handle_successful_payment(message: types.Message):
f"🔄 При наличии сохранённой корзины подписки и включенной автопокупке, "
f"подписка будет приобретена автоматически после пополнения баланса."
)
logger.info(f"✅ Обработан Stars платеж: {payment.telegram_payment_charge_id}")
await db.commit()
except Exception as e:
logger.error(f"Ошибка обработки Stars платежа: {e}")
await db.rollback()
finally:
break
except Exception as e:
logger.error(f"Ошибка в обработчике Stars платежа: {e}")

View File

@@ -207,15 +207,14 @@ def get_texts(language: str = DEFAULT_LANGUAGE) -> Texts:
async def get_rules_from_db(language: str = DEFAULT_LANGUAGE) -> str:
try:
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.rules import get_current_rules_content
async for db in get_db():
async with AsyncSessionLocal() as db:
rules = await get_current_rules_content(db, language)
if rules:
_cached_rules[language] = rules
return rules
break
except Exception as error: # pragma: no cover - defensive logging
_logger.warning("Failed to load rules from DB for %s: %s", language, error)

View File

@@ -7,7 +7,7 @@ from aiogram.types import Message, CallbackQuery, TelegramObject, User as TgUser
from aiogram.fsm.context import FSMContext
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.user import get_user_by_telegram_id, create_user
from app.services.remnawave_service import RemnaWaveService
from app.states import RegistrationStates
@@ -54,26 +54,26 @@ class AuthMiddleware(BaseMiddleware):
if user.is_bot:
return await handler(event, data)
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
db_user = await get_user_by_telegram_id(db, user.id)
if not db_user:
state: FSMContext = data.get('state')
current_state = None
if state:
current_state = await state.get_state()
is_reg_process = is_registration_process(event, current_state)
is_channel_check = (isinstance(event, CallbackQuery)
is_channel_check = (isinstance(event, CallbackQuery)
and event.data == "sub_channel_check")
is_start_command = (isinstance(event, Message)
and event.text
is_start_command = (isinstance(event, Message)
and event.text
and event.text.startswith('/start'))
if is_reg_process or is_channel_check or is_start_command:
if is_start_command:
logger.info(f"🚀 Пропускаем команду /start от пользователя {user.id}")
@@ -84,7 +84,9 @@ class AuthMiddleware(BaseMiddleware):
data['db'] = db
data['db_user'] = None
data['is_admin'] = False
return await handler(event, data)
result = await handler(event, data)
await db.commit()
return result
else:
if isinstance(event, Message):
await event.answer(
@@ -99,7 +101,7 @@ class AuthMiddleware(BaseMiddleware):
return
else:
from app.database.models import UserStatus
if db_user.status == UserStatus.BLOCKED.value:
if isinstance(event, Message):
await event.answer("🚫 Ваш аккаунт заблокирован администратором.")
@@ -107,14 +109,14 @@ class AuthMiddleware(BaseMiddleware):
await event.answer("🚫 Ваш аккаунт заблокирован администратором.", show_alert=True)
logger.info(f"🚫 Заблокированный пользователь {user.id} попытался использовать бота")
return
if db_user.status == UserStatus.DELETED.value:
state: FSMContext = data.get('state')
current_state = None
if state:
current_state = await state.get_state()
registration_states = [
RegistrationStates.waiting_for_language.state,
RegistrationStates.waiting_for_rules_accept.state,
@@ -134,13 +136,15 @@ class AuthMiddleware(BaseMiddleware):
)
)
)
if is_start_or_registration:
logger.info(f"🔄 Удаленный пользователь {user.id} начинает повторную регистрацию")
data['db'] = db
data['db_user'] = None
data['db_user'] = None
data['is_admin'] = False
return await handler(event, data)
result = await handler(event, data)
await db.commit()
return result
else:
if isinstance(event, Message):
await event.answer(
@@ -154,16 +158,16 @@ class AuthMiddleware(BaseMiddleware):
)
logger.info(f"❌ Удаленный пользователь {user.id} попытался использовать бота без /start")
return
profile_updated = False
if db_user.username != user.username:
old_username = db_user.username
db_user.username = user.username
logger.info(f"🔄 [Middleware] Username обновлен для {user.id}: '{old_username}''{db_user.username}'")
profile_updated = True
safe_first = sanitize_telegram_name(user.first_name)
safe_last = sanitize_telegram_name(user.last_name)
if db_user.first_name != safe_first:
@@ -171,13 +175,13 @@ class AuthMiddleware(BaseMiddleware):
db_user.first_name = safe_first
logger.info(f"🔄 [Middleware] Имя обновлено для {user.id}: '{old_first_name}''{db_user.first_name}'")
profile_updated = True
if db_user.last_name != safe_last:
old_last_name = db_user.last_name
db_user.last_name = safe_last
logger.info(f"🔄 [Middleware] Фамилия обновлена для {user.id}: '{old_last_name}''{db_user.last_name}'")
profile_updated = True
db_user.last_activity = datetime.utcnow()
if profile_updated:
@@ -198,14 +202,14 @@ class AuthMiddleware(BaseMiddleware):
)
)
await db.commit()
data['db'] = db
data['db_user'] = db_user
data['is_admin'] = settings.is_admin(user.id)
return await handler(event, data)
result = await handler(event, data)
await db.commit()
return result
except Exception as e:
logger.error(f"Ошибка в AuthMiddleware: {e}")
logger.error(f"Event type: {type(event)}")

View File

@@ -9,7 +9,7 @@ from aiogram.enums import ChatMemberStatus
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.campaign import get_campaign_by_start_parameter
from app.database.crud.subscription import deactivate_subscription, reactivate_subscription
from app.database.crud.user import get_user_by_telegram_id
@@ -228,7 +228,7 @@ class ChannelCheckerMiddleware(BaseMiddleware):
if state_data.get("campaign_notification_sent"):
return
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
campaign = await get_campaign_by_start_parameter(
db,
@@ -236,7 +236,7 @@ class ChannelCheckerMiddleware(BaseMiddleware):
only_active=True,
)
if not campaign:
break
return
user = await get_user_by_telegram_id(db, telegram_user.id)
@@ -249,14 +249,14 @@ class ChannelCheckerMiddleware(BaseMiddleware):
)
if sent:
await state.update_data(campaign_notification_sent=True)
await db.commit()
except Exception as error:
logger.error(
"❌ Ошибка отправки уведомления о переходе по кампании %s: %s",
payload,
error,
)
finally:
break
await db.rollback()
async def _deactivate_subscription_on_unsubscribe(
self, telegram_id: int, bot: Bot, channel_link: Optional[str]
@@ -265,21 +265,21 @@ class ChannelCheckerMiddleware(BaseMiddleware):
if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL:
return
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
user = await get_user_by_telegram_id(db, telegram_id)
if not user or not user.subscription:
break
return
subscription = user.subscription
if subscription.status != SubscriptionStatus.ACTIVE.value:
break
return
if settings.CHANNEL_REQUIRED_FOR_ALL:
pass
elif not subscription.is_trial:
break
return
await deactivate_subscription(db, subscription)
sub_type = "Триальная" if subscription.is_trial else "Платная"
@@ -316,35 +316,35 @@ class ChannelCheckerMiddleware(BaseMiddleware):
telegram_id,
notify_error,
)
await db.commit()
except Exception as db_error:
logger.error(
"❌ Ошибка деактивации подписки пользователя %s после отписки: %s",
telegram_id,
db_error,
)
finally:
break
await db.rollback()
async def _reactivate_subscription_on_subscribe(self, telegram_id: int, bot: Bot) -> None:
"""Реактивация подписки после повторной подписки на канал."""
if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL:
return
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
user = await get_user_by_telegram_id(db, telegram_id)
if not user or not user.subscription:
break
return
subscription = user.subscription
# Реактивируем только DISABLED подписки
if subscription.status != SubscriptionStatus.DISABLED.value:
break
return
# Проверяем что подписка ещё не истекла
if subscription.end_date and subscription.end_date <= datetime.utcnow():
break
return
# Реактивируем в БД
await reactivate_subscription(db, subscription)
@@ -382,14 +382,14 @@ class ChannelCheckerMiddleware(BaseMiddleware):
telegram_id,
notify_error,
)
await db.commit()
except Exception as db_error:
logger.error(
"❌ Ошибка реактивации подписки пользователя %s: %s",
telegram_id,
db_error,
)
finally:
break
await db.rollback()
@staticmethod
async def _deny_message(

View File

@@ -19,7 +19,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.config import settings
from app.database.database import get_db, engine
from app.database.database import AsyncSessionLocal, engine
from app.database.models import (
User, Subscription, Transaction, PromoCode, PromoCodeUse,
ReferralEarning, Squad, ServiceRule, SystemSetting, MonitoringLog,
@@ -475,7 +475,7 @@ class BackupService:
backup_data: Dict[str, List[Dict[str, Any]]] = {}
total_records = 0
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
for model in models_to_backup:
table_name = model.__tablename__
@@ -533,10 +533,6 @@ class BackupService:
except Exception as exc:
logger.error("Ошибка при экспорте данных: %s", exc)
raise exc
finally:
await db.close()
return backup_data, {}, total_records, len(models_to_backup)
async def _collect_files(self, staging_dir: Path, include_logs: bool) -> List[Dict[str, Any]]:
files_info: List[Dict[str, Any]] = []
@@ -815,7 +811,7 @@ class BackupService:
restored_records = 0
restored_tables = 0
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
if clear_existing:
logger.warning("🗑️ Очищаем существующие данные...")
@@ -903,14 +899,10 @@ class BackupService:
await db.commit()
break
except Exception as exc:
await db.rollback()
logger.error("Ошибка при восстановлении: %s", exc)
raise exc
finally:
await db.close()
return restored_tables, restored_records

View File

@@ -13,7 +13,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.subscription import (
get_daily_subscriptions_for_charge,
update_daily_charge_time,
@@ -65,25 +65,30 @@ class DailySubscriptionService:
}
try:
async for db in get_db():
subscriptions = await get_daily_subscriptions_for_charge(db)
stats["checked"] = len(subscriptions)
async with AsyncSessionLocal() as db:
try:
subscriptions = await get_daily_subscriptions_for_charge(db)
stats["checked"] = len(subscriptions)
for subscription in subscriptions:
try:
result = await self._process_single_charge(db, subscription)
if result == "charged":
stats["charged"] += 1
elif result == "suspended":
stats["suspended"] += 1
elif result == "error":
for subscription in subscriptions:
try:
result = await self._process_single_charge(db, subscription)
if result == "charged":
stats["charged"] += 1
elif result == "suspended":
stats["suspended"] += 1
elif result == "error":
stats["errors"] += 1
except Exception as e:
logger.error(
f"Ошибка обработки суточной подписки {subscription.id}: {e}",
exc_info=True
)
stats["errors"] += 1
except Exception as e:
logger.error(
f"Ошибка обработки суточной подписки {subscription.id}: {e}",
exc_info=True
)
stats["errors"] += 1
await db.commit()
except Exception as e:
logger.error(f"Ошибка при обработке подписок: {e}", exc_info=True)
await db.rollback()
except Exception as e:
logger.error(f"Ошибка при получении подписок для списания: {e}", exc_info=True)
@@ -272,35 +277,40 @@ class DailySubscriptionService:
from app.database.models import TrafficPurchase
try:
async for db in get_db():
# Находим все истекшие докупки
now = datetime.utcnow()
query = (
select(TrafficPurchase)
.where(TrafficPurchase.expires_at <= now)
)
result = await db.execute(query)
expired_purchases = result.scalars().all()
stats["checked"] = len(expired_purchases)
async with AsyncSessionLocal() as db:
try:
# Находим все истекшие докупки
now = datetime.utcnow()
query = (
select(TrafficPurchase)
.where(TrafficPurchase.expires_at <= now)
)
result = await db.execute(query)
expired_purchases = result.scalars().all()
stats["checked"] = len(expired_purchases)
# Группируем по подпискам для обновления
subscriptions_to_update = {}
for purchase in expired_purchases:
if purchase.subscription_id not in subscriptions_to_update:
subscriptions_to_update[purchase.subscription_id] = []
subscriptions_to_update[purchase.subscription_id].append(purchase)
# Группируем по подпискам для обновления
subscriptions_to_update = {}
for purchase in expired_purchases:
if purchase.subscription_id not in subscriptions_to_update:
subscriptions_to_update[purchase.subscription_id] = []
subscriptions_to_update[purchase.subscription_id].append(purchase)
# Удаляем истекшие докупки и обновляем подписки
for subscription_id, purchases in subscriptions_to_update.items():
try:
await self._reset_subscription_traffic(db, subscription_id, purchases)
stats["reset"] += len(purchases)
except Exception as e:
logger.error(
f"Ошибка сброса трафика подписки {subscription_id}: {e}",
exc_info=True
)
stats["errors"] += 1
# Удаляем истекшие докупки и обновляем подписки
for subscription_id, purchases in subscriptions_to_update.items():
try:
await self._reset_subscription_traffic(db, subscription_id, purchases)
stats["reset"] += len(purchases)
except Exception as e:
logger.error(
f"Ошибка сброса трафика подписки {subscription_id}: {e}",
exc_info=True
)
stats["errors"] += 1
await db.commit()
except Exception as e:
logger.error(f"Ошибка при обработке сброса трафика: {e}", exc_info=True)
await db.rollback()
except Exception as e:
logger.error(f"Ошибка при получении подписок для сброса трафика: {e}", exc_info=True)

View File

@@ -12,7 +12,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.database.crud.discount_offer import (
deactivate_expired_offers,
get_latest_claimed_offer_for_user,
@@ -190,7 +190,7 @@ class MonitoringService:
pass
async def _monitoring_cycle(self):
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
await self._cleanup_notification_cache()
@@ -219,23 +219,26 @@ class MonitoringService:
await self._process_autopayments(db)
await self._cleanup_inactive_users(db)
await self._sync_with_remnawave(db)
await self._log_monitoring_event(
db, "monitoring_cycle_completed",
"Цикл мониторинга успешно завершен",
db, "monitoring_cycle_completed",
"Цикл мониторинга успешно завершен",
{"timestamp": datetime.utcnow().isoformat()}
)
await db.commit()
except Exception as e:
logger.error(f"Ошибка в цикле мониторинга: {e}")
await self._log_monitoring_event(
db, "monitoring_cycle_error",
f"Ошибка в цикле мониторинга: {str(e)}",
{"error": str(e)},
is_success=False
)
finally:
break
try:
await self._log_monitoring_event(
db, "monitoring_cycle_error",
f"Ошибка в цикле мониторинга: {str(e)}",
{"error": str(e)},
is_success=False
)
except Exception:
pass
await db.rollback()
async def _cleanup_notification_cache(self):
current_time = datetime.utcnow()
@@ -1724,11 +1727,13 @@ class MonitoringService:
interval_seconds = 60
while self.is_running:
try:
async for db in get_db():
async with AsyncSessionLocal() as db:
try:
await self._check_ticket_sla(db)
finally:
break
await db.commit()
except Exception as e:
logger.error(f"Ошибка в SLA-проверке: {e}")
await db.rollback()
except asyncio.CancelledError:
break
except Exception as e: