Files
remnawave-bedolaga-telegram…/app/services/monitoring_service.py

2140 lines
99 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
import structlog
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError, TelegramNetworkError
from sqlalchemy import and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.config import settings
from app.database.crud.discount_offer import (
deactivate_expired_offers,
upsert_discount_offer,
)
from app.database.crud.notification import (
clear_notification_by_type,
notification_sent,
record_notification,
)
from app.database.crud.subscription import (
deactivate_subscription,
extend_subscription,
get_expired_subscriptions,
get_expiring_subscriptions,
get_subscriptions_for_autopay,
)
from app.database.crud.user import (
cleanup_expired_promo_offer_discounts,
delete_user,
get_inactive_users,
get_user_by_id,
subtract_user_balance,
)
from app.database.database import AsyncSessionLocal
from app.database.models import (
MonitoringLog,
Subscription,
SubscriptionStatus,
Ticket,
TicketStatus,
User,
UserPromoGroup,
UserStatus,
)
from app.external.remnawave_api import (
RemnaWaveAPIError,
RemnaWaveUser,
TrafficLimitStrategy,
UserStatus as RemnaWaveUserStatus,
)
from app.localization.texts import get_texts
from app.services.notification_delivery_service import (
notification_delivery_service,
)
from app.services.notification_settings_service import NotificationSettingsService
from app.services.promo_offer_service import promo_offer_service
from app.services.subscription_service import SubscriptionService
from app.utils.cache import cache
from app.utils.message_patch import caption_exceeds_telegram_limit
from app.utils.miniapp_buttons import build_miniapp_or_callback_button
from app.utils.promo_offer import get_user_active_promo_discount_percent
from app.utils.subscription_utils import (
resolve_hwid_device_limit_for_payload,
)
from app.utils.timezone import format_local_datetime
# Кулдаун между повторными уведомлениями об автоплатеже с недостаточным балансом (6 часов)
AUTOPAY_INSUFFICIENT_BALANCE_COOLDOWN_SECONDS: int = 21600
# Размер батча для проверки подписок на каналы (keyset pagination)
_CHANNEL_CHECK_BATCH_SIZE: int = 100
logger = structlog.get_logger(__name__)
LOGO_PATH = Path(settings.LOGO_FILE)
class MonitoringService:
def __init__(self, bot=None):
self.is_running = False
self.subscription_service = SubscriptionService()
self.bot = bot
self._notified_users: set[str] = set()
self._last_cleanup = datetime.now(UTC)
self._sla_task = None
async def _send_message_with_logo(
self,
chat_id: int | None,
text: str,
reply_markup=None,
parse_mode: str | None = 'HTML',
user: User | None = None,
):
"""Отправляет сообщение, добавляя логотип при необходимости."""
if not self.bot:
raise RuntimeError('Bot instance is not available')
# Skip email-only users (no telegram_id)
if not chat_id:
logger.debug('Пропуск уведомления: chat_id не указан (email-пользователь)')
return None
# Skip blocked/deleted users to save Telegram rate limits
if user and user.status in (UserStatus.BLOCKED.value, UserStatus.DELETED.value):
logger.debug('Пропуск уведомления: пользователь недоступен', user_id=user.id, status=user.status)
return None
if (
settings.ENABLE_LOGO_MODE
and await asyncio.to_thread(LOGO_PATH.exists)
and not caption_exceeds_telegram_limit(text)
):
try:
from app.utils.message_patch import _cache_logo_file_id, get_logo_media
result = await self.bot.send_photo(
chat_id=chat_id,
photo=get_logo_media(),
caption=text,
reply_markup=reply_markup,
parse_mode=parse_mode,
)
_cache_logo_file_id(result)
return result
except TelegramBadRequest as exc:
logger.warning(
'Не удалось отправить сообщение с логотипом пользователю : . Отправляем текстовое сообщение.',
chat_id=chat_id,
exc=exc,
)
return await self.bot.send_message(
chat_id=chat_id,
text=text,
reply_markup=reply_markup,
parse_mode=parse_mode,
)
@staticmethod
def _is_unreachable_error(error: TelegramBadRequest) -> bool:
message = str(error).lower()
unreachable_markers = (
'chat not found',
'user is deactivated',
'bot was blocked by the user',
"bot can't initiate conversation",
"can't initiate conversation",
'user not found',
'peer id invalid',
)
return any(marker in message for marker in unreachable_markers)
async def _handle_unreachable_user(self, user: User, error: Exception, context: str) -> bool:
if isinstance(error, TelegramForbiddenError):
logger.warning('⚠️ Пользователь недоступен: бот заблокирован', telegram_id=user.telegram_id, context=context)
return True
if isinstance(error, TelegramBadRequest) and self._is_unreachable_error(error):
logger.warning('⚠️ Пользователь недоступен', telegram_id=user.telegram_id, context=context, error=error)
return True
return False
async def start_monitoring(self):
if self.is_running:
logger.warning('Мониторинг уже запущен')
return
self.is_running = True
logger.info('🔄 Запуск службы мониторинга')
# Start dedicated SLA loop with its own interval for timely 5-min checks
try:
if not self._sla_task or self._sla_task.done():
self._sla_task = asyncio.create_task(self._sla_loop())
except Exception as e:
logger.error('Не удалось запустить SLA-мониторинг', error=e)
while self.is_running:
try:
await self._monitoring_cycle()
await asyncio.sleep(settings.MONITORING_INTERVAL * 60)
except Exception as e:
logger.error('Ошибка в цикле мониторинга', error=e)
await asyncio.sleep(60)
def stop_monitoring(self):
self.is_running = False
logger.info(' Мониторинг остановлен')
try:
if self._sla_task and not self._sla_task.done():
self._sla_task.cancel()
except Exception:
pass
async def _monitoring_cycle(self):
async with AsyncSessionLocal() as db:
try:
await self._cleanup_notification_cache()
expired_offers = await deactivate_expired_offers(db)
if expired_offers:
logger.info('🧹 Деактивировано просроченных скидочных предложений', expired_offers=expired_offers)
expired_active_discounts = await cleanup_expired_promo_offer_discounts(db)
if expired_active_discounts:
logger.info(
'🧹 Сброшено активных скидок промо-предложений с истекшим сроком',
expired_active_discounts=expired_active_discounts,
)
cleaned_test_access = await promo_offer_service.cleanup_expired_test_access(db)
if cleaned_test_access:
logger.info(
'🧹 Отозвано истекших тестовых доступов к сквадам', cleaned_test_access=cleaned_test_access
)
# ВАЖНО: autopay ПЕРЕД check_expired — иначе подписки с автоплатой
# экспайрятся до того, как autopay успеет их продлить
# Продление с баланса работает всегда, если у подписки autopay_enabled=True
await self._process_autopayments(db)
# Рекуррентные автоплатежи с карты: требуют ENABLE_AUTOPAY + YOOKASSA_RECURRENT_ENABLED
if settings.ENABLE_AUTOPAY and settings.YOOKASSA_RECURRENT_ENABLED:
try:
from app.services.recurrent_payment_service import process_recurrent_payments
await process_recurrent_payments(db=db, bot=self.bot)
except Exception as recurrent_error:
logger.error(
'Ошибка рекуррентных автоплатежей',
error=recurrent_error,
exc_info=True,
)
await self._check_expired_subscriptions(db)
await self._check_expiring_subscriptions(db)
await self._check_trial_expiring_soon(db)
await self._check_trial_channel_subscriptions(db)
await self._check_expired_subscription_followups(db)
await self._retry_stuck_guest_purchases(db)
await self._cleanup_inactive_users(db)
await self._sync_with_remnawave(db)
await self._log_monitoring_event(
db,
'monitoring_cycle_completed',
'Цикл мониторинга успешно завершен',
{'timestamp': datetime.now(UTC).isoformat()},
)
await db.commit()
except Exception as e:
logger.error('Ошибка в цикле мониторинга', error=e)
try:
await self._log_monitoring_event(
db,
'monitoring_cycle_error',
f'Ошибка в цикле мониторинга: {e!s}',
{'error': str(e)},
is_success=False,
)
except Exception:
pass
await db.rollback()
async def _cleanup_notification_cache(self):
current_time = datetime.now(UTC)
if (current_time - self._last_cleanup).total_seconds() >= 3600:
old_count = len(self._notified_users)
self._notified_users.clear()
self._last_cleanup = current_time
logger.info('🧹 Очищен кеш уведомлений ( записей)', old_count=old_count)
async def _check_expired_subscriptions(self, db: AsyncSession):
try:
from app.database.crud.subscription import is_recently_updated_by_webhook
expired_subscriptions = await get_expired_subscriptions(db)
for subscription in expired_subscriptions:
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Пропуск expire подписки : обновлена вебхуком недавно', subscription_id=subscription.id
)
continue
from app.database.crud.subscription import expire_subscription
await expire_subscription(db, subscription)
user = await get_user_by_id(db, subscription.user_id)
if user and self.bot:
await self._send_subscription_expired_notification(user)
logger.info(
"🔴 Подписка пользователя истекла и статус изменен на 'expired'", user_id=subscription.user_id
)
if expired_subscriptions:
await self._log_monitoring_event(
db,
'expired_subscriptions_processed',
f'Обработано {len(expired_subscriptions)} истёкших подписок',
{'count': len(expired_subscriptions)},
)
except Exception as e:
logger.error('Ошибка проверки истёкших подписок', error=e)
async def update_remnawave_user(self, db: AsyncSession, subscription: Subscription) -> RemnaWaveUser | None:
try:
from app.database.crud.subscription import is_recently_updated_by_webhook
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Пропуск RemnaWave обновления подписки : обновлена вебхуком недавно',
subscription_id=subscription.id,
)
return None
user = await get_user_by_id(db, subscription.user_id)
if not user or not user.remnawave_uuid:
logger.error('RemnaWave UUID не найден для пользователя', user_id=subscription.user_id)
return None
# Обновляем subscription в сессии, чтобы избежать detached instance
# Загружаем tariff для определения внешнего сквада
try:
await db.refresh(subscription, ['tariff'])
except Exception:
pass
# Re-check guard after refresh (webhook could have committed between first check and refresh)
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Пропуск RemnaWave обновления подписки : обновлена вебхуком недавно (после refresh)',
subscription_id=subscription.id,
)
return None
current_time = datetime.now(UTC)
is_active = subscription.status == SubscriptionStatus.ACTIVE.value and subscription.end_date > current_time
if subscription.status == SubscriptionStatus.ACTIVE.value and subscription.end_date <= current_time:
# Суточные подписки управляются DailySubscriptionService — не экспайрим
tariff = getattr(subscription, 'tariff', None)
is_active_daily = (
tariff is not None
and getattr(tariff, 'is_daily', False)
and not getattr(subscription, 'is_daily_paused', False)
)
if is_active_daily:
logger.debug(
'update_remnawave_user: пропуск expire для суточной подписки',
subscription_id=subscription.id,
)
else:
subscription.status = SubscriptionStatus.EXPIRED.value
await db.commit()
is_active = False
logger.info("📝 Статус подписки обновлен на 'expired'", subscription_id=subscription.id)
if not self.subscription_service.is_configured:
logger.warning(
'RemnaWave API не настроен. Пропускаем обновление пользователя', user_id=subscription.user_id
)
return None
async with self.subscription_service.get_api_client() as api:
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
update_kwargs = dict(
uuid=user.remnawave_uuid,
status=RemnaWaveUserStatus.ACTIVE if is_active else RemnaWaveUserStatus.DISABLED,
expire_at=subscription.end_date
if is_active
else max(subscription.end_date, current_time + timedelta(minutes=1)),
traffic_limit_bytes=self._gb_to_bytes(subscription.traffic_limit_gb),
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=settings.format_remnawave_user_description(
full_name=user.full_name, username=user.username, telegram_id=user.telegram_id
),
)
# Не пересылаем activeInternalSquads в рутинном sync — сквады уже назначены
# при создании подписки, пересылка стейловых UUID вызывает FK violation → A039
if hwid_limit is not None:
update_kwargs['hwid_device_limit'] = hwid_limit
# Внешний сквад НЕ пересылаем в рутинном sync — стейловый UUID
# вызывает FK violation → A039. Назначается при создании подписки.
updated_user = await api.update_user(**update_kwargs)
subscription.subscription_url = updated_user.subscription_url
subscription.subscription_crypto_link = updated_user.happ_crypto_link
await db.commit()
status_text = 'активным' if is_active else 'истёкшим'
logger.info(
'✅ Обновлен RemnaWave пользователь со статусом',
remnawave_uuid=user.remnawave_uuid,
status_text=status_text,
)
return updated_user
except RemnaWaveAPIError as e:
logger.error('Ошибка обновления RemnaWave пользователя', error=e)
return None
except Exception as e:
logger.error('Ошибка обновления RemnaWave пользователя', error=e)
return None
async def _check_expiring_subscriptions(self, db: AsyncSession):
try:
warning_days = settings.get_autopay_warning_days()
all_processed_users = set()
for days in warning_days:
expiring_subscriptions = await self._get_expiring_paid_subscriptions(db, days)
sent_count = 0
# Batch-запрос: собираем user_id с autopay и проверяем наличие карт одним запросом
users_with_cards: set[int] = set()
if settings.ENABLE_AUTOPAY and settings.YOOKASSA_RECURRENT_ENABLED:
autopay_user_ids = [s.user_id for s in expiring_subscriptions if s.autopay_enabled]
if autopay_user_ids:
from app.database.crud.saved_payment_method import get_user_ids_with_active_payment_methods
users_with_cards = await get_user_ids_with_active_payment_methods(db, autopay_user_ids)
for subscription in expiring_subscriptions:
user = await get_user_by_id(db, subscription.user_id)
if not user:
continue
# Use user.id for key to support both Telegram and email users
user_key = f'user_{user.id}_today'
user_identifier = user.telegram_id or f'email:{user.id}'
if (
await notification_sent(db, user.id, subscription.id, 'expiring', days)
or user_key in all_processed_users
):
logger.debug(
'Уведомление уже отправлено, пропускаем',
user_identifier=user_identifier,
days=days,
)
continue
has_saved_card = subscription.autopay_enabled and user.id in users_with_cards
should_send = True
for other_days in warning_days:
if other_days < days:
other_subs = await self._get_expiring_paid_subscriptions(db, other_days)
if any(s.user_id == user.id for s in other_subs):
should_send = False
logger.debug(
'🎯 Пропускаем уведомление на дней для пользователя есть более срочное на дней',
days=days,
user_identifier=user_identifier,
other_days=other_days,
)
break
if not should_send:
continue
# Handle email-only users via notification delivery service
if not user.telegram_id:
success = await notification_delivery_service.notify_subscription_expiring(
user=user,
days_left=days,
expires_at=subscription.end_date,
)
if success:
await record_notification(db, user.id, subscription.id, 'expiring', days)
all_processed_users.add(user_key)
sent_count += 1
logger.info(
'✅ Email-пользователю отправлено уведомление об истечении подписки через дней',
user_id=user.id,
days=days,
)
continue
if self.bot:
success = await self._send_subscription_expiring_notification(
user, subscription, days, has_saved_card=has_saved_card
)
if success:
await record_notification(db, user.id, subscription.id, 'expiring', days)
all_processed_users.add(user_key)
sent_count += 1
logger.info(
'✅ Пользователю отправлено уведомление об истечении подписки через дней',
telegram_id=user.telegram_id,
days=days,
)
else:
logger.warning(
'Не удалось отправить уведомление пользователю', telegram_id=user.telegram_id
)
if sent_count > 0:
await self._log_monitoring_event(
db,
'expiring_notifications_sent',
f'Отправлено {sent_count} уведомлений об истечении через {days} дней',
{'days': days, 'count': sent_count},
)
except Exception as e:
logger.error('Ошибка проверки истекающих подписок', error=e)
async def _check_trial_expiring_soon(self, db: AsyncSession):
try:
threshold_time = datetime.now(UTC) + timedelta(hours=2)
result = await db.execute(
select(Subscription)
.join(Subscription.user)
.options(
selectinload(Subscription.user).selectinload(User.promo_group),
selectinload(Subscription.user)
.selectinload(User.user_promo_groups)
.selectinload(UserPromoGroup.promo_group),
)
.where(
and_(
Subscription.status == SubscriptionStatus.ACTIVE.value,
Subscription.is_trial == True,
Subscription.end_date <= threshold_time,
Subscription.end_date > datetime.now(UTC),
User.status == UserStatus.ACTIVE.value,
)
)
)
trial_expiring = result.scalars().all()
for subscription in trial_expiring:
user = subscription.user
if not user:
continue
if await notification_sent(db, user.id, subscription.id, 'trial_2h'):
continue
if self.bot:
success = await self._send_trial_ending_notification(user, subscription)
if success:
await record_notification(db, user.id, subscription.id, 'trial_2h')
logger.info(
'🎁 Пользователю отправлено уведомление об окончании тестовой подписки через 2 часа',
telegram_id=user.telegram_id,
)
if trial_expiring:
await self._log_monitoring_event(
db,
'trial_expiring_notifications_sent',
f'Отправлено {len(trial_expiring)} уведомлений об окончании тестовых подписок',
{'count': len(trial_expiring)},
)
except Exception as e:
logger.error('Ошибка проверки истекающих тестовых подписок', error=e)
async def _check_trial_channel_subscriptions(self, db: AsyncSession):
"""Background reconciliation of channel subscriptions (rate-limited).
Processes subscriptions in batches using keyset pagination to avoid
loading all trial subscriptions into memory at once. Each batch gets
a fresh DB session to avoid holding a connection pool slot for hours.
When CHANNEL_REQUIRED_FOR_ALL is True, checks ALL active subscriptions
(not just trials). Otherwise only checks trial subscriptions.
"""
from app.database.crud.subscription import is_active_paid_subscription, is_recently_updated_by_webhook
if not settings.CHANNEL_IS_REQUIRED_SUB:
return
if not settings.CHANNEL_DISABLE_TRIAL_ON_UNSUBSCRIBE and not settings.CHANNEL_REQUIRED_FOR_ALL:
logger.debug('Channel unsubscribe check disabled')
return
if not self.bot:
logger.debug('Skipping channel subscription check - bot unavailable')
return
from app.database.crud.required_channel import upsert_user_channel_sub
from app.services.channel_subscription_service import channel_subscription_service
from app.utils.cache import ChannelSubCache
channels = await channel_subscription_service.get_required_channels()
if not channels:
return
# Ensure bot is set on service
if not channel_subscription_service.bot:
channel_subscription_service.bot = self.bot
try:
now = datetime.now(UTC)
notifications_allowed = (
NotificationSettingsService.are_notifications_globally_enabled()
and NotificationSettingsService.is_trial_channel_unsubscribed_enabled()
)
disabled_count = 0
restored_count = 0
checked_count = 0
last_id = 0
# Build the trial/all filter based on CHANNEL_REQUIRED_FOR_ALL setting
from sqlalchemy import true as sa_true
is_trial_filter = sa_true() if settings.CHANNEL_REQUIRED_FOR_ALL else Subscription.is_trial.is_(True)
while True:
# Fresh session per batch to avoid long-running connections
async with AsyncSessionLocal() as batch_db:
result = await batch_db.execute(
select(Subscription)
.join(Subscription.user)
.options(
selectinload(Subscription.user),
selectinload(Subscription.tariff),
)
.where(
and_(
Subscription.id > last_id,
is_trial_filter,
Subscription.end_date > now,
Subscription.status.in_(
[
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.DISABLED.value,
]
),
User.status == UserStatus.ACTIVE.value,
)
)
.order_by(Subscription.id)
.limit(_CHANNEL_CHECK_BATCH_SIZE)
)
subscriptions = result.scalars().all()
if not subscriptions:
break
last_id = subscriptions[-1].id
for subscription in subscriptions:
user = subscription.user
if not user or not user.telegram_id:
continue
# Existing guard: skip if recently updated by webhook
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Skipping subscription: recently updated by webhook',
subscription_id=subscription.id,
)
continue
checked_count += 1
# Rate-limited check for ALL channels
all_subscribed = True
for ch in channels:
is_member = await channel_subscription_service._rate_limited_check(
user.telegram_id, ch['channel_id']
)
# Update DB + cache
await upsert_user_channel_sub(batch_db, user.telegram_id, ch['channel_id'], is_member)
await ChannelSubCache.set_sub_status(user.telegram_id, ch['channel_id'], is_member)
if not is_member:
all_subscribed = False
# DEACTIVATE: was active, now not subscribed to all
if subscription.status == SubscriptionStatus.ACTIVE.value and not all_subscribed:
# Guard: always skip paid subscriptions (user paid money)
if is_active_paid_subscription(subscription):
continue
subscription = await deactivate_subscription(batch_db, subscription)
disabled_count += 1
logger.info(
'Subscription deactivated (channel unsubscribe)',
telegram_id=user.telegram_id,
subscription_id=subscription.id,
is_trial=subscription.is_trial,
)
if user.remnawave_uuid:
try:
await self.subscription_service.disable_remnawave_user(user.remnawave_uuid)
except Exception as api_error:
logger.error(
'Failed to disable RemnaWave user',
remnawave_uuid=user.remnawave_uuid,
api_error=api_error,
)
if notifications_allowed:
if not await notification_sent(
batch_db,
user.id,
subscription.id,
'trial_channel_unsubscribed',
):
sent = await self._send_trial_channel_unsubscribed_notification(user)
if sent:
await record_notification(
batch_db,
user.id,
subscription.id,
'trial_channel_unsubscribed',
)
# REACTIVATE: was disabled, now subscribed to all
elif subscription.status == SubscriptionStatus.DISABLED.value and all_subscribed:
# Guard: traffic limit exhausted
if (
subscription.traffic_limit_gb
and subscription.traffic_used_gb is not None
and subscription.traffic_used_gb >= subscription.traffic_limit_gb
):
logger.debug(
'Skipping reactivation: traffic exhausted',
subscription_id=subscription.id,
traffic_used=subscription.traffic_used_gb,
traffic_limit=subscription.traffic_limit_gb,
)
continue
# Guard: disabled by webhook, not by monitoring
if (
subscription.last_webhook_update_at
and subscription.updated_at
and subscription.last_webhook_update_at
>= subscription.updated_at - timedelta(seconds=10)
):
logger.debug(
'Skipping reactivation: disabled by RemnaWave panel',
subscription_id=subscription.id,
last_webhook_at=subscription.last_webhook_update_at,
updated_at=subscription.updated_at,
)
continue
subscription.status = SubscriptionStatus.ACTIVE.value
subscription.updated_at = datetime.now(UTC)
restored_count += 1
logger.info(
'Subscription restored (channel resubscribe)',
telegram_id=user.telegram_id,
subscription_id=subscription.id,
is_trial=subscription.is_trial,
)
try:
if user.remnawave_uuid:
await self.subscription_service.update_remnawave_user(batch_db, subscription)
else:
await self.subscription_service.create_remnawave_user(batch_db, subscription)
except Exception as api_error:
logger.error(
'Failed to update RemnaWave user',
telegram_id=user.telegram_id,
api_error=api_error,
)
await clear_notification_by_type(
batch_db,
subscription.id,
'trial_channel_unsubscribed',
)
# Commit all changes for this batch
await batch_db.commit()
if disabled_count or restored_count:
check_scope = 'all' if settings.CHANNEL_REQUIRED_FOR_ALL else 'trial'
await self._log_monitoring_event(
db,
'trial_channel_subscription_check',
(
f'Checked {checked_count} {check_scope} subscriptions: '
f'disabled {disabled_count}, restored {restored_count}'
),
{
'checked': checked_count,
'disabled': disabled_count,
'restored': restored_count,
'scope': check_scope,
},
)
except Exception as error:
logger.error('Error checking channel subscriptions', error=error)
async def _check_expired_subscription_followups(self, db: AsyncSession):
if not NotificationSettingsService.are_notifications_globally_enabled():
return
if not self.bot:
return
try:
now = datetime.now(UTC)
result = await db.execute(
select(Subscription)
.options(
selectinload(Subscription.user),
selectinload(Subscription.tariff),
)
.where(
and_(
Subscription.is_trial == False,
Subscription.end_date <= now,
)
)
)
all_subscriptions = result.scalars().all()
# Исключаем суточные тарифы - для них отдельная логика
subscriptions = [
sub for sub in all_subscriptions if not (sub.tariff and getattr(sub.tariff, 'is_daily', False))
]
sent_day1 = 0
sent_wave2 = 0
sent_wave3 = 0
for subscription in subscriptions:
user = subscription.user
if not user:
continue
if subscription.end_date is None:
continue
time_since_end = now - subscription.end_date
if time_since_end.total_seconds() < 0:
continue
days_since = time_since_end.total_seconds() / 86400
# Day 1 reminder
if NotificationSettingsService.is_expired_1d_enabled() and 1 <= days_since < 2:
if not await notification_sent(db, user.id, subscription.id, 'expired_1d'):
success = await self._send_expired_day1_notification(user, subscription)
if success:
await record_notification(db, user.id, subscription.id, 'expired_1d')
sent_day1 += 1
# Second wave (2-3 days) discount
if NotificationSettingsService.is_second_wave_enabled() and 2 <= days_since < 4:
if not await notification_sent(db, user.id, subscription.id, 'expired_discount_wave2'):
percent = NotificationSettingsService.get_second_wave_discount_percent()
valid_hours = NotificationSettingsService.get_second_wave_valid_hours()
offer = await upsert_discount_offer(
db,
user_id=user.id,
subscription_id=subscription.id,
notification_type='expired_discount_wave2',
discount_percent=percent,
bonus_amount_kopeks=0,
valid_hours=valid_hours,
effect_type='percent_discount',
)
success = await self._send_expired_discount_notification(
user,
subscription,
percent,
offer.expires_at,
offer.id,
'second',
)
if success:
await record_notification(db, user.id, subscription.id, 'expired_discount_wave2')
sent_wave2 += 1
# Third wave (N days) discount
if NotificationSettingsService.is_third_wave_enabled():
trigger_days = NotificationSettingsService.get_third_wave_trigger_days()
if trigger_days <= days_since < trigger_days + 1:
if not await notification_sent(db, user.id, subscription.id, 'expired_discount_wave3'):
percent = NotificationSettingsService.get_third_wave_discount_percent()
valid_hours = NotificationSettingsService.get_third_wave_valid_hours()
offer = await upsert_discount_offer(
db,
user_id=user.id,
subscription_id=subscription.id,
notification_type='expired_discount_wave3',
discount_percent=percent,
bonus_amount_kopeks=0,
valid_hours=valid_hours,
effect_type='percent_discount',
)
success = await self._send_expired_discount_notification(
user,
subscription,
percent,
offer.expires_at,
offer.id,
'third',
trigger_days=trigger_days,
)
if success:
await record_notification(db, user.id, subscription.id, 'expired_discount_wave3')
sent_wave3 += 1
if sent_day1 or sent_wave2 or sent_wave3:
await self._log_monitoring_event(
db,
'expired_followups_sent',
(f'Follow-ups: 1д={sent_day1}, скидка 2-3д={sent_wave2}, скидка N={sent_wave3}'),
{
'day1': sent_day1,
'wave2': sent_wave2,
'wave3': sent_wave3,
},
)
except Exception as e:
logger.error('Ошибка проверки напоминаний об истекшей подписке', error=e)
async def _get_expiring_paid_subscriptions(self, db: AsyncSession, days_before: int) -> list[Subscription]:
current_time = datetime.now(UTC)
threshold_date = current_time + timedelta(days=days_before)
result = await db.execute(
select(Subscription)
.options(
selectinload(Subscription.user),
selectinload(Subscription.tariff),
)
.where(
and_(
Subscription.status == SubscriptionStatus.ACTIVE.value,
Subscription.is_trial == False,
Subscription.end_date > current_time,
Subscription.end_date <= threshold_date,
)
)
)
logger.debug('🔍 Поиск платных подписок, истекающих в ближайшие дней', days_before=days_before)
logger.debug('📅 Текущее время', current_time=current_time)
logger.debug('📅 Пороговая дата', threshold_date=threshold_date)
all_subscriptions = result.scalars().all()
# Исключаем суточные тарифы - для них отдельная логика списания
subscriptions = [
sub for sub in all_subscriptions if not (sub.tariff and getattr(sub.tariff, 'is_daily', False))
]
excluded_count = len(all_subscriptions) - len(subscriptions)
if excluded_count > 0:
logger.debug('🔄 Исключено суточных подписок из уведомлений', excluded_count=excluded_count)
logger.info('📊 Найдено платных подписок для уведомлений', subscriptions_count=len(subscriptions))
return subscriptions
async def _process_autopayments(self, db: AsyncSession):
try:
current_time = datetime.now(UTC)
# Берём ACTIVE + недавно EXPIRED (middleware или check_and_update могли
# экспайрить до того, как monitoring успел запустить autopay)
recently_expired_threshold = current_time - timedelta(hours=2)
result = await db.execute(
select(Subscription)
.options(
selectinload(Subscription.user).options(
selectinload(User.promo_group),
selectinload(User.user_promo_groups).selectinload(UserPromoGroup.promo_group),
),
selectinload(Subscription.tariff),
)
.where(
and_(
or_(
Subscription.status == SubscriptionStatus.ACTIVE.value,
# Подписки, которые были экспайрены middleware/CRUD
# недавно (в пределах 2ч) — autopay может их восстановить
and_(
Subscription.status == SubscriptionStatus.EXPIRED.value,
Subscription.end_date >= recently_expired_threshold,
),
),
Subscription.autopay_enabled == True,
Subscription.is_trial == False,
)
)
)
all_autopay_subscriptions = result.scalars().all()
autopay_subscriptions = []
for sub in all_autopay_subscriptions:
# Суточные подписки имеют свой собственный механизм продления
# (DailySubscriptionService), глобальный autopay на них не распространяется
if sub.tariff and getattr(sub.tariff, 'is_daily', False):
logger.debug(
'Пропускаем суточную подписку (тариф) в глобальном autopay', sub_id=sub.id, name=sub.tariff.name
)
continue
days_before_expiry = (sub.end_date - current_time).days
if days_before_expiry <= min(sub.autopay_days_before or 3, 3):
autopay_subscriptions.append(sub)
processed_count = 0
failed_count = 0
for subscription in autopay_subscriptions:
from app.database.crud.subscription import is_recently_updated_by_webhook
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Пропуск автоплатежа подписки : обновлена вебхуком недавно', subscription_id=subscription.id
)
continue
user = subscription.user
if not user:
continue
user_identifier = user.telegram_id or f'email:{user.id}'
# Определяем период продления: из тарифа (минимальный) или 30 дней по умолчанию
tariff = getattr(subscription, 'tariff', None)
if tariff:
autopay_period = tariff.get_shortest_period() or 30
else:
autopay_period = 30
try:
from app.database.crud.user import lock_user_for_pricing
from app.services.pricing_engine import pricing_engine
user = await lock_user_for_pricing(db, user.id)
pricing = await pricing_engine.calculate_renewal_price(
db,
subscription,
autopay_period,
user=user,
)
renewal_cost = pricing.final_total
except Exception as e:
logger.error(
'Ошибка расчёта стоимости автопродления, пропускаем',
subscription_id=subscription.id,
user_id=user.id,
error=str(e),
)
failed_count += 1
continue
if renewal_cost <= 0:
logger.warning(
'Нулевая стоимость автопродления, пропускаем',
subscription_id=subscription.id,
user_id=user.id,
renewal_cost=renewal_cost,
)
failed_count += 1
continue
# calculate_renewal_price уже включает promo_group + promo_offer скидки.
# Не применяем promo_offer повторно — только consume-им при успешной оплате.
charge_amount = renewal_cost
promo_discount_percent = get_user_active_promo_discount_percent(user)
autopay_key = f'autopay_{user.id}_{subscription.id}'
if autopay_key in self._notified_users:
continue
if user.balance_kopeks >= charge_amount:
success = await subtract_user_balance(
db,
user,
charge_amount,
'Автопродление подписки',
consume_promo_offer=promo_discount_percent > 0,
mark_as_paid_subscription=True,
)
if success:
# extend_subscription сам обработает EXPIRED→ACTIVE переход
# (проверяет status + end_date для определения was_expired)
if subscription.status == SubscriptionStatus.EXPIRED.value:
logger.info(
'🔄 Autopay: продление EXPIRED подписки (восстановление)',
subscription_id=subscription.id,
user_id=user.id,
)
old_end_date = subscription.end_date
await extend_subscription(db, subscription, autopay_period)
await self.subscription_service.update_remnawave_user(
db,
subscription,
reset_traffic=settings.RESET_TRAFFIC_ON_PAYMENT,
reset_reason='автопродление подписки',
)
# Создаём транзакцию, чтобы автопродление было видно в статистике и карточке пользователя
try:
from app.database.crud.transaction import create_transaction
from app.database.models import PaymentMethod, TransactionType
transaction = await create_transaction(
db=db,
user_id=user.id,
type=TransactionType.SUBSCRIPTION_PAYMENT,
amount_kopeks=charge_amount,
description=f'Автопродление подписки на {autopay_period} дней',
payment_method=PaymentMethod.BALANCE,
)
except Exception as exc:
logger.warning('Не удалось создать транзакцию автопродления', user_id=user.id, exc=exc)
transaction = None
# Отправляем уведомление администраторам
try:
from app.services.subscription_renewal_service import with_admin_notification_service
if transaction:
await with_admin_notification_service(
lambda svc: svc.send_subscription_extension_notification(
db,
user,
subscription,
transaction,
autopay_period,
old_end_date,
new_end_date=subscription.end_date,
balance_after=user.balance_kopeks,
)
)
except Exception as exc:
logger.warning(
'Не удалось отправить админ-уведомление об автопродлении', user_id=user.id, exc=exc
)
# Send notification via appropriate channel
if user.telegram_id and self.bot:
await self._send_autopay_success_notification(user, charge_amount, autopay_period)
elif not user.telegram_id:
# Email-only user - use notification delivery service
await notification_delivery_service.notify_autopay_success(
user=user,
amount_kopeks=charge_amount,
new_expires_at=subscription.end_date,
)
processed_count += 1
self._notified_users.add(autopay_key)
logger.info(
'💳 Автопродление подписки пользователя успешно (списано , скидка %)',
user_identifier=user_identifier,
charge_amount=charge_amount,
promo_discount_percent=promo_discount_percent,
)
else:
failed_count += 1
if user.telegram_id and self.bot:
await self._send_autopay_failed_notification(user, user.balance_kopeks, charge_amount)
elif not user.telegram_id:
await notification_delivery_service.notify_autopay_failed(
user=user,
reason='Ошибка списания средств',
)
logger.warning(
'💳 Ошибка списания средств для автопродления пользователя', user_identifier=user_identifier
)
else:
failed_count += 1
# Проверяем кулдаун уведомления через Redis, чтобы не спамить
# при каждом срабатывании мониторинга
cooldown_key = f'autopay_insufficient_balance_notified:{user.id}'
should_notify = True
try:
if await cache.exists(cooldown_key):
should_notify = False
logger.debug(
'💳 Пропуск уведомления о недостаточном балансе для пользователя — кулдаун активен',
user_identifier=user_identifier,
)
except Exception as redis_err:
# Fallback: если Redis недоступен — отправляем уведомление
logger.warning(
'⚠️ Ошибка проверки кулдауна в Redis для пользователя : . Отправляем уведомление.',
user_identifier=user_identifier,
redis_err=redis_err,
)
if should_notify:
if user.telegram_id and self.bot:
await self._send_autopay_failed_notification(user, user.balance_kopeks, charge_amount)
elif not user.telegram_id:
await notification_delivery_service.notify_autopay_failed(
user=user,
reason='Недостаточно средств на балансе',
)
# Ставим ключ кулдауна после отправки
try:
await cache.set(
cooldown_key,
1,
expire=AUTOPAY_INSUFFICIENT_BALANCE_COOLDOWN_SECONDS,
)
except Exception as redis_err:
logger.warning(
'⚠️ Не удалось установить кулдаун в Redis для пользователя',
user_identifier=user_identifier,
redis_err=redis_err,
)
logger.warning(
'💳 Недостаточно средств для автопродления у пользователя', user_identifier=user_identifier
)
if processed_count > 0 or failed_count > 0:
await self._log_monitoring_event(
db,
'autopayments_processed',
f'Автоплатежи: успешно {processed_count}, неудачно {failed_count}',
{'processed': processed_count, 'failed': failed_count},
)
except Exception as e:
logger.error('Ошибка обработки автоплатежей', error=e)
async def _send_subscription_expired_notification(self, user: User) -> bool:
try:
message = """
⛔ <b>Подписка истекла</b>
Ваша подписка истекла. Для восстановления доступа продлите подписку.
🔧 Доступ к серверам заблокирован до продления.
"""
from aiogram.types import InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[build_miniapp_or_callback_button(text='💎 Купить подписку', callback_data='menu_buy')],
[build_miniapp_or_callback_button(text='💳 Пополнить баланс', callback_data='balance_topup')],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'уведомление об истечении подписки'):
return True
logger.error(
'Ошибка Telegram API при отправке уведомления об истечении подписки пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except Exception as e:
logger.error(
'Ошибка отправки уведомления об истечении подписки пользователю', telegram_id=user.telegram_id, e=e
)
return False
async def _send_subscription_expiring_notification(
self, user: User, subscription: Subscription, days: int, *, has_saved_card: bool = False
) -> bool:
try:
from app.utils.formatters import format_days_declension
texts = get_texts(user.language)
days_text = format_days_declension(days, user.language)
if subscription.autopay_enabled and has_saved_card:
autopay_status = texts.t(
'AUTOPAY_STATUS_CARD_ACTIVE',
'✅ Включен — будет автоматическое списание с карты',
)
action_text = texts.t(
'AUTOPAY_ACTION_CHECK_BALANCE',
'💰 Убедитесь, что на балансе достаточно средств: {balance}',
).format(balance=texts.format_price(user.balance_kopeks))
elif subscription.autopay_enabled:
autopay_status = texts.t(
'AUTOPAY_STATUS_NO_CARD',
'✅ Включен — подписка продлится автоматически',
)
action_text = texts.t(
'AUTOPAY_ACTION_CHECK_BALANCE',
'💰 Убедитесь, что на балансе достаточно средств: {balance}',
).format(balance=texts.format_price(user.balance_kopeks))
else:
autopay_status = texts.t(
'AUTOPAY_STATUS_OFF',
'❌ Отключен — не забудьте продлить вручную!',
)
if settings.ENABLE_AUTOPAY:
action_text = texts.t(
'AUTOPAY_ACTION_ENABLE',
'💡 Включите автоплатеж или продлите подписку вручную',
)
else:
action_text = texts.t(
'AUTOPAY_ACTION_RENEW',
'💡 Продлите подписку вручную',
)
end_date = format_local_datetime(subscription.end_date, '%d.%m.%Y %H:%M')
message = texts.t(
'SUBSCRIPTION_EXPIRING_PAID',
'\n⚠️ <b>Подписка истекает через {days_text}!</b>\n\n'
'Ваша платная подписка истекает {end_date}.\n\n'
'💳 <b>Автоплатеж:</b> {autopay_status}\n\n'
'{action_text}\n',
).format(
days_text=days_text,
end_date=end_date,
autopay_status=autopay_status,
action_text=action_text,
)
from aiogram.types import InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
build_miniapp_or_callback_button(
text='⏰ Продлить подписку', callback_data='subscription_extend'
)
],
[build_miniapp_or_callback_button(text='💳 Пополнить баланс', callback_data='balance_topup')],
[build_miniapp_or_callback_button(text='📱 Моя подписка', callback_data='menu_subscription')],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'уведомление об истекающей подписке'):
return True
logger.error(
'Ошибка Telegram API при отправке уведомления об истечении подписки пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except TelegramNetworkError as e:
logger.warning(
'Таймаут отправки уведомления об истечении подписки пользователю', telegram_id=user.telegram_id, e=e
)
return False
except Exception as e:
logger.error(
'Ошибка отправки уведомления об истечении подписки пользователю', telegram_id=user.telegram_id, e=e
)
return False
async def _send_trial_ending_notification(self, user: User, subscription: Subscription) -> bool:
try:
get_texts(user.language)
message = """
🎁 <b>Тестовая подписка скоро закончится!</b>
Ваша тестовая подписка истекает через 2 часа.
💎 <b>Не хотите остаться без VPN?</b>
Переходите на полную подписку!
⚡️ Успейте оформить до окончания тестового периода!
"""
from aiogram.types import InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[build_miniapp_or_callback_button(text='💎 Купить подписку', callback_data='menu_buy')],
[build_miniapp_or_callback_button(text='💰 Пополнить баланс', callback_data='balance_topup')],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
user=user,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'уведомление о завершении тестовой подписки'):
return True
logger.error(
'Ошибка Telegram API при отправке уведомления о завершении тестовой подписки пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except TelegramNetworkError as e:
logger.warning(
'Таймаут отправки уведомления об окончании тестовой подписки пользователю',
telegram_id=user.telegram_id,
e=e,
)
return False
except Exception as e:
logger.error(
'Ошибка отправки уведомления об окончании тестовой подписки пользователю',
telegram_id=user.telegram_id,
e=e,
)
return False
async def _send_trial_channel_unsubscribed_notification(self, user: User) -> bool:
try:
texts = get_texts(user.language)
template = texts.get(
'TRIAL_CHANNEL_UNSUBSCRIBED',
(
'🚫 <b>Доступ приостановлен</b>\n\n'
'Мы не нашли вашу подписку на наш канал, поэтому тестовая подписка отключена.\n\n'
'Подпишитесь на канал и нажмите «{check_button}», чтобы вернуть доступ.'
),
)
check_button = texts.t('CHANNEL_CHECK_BUTTON', '✅ Я подписался')
message = template.format(check_button=check_button)
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from app.services.channel_subscription_service import channel_subscription_service
unsubscribed = await channel_subscription_service.get_unsubscribed_channels(user.telegram_id)
buttons = []
for ch in unsubscribed:
link = ch.get('channel_link')
if link:
title = ch.get('title') or texts.t('CHANNEL_SUBSCRIBE_BUTTON', '🔗 Подписаться')
buttons.append([InlineKeyboardButton(text=f'🔗 {title}', url=link)])
buttons.append(
[
InlineKeyboardButton(
text=check_button,
callback_data='sub_channel_check',
)
]
)
keyboard = InlineKeyboardMarkup(inline_keyboard=buttons)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
user=user,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'уведомление об отписке от канала'):
return True
logger.error(
'Ошибка Telegram API при отправке уведомления об отписке от канала пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except TelegramNetworkError as error:
logger.warning(
'Таймаут отправки уведомления об отписке от канала пользователю',
telegram_id=user.telegram_id,
error=error,
)
return False
except Exception as error:
logger.error(
'Ошибка отправки уведомления об отписке от канала пользователю',
telegram_id=user.telegram_id,
error=error,
)
return False
async def _send_expired_day1_notification(self, user: User, subscription: Subscription) -> bool:
try:
texts = get_texts(user.language)
template = texts.get(
'SUBSCRIPTION_EXPIRED_1D',
(
'⛔ <b>Подписка закончилась</b>\n\n'
'Доступ был отключён {end_date}. Продлите подписку, чтобы вернуться в сервис.'
),
)
message = template.format(
end_date=format_local_datetime(subscription.end_date, '%d.%m.%Y %H:%M'),
price=settings.format_price(settings.PRICE_30_DAYS),
)
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
build_miniapp_or_callback_button(
text=texts.t('SUBSCRIPTION_EXTEND', '💎 Продлить подписку'),
callback_data='subscription_extend',
)
],
[
build_miniapp_or_callback_button(
text=texts.t('BALANCE_TOPUP', '💳 Пополнить баланс'),
callback_data='balance_topup',
)
],
[
InlineKeyboardButton(
text=texts.t('SUPPORT_BUTTON', '🆘 Поддержка'), callback_data='menu_support'
)
],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'напоминание об истекшей подписке'):
return True
logger.error(
'Ошибка Telegram API при отправке напоминания об истекшей подписке пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except TelegramNetworkError as e:
logger.warning(
'Таймаут отправки напоминания об истекшей подписке пользователю', telegram_id=user.telegram_id, e=e
)
return False
except Exception as e:
logger.error(
'Ошибка отправки напоминания об истекшей подписке пользователю', telegram_id=user.telegram_id, e=e
)
return False
async def _send_expired_discount_notification(
self,
user: User,
subscription: Subscription,
percent: int,
expires_at: datetime,
offer_id: int,
wave: str,
trigger_days: int = None,
) -> bool:
try:
texts = get_texts(user.language)
if wave == 'second':
template = texts.get(
'SUBSCRIPTION_EXPIRED_SECOND_WAVE',
(
'🔥 <b>Скидка {percent}% на продление</b>\n\n'
'Активируйте предложение, чтобы получить дополнительную скидку. '
'Она суммируется с вашей промогруппой и действует до {expires_at}.'
),
)
else:
template = texts.get(
'SUBSCRIPTION_EXPIRED_THIRD_WAVE',
(
'🎁 <b>Индивидуальная скидка {percent}%</b>\n\n'
'Прошло {trigger_days} дней без подписки — возвращайтесь и активируйте дополнительную скидку. '
'Она суммируется с промогруппой и действует до {expires_at}.'
),
)
message = template.format(
percent=percent,
expires_at=format_local_datetime(expires_at, '%d.%m.%Y %H:%M'),
trigger_days=trigger_days or '',
)
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
build_miniapp_or_callback_button(
text='🎁 Получить скидку', callback_data=f'claim_discount_{offer_id}'
)
],
[
build_miniapp_or_callback_button(
text=texts.t('SUBSCRIPTION_EXTEND', '💎 Продлить подписку'),
callback_data='subscription_extend',
)
],
[
build_miniapp_or_callback_button(
text=texts.t('BALANCE_TOPUP', '💳 Пополнить баланс'),
callback_data='balance_topup',
)
],
[
InlineKeyboardButton(
text=texts.t('SUPPORT_BUTTON', '🆘 Поддержка'), callback_data='menu_support'
)
],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
)
return True
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if await self._handle_unreachable_user(user, exc, 'скидочное уведомление'):
return True
logger.error(
'Ошибка Telegram API при отправке скидочного уведомления пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
return False
except TelegramNetworkError as e:
logger.warning('Таймаут отправки скидочного уведомления пользователю', telegram_id=user.telegram_id, e=e)
return False
except Exception as e:
logger.error('Ошибка отправки скидочного уведомления пользователю', telegram_id=user.telegram_id, e=e)
return False
async def _send_autopay_success_notification(self, user: User, amount: int, days: int):
try:
texts = get_texts(user.language)
message = texts.AUTOPAY_SUCCESS.format(days=days, amount=settings.format_price(amount))
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
)
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if not await self._handle_unreachable_user(user, exc, 'уведомление об успешном автоплатеже'):
logger.error(
'Ошибка Telegram API при отправке уведомления об автоплатеже пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
except TelegramNetworkError as e:
logger.warning(
'Таймаут отправки уведомления об автоплатеже пользователю', telegram_id=user.telegram_id, e=e
)
except Exception as e:
logger.error('Ошибка отправки уведомления об автоплатеже пользователю', telegram_id=user.telegram_id, e=e)
async def _send_autopay_failed_notification(self, user: User, balance: int, required: int):
try:
texts = get_texts(user.language)
message = texts.AUTOPAY_FAILED.format(
balance=settings.format_price(balance), required=settings.format_price(required)
)
from aiogram.types import InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[build_miniapp_or_callback_button(text='💳 Пополнить баланс', callback_data='balance_topup')],
[build_miniapp_or_callback_button(text='📱 Моя подписка', callback_data='menu_subscription')],
]
)
await self._send_message_with_logo(
chat_id=user.telegram_id,
text=message,
parse_mode='HTML',
reply_markup=keyboard,
)
except (TelegramForbiddenError, TelegramBadRequest) as exc:
if not await self._handle_unreachable_user(user, exc, 'уведомление о неудачном автоплатеже'):
logger.error(
'Ошибка Telegram API при отправке уведомления о неудачном автоплатеже пользователю',
telegram_id=user.telegram_id,
exc=exc,
)
except TelegramNetworkError as e:
logger.warning(
'Таймаут отправки уведомления о неудачном автоплатеже пользователю', telegram_id=user.telegram_id, e=e
)
except Exception as e:
logger.error(
'Ошибка отправки уведомления о неудачном автоплатеже пользователю', telegram_id=user.telegram_id, e=e
)
async def _retry_stuck_guest_purchases(self, db: AsyncSession):
from app.services.guest_purchase_service import (
recover_stuck_pending_purchases,
retry_stuck_paid_purchases,
retry_stuck_pending_activation,
)
# Phase 1: Recover PENDING purchases where provider payment already succeeded
try:
recovered = await recover_stuck_pending_purchases(db, stale_minutes=10, limit=10)
if recovered:
logger.info('Recovered stuck PENDING purchases', recovered=recovered)
except Exception:
logger.error('Error recovering stuck PENDING guest purchases', exc_info=True)
# Phase 2: Retry fulfillment for purchases in PAID status
try:
retried = await retry_stuck_paid_purchases(db, stale_minutes=5, limit=10)
if retried:
logger.info('Retried stuck guest purchases', retried=retried)
except Exception:
logger.error('Error retrying stuck PAID guest purchases', exc_info=True)
# Phase 3: Retry activation for purchases in PENDING_ACTIVATION status
try:
retried_pa = await retry_stuck_pending_activation(db, stale_minutes=10, limit=10)
if retried_pa:
logger.info('Retried stuck pending_activation purchases', retried=retried_pa)
except Exception:
logger.error('Error retrying stuck PENDING_ACTIVATION guest purchases', exc_info=True)
async def _cleanup_inactive_users(self, db: AsyncSession):
try:
now = datetime.now(UTC)
if now.hour != 3:
return
inactive_users = await get_inactive_users(db, settings.INACTIVE_USER_DELETE_MONTHS)
deleted_count = 0
for user in inactive_users:
if not user.subscription or not user.subscription.is_active:
success = await delete_user(db, user)
if success:
deleted_count += 1
if deleted_count > 0:
await self._log_monitoring_event(
db,
'inactive_users_cleanup',
f'Удалено {deleted_count} неактивных пользователей',
{'deleted_count': deleted_count},
)
logger.info('🗑️ Удалено неактивных пользователей', deleted_count=deleted_count)
except Exception as e:
logger.error('Ошибка очистки неактивных пользователей', error=e)
async def _sync_with_remnawave(self, db: AsyncSession):
try:
now = datetime.now(UTC)
if now.minute != 0:
return
if not self.subscription_service.is_configured:
logger.warning('RemnaWave API не настроен. Пропускаем синхронизацию')
return
async with self.subscription_service.get_api_client() as api:
system_stats = await api.get_system_stats()
await self._log_monitoring_event(
db, 'remnawave_sync', 'Синхронизация с RemnaWave завершена', {'stats': system_stats}
)
except Exception as e:
logger.error('Ошибка синхронизации с RemnaWave', error=e)
await self._log_monitoring_event(
db,
'remnawave_sync_error',
f'Ошибка синхронизации с RemnaWave: {e!s}',
{'error': str(e)},
is_success=False,
)
async def _check_ticket_sla(self, db: AsyncSession):
try:
# Quick guards
# Allow runtime toggle from SupportSettingsService
try:
from app.services.support_settings_service import SupportSettingsService
sla_enabled_runtime = SupportSettingsService.get_sla_enabled()
except Exception:
sla_enabled_runtime = getattr(settings, 'SUPPORT_TICKET_SLA_ENABLED', True)
if not sla_enabled_runtime:
return
if not self.bot:
return
if not settings.is_admin_notifications_enabled():
return
try:
from app.services.support_settings_service import SupportSettingsService
sla_minutes = max(1, int(SupportSettingsService.get_sla_minutes()))
except Exception:
sla_minutes = max(1, int(getattr(settings, 'SUPPORT_TICKET_SLA_MINUTES', 5)))
cooldown_minutes = max(1, int(getattr(settings, 'SUPPORT_TICKET_SLA_REMINDER_COOLDOWN_MINUTES', 15)))
now = datetime.now(UTC)
stale_before = now - timedelta(minutes=sla_minutes)
cooldown_before = now - timedelta(minutes=cooldown_minutes)
# Tickets to remind: open, no admin reply yet after user's last message (status OPEN), stale by SLA,
# and either never reminded or cooldown passed
result = await db.execute(
select(Ticket)
.options(selectinload(Ticket.user))
.where(
and_(
Ticket.status == TicketStatus.OPEN.value,
Ticket.updated_at <= stale_before,
or_(Ticket.last_sla_reminder_at.is_(None), Ticket.last_sla_reminder_at <= cooldown_before),
)
)
)
tickets = result.scalars().all()
if not tickets:
return
from app.services.admin_notification_service import AdminNotificationService
reminders_sent = 0
service = AdminNotificationService(self.bot)
for ticket in tickets:
try:
waited_minutes = max(0, int((now - ticket.updated_at).total_seconds() // 60))
title = (ticket.title or '').strip()
if len(title) > 60:
title = title[:57] + '...'
# Детали пользователя: имя, Telegram ID и username
full_name = ticket.user.full_name if ticket.user else 'Unknown'
telegram_id_display = ticket.user.telegram_id if ticket.user else ''
username_display = (ticket.user.username or 'отсутствует') if ticket.user else 'отсутствует'
text = (
f'⏰ <b>Ожидание ответа на тикет превышено</b>\n\n'
f'🆔 <b>ID:</b> <code>{ticket.id}</code>\n'
f'👤 <b>Пользователь:</b> {full_name}\n'
f'🆔 <b>Telegram ID:</b> <code>{telegram_id_display}</code>\n'
f'📱 <b>Username:</b> @{username_display}\n'
f'📝 <b>Заголовок:</b> {title or ""}\n'
f'⏱️ <b>Ожидает ответа:</b> {waited_minutes} мин\n'
)
sent = await service.send_ticket_event_notification(text)
if sent:
ticket.last_sla_reminder_at = now
reminders_sent += 1
# commit after each to persist timestamp and avoid duplicate reminders on crash
await db.commit()
except Exception as notify_error:
logger.error(
'Ошибка отправки SLA-уведомления по тикету', ticket_id=ticket.id, notify_error=notify_error
)
if reminders_sent > 0:
await self._log_monitoring_event(
db,
'ticket_sla_reminders_sent',
f'Отправлено {reminders_sent} SLA-напоминаний по тикетам',
{'count': reminders_sent},
)
except Exception as e:
logger.error('Ошибка проверки SLA тикетов', error=e)
async def _sla_loop(self):
try:
interval_seconds = max(10, int(getattr(settings, 'SUPPORT_TICKET_SLA_CHECK_INTERVAL_SECONDS', 60)))
except Exception:
interval_seconds = 60
while self.is_running:
try:
async with AsyncSessionLocal() as db:
try:
await self._check_ticket_sla(db)
await db.commit()
except Exception as e:
logger.error('Ошибка в SLA-проверке', error=e)
await db.rollback()
except asyncio.CancelledError:
break
except Exception as e:
logger.error('Ошибка в SLA-цикле', error=e)
await asyncio.sleep(interval_seconds)
async def _log_monitoring_event(
self, db: AsyncSession, event_type: str, message: str, data: dict[str, Any] = None, is_success: bool = True
):
try:
log_entry = MonitoringLog(event_type=event_type, message=message, data=data or {}, is_success=is_success)
db.add(log_entry)
await db.commit()
except Exception as e:
logger.error('Ошибка логирования события мониторинга', error=e)
async def get_monitoring_status(self, db: AsyncSession) -> dict[str, Any]:
try:
from sqlalchemy import desc, select
recent_events_result = await db.execute(
select(MonitoringLog).order_by(desc(MonitoringLog.created_at)).limit(10)
)
recent_events = recent_events_result.scalars().all()
yesterday = datetime.now(UTC) - timedelta(days=1)
events_24h_result = await db.execute(select(MonitoringLog).where(MonitoringLog.created_at >= yesterday))
events_24h = events_24h_result.scalars().all()
successful_events = sum(1 for event in events_24h if event.is_success)
failed_events = sum(1 for event in events_24h if not event.is_success)
return {
'is_running': self.is_running,
'last_update': datetime.now(UTC),
'recent_events': [
{
'type': event.event_type,
'message': event.message,
'success': event.is_success,
'created_at': event.created_at,
}
for event in recent_events
],
'stats_24h': {
'total_events': len(events_24h),
'successful': successful_events,
'failed': failed_events,
'success_rate': round(successful_events / len(events_24h) * 100, 1) if events_24h else 0,
},
}
except Exception as e:
logger.error('Ошибка получения статуса мониторинга', error=e)
return {
'is_running': self.is_running,
'last_update': datetime.now(UTC),
'recent_events': [],
'stats_24h': {'total_events': 0, 'successful': 0, 'failed': 0, 'success_rate': 0},
}
async def force_check_subscriptions(self, db: AsyncSession) -> dict[str, int]:
from app.database.crud.subscription import is_recently_updated_by_webhook
try:
expired_subscriptions = await get_expired_subscriptions(db)
expired_count = 0
for subscription in expired_subscriptions:
if is_recently_updated_by_webhook(subscription):
logger.debug(
'Пропуск force-check подписки : обновлена вебхуком недавно', subscription_id=subscription.id
)
continue
await deactivate_subscription(db, subscription)
expired_count += 1
expiring_subscriptions = await get_expiring_subscriptions(db, 1)
expiring_count = len(expiring_subscriptions)
autopay_subscriptions = await get_subscriptions_for_autopay(db)
autopay_processed = 0
for subscription in autopay_subscriptions:
user = await get_user_by_id(db, subscription.user_id)
if user and user.balance_kopeks >= settings.PRICE_30_DAYS:
autopay_processed += 1
await self._log_monitoring_event(
db,
'manual_check_subscriptions',
f'Принудительная проверка: истекло {expired_count}, истекает {expiring_count}, автоплатежей {autopay_processed}',
{'expired': expired_count, 'expiring': expiring_count, 'autopay_ready': autopay_processed},
)
return {'expired': expired_count, 'expiring': expiring_count, 'autopay_ready': autopay_processed}
except Exception as e:
logger.error('Ошибка принудительной проверки подписок', error=e)
return {'expired': 0, 'expiring': 0, 'autopay_ready': 0}
async def get_monitoring_logs(
self, db: AsyncSession, limit: int = 50, event_type: str | None = None, page: int = 1, per_page: int = 20
) -> list[dict[str, Any]]:
try:
from sqlalchemy import desc, select
query = select(MonitoringLog).order_by(desc(MonitoringLog.created_at))
if event_type:
query = query.where(MonitoringLog.event_type == event_type)
if page > 1 or per_page != 20:
offset = (page - 1) * per_page
query = query.offset(offset).limit(per_page)
else:
query = query.limit(limit)
result = await db.execute(query)
logs = result.scalars().all()
return [
{
'id': log.id,
'event_type': log.event_type,
'message': log.message,
'data': log.data,
'is_success': log.is_success,
'created_at': log.created_at,
}
for log in logs
]
except Exception as e:
logger.error('Ошибка получения логов мониторинга', error=e)
return []
async def get_monitoring_logs_count(self, db: AsyncSession, event_type: str | None = None) -> int:
try:
from sqlalchemy import func, select
query = select(func.count(MonitoringLog.id))
if event_type:
query = query.where(MonitoringLog.event_type == event_type)
result = await db.execute(query)
count = result.scalar()
return count or 0
except Exception as e:
logger.error('Ошибка получения количества логов', error=e)
return 0
async def get_monitoring_event_types(self, db: AsyncSession) -> list[str]:
try:
from sqlalchemy import select
result = await db.execute(
select(MonitoringLog.event_type)
.where(MonitoringLog.event_type.isnot(None))
.distinct()
.order_by(MonitoringLog.event_type)
)
return [row[0] for row in result.fetchall() if row[0]]
except Exception as e:
logger.error('Ошибка получения списка типов событий мониторинга', error=e)
return []
async def cleanup_old_logs(self, db: AsyncSession, days: int = 30) -> int:
try:
from sqlalchemy import delete
if days == 0:
result = await db.execute(delete(MonitoringLog))
else:
cutoff_date = datetime.now(UTC) - timedelta(days=days)
result = await db.execute(delete(MonitoringLog).where(MonitoringLog.created_at < cutoff_date))
deleted_count = result.rowcount
await db.commit()
if days == 0:
logger.info('🗑️ Удалены все логи мониторинга ( записей)', deleted_count=deleted_count)
else:
logger.info('🗑️ Удалено старых записей логов (старше дней)', deleted_count=deleted_count, days=days)
return deleted_count
except Exception as e:
logger.error('Ошибка очистки логов', error=e)
await db.rollback()
return 0
monitoring_service = MonitoringService()