Files
remnawave-bedolaga-telegram…/subscription_monitor.py
2025-08-05 03:05:49 +03:00

504 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Subscription Monitor Service
Сервис для мониторинга подписок, уведомлений пользователей и предложений продления
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Optional
from dataclasses import dataclass
import traceback
from aiogram import Bot
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from database import Database, UserSubscription, Subscription, User
from remnawave_api import RemnaWaveAPI
from translations import t
from keyboards import extend_subscription_keyboard, main_menu_keyboard
from utils import format_datetime, log_user_action
from config import Config
# Настройка логирования
logger = logging.getLogger(__name__)
@dataclass
class NotificationResult:
"""Результат отправки уведомления"""
success: bool
user_id: int
message: str
error: Optional[str] = None
class SubscriptionMonitorService:
"""Сервис мониторинга подписок"""
def __init__(self, bot: Bot, db: Database, config: Config, api: Optional[RemnaWaveAPI] = None):
self.bot = bot
self.db = db
self.config = config
self.api = api
self.is_running = False
self._monitor_task: Optional[asyncio.Task] = None
# Настройки уведомлений из конфига
self.WARNING_DAYS = config.MONITOR_WARNING_DAYS # За сколько дней предупреждать
self.CHECK_INTERVAL = config.MONITOR_CHECK_INTERVAL # Интервал проверки (в секундах)
self.DAILY_CHECK_HOUR = config.MONITOR_DAILY_CHECK_HOUR # В какой час дня делать основную
async def start(self):
"""Запуск сервиса мониторинга"""
if self.is_running:
logger.warning("Subscription monitor service is already running")
return
self.is_running = True
self._monitor_task = asyncio.create_task(self._monitor_loop())
logger.info("Subscription monitor service started")
async def stop(self):
"""Остановка сервиса мониторинга"""
if not self.is_running:
return
self.is_running = False
if self._monitor_task:
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
logger.info("Subscription monitor service stopped")
async def _monitor_loop(self):
"""Основной цикл мониторинга"""
logger.info(f"Starting monitor loop with {self.CHECK_INTERVAL}s interval")
while self.is_running:
try:
current_time = datetime.utcnow()
# Основная проверка раз в день в определенное время
if current_time.hour == self.DAILY_CHECK_HOUR:
await self._daily_check()
# Дополнительная проверка каждый час для критических случаев
await self._hourly_check()
# Ожидание до следующей проверки
await asyncio.sleep(self.CHECK_INTERVAL)
except asyncio.CancelledError:
logger.info("Monitor loop cancelled")
break
except Exception as e:
logger.error(f"Error in monitor loop: {e}")
logger.error(traceback.format_exc())
# Продолжаем работу даже при ошибках
await asyncio.sleep(60) # Короткая пауза при ошибке
async def _daily_check(self):
"""Ежедневная проверка всех подписок"""
logger.info("Starting daily subscription check")
try:
# Получаем все активные подписки пользователей
all_users = await self.db.get_all_users()
total_notifications = 0
for user in all_users:
try:
user_subs = await self.db.get_user_subscriptions(user.telegram_id)
active_subs = [sub for sub in user_subs if sub.is_active]
for user_sub in active_subs:
# Проверяем каждую подписку
notification_sent = await self._check_and_notify_subscription(user, user_sub)
if notification_sent:
total_notifications += 1
# Небольшая пауза между уведомлениями
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error checking subscriptions for user {user.telegram_id}: {e}")
continue
logger.info(f"Daily check completed. Sent {total_notifications} notifications")
except Exception as e:
logger.error(f"Error in daily check: {e}")
logger.error(traceback.format_exc())
async def _hourly_check(self):
"""Часовая проверка критических подписок (истекают сегодня)"""
try:
now = datetime.utcnow()
tomorrow = now + timedelta(days=1)
# Получаем подписки, которые истекают в ближайшие 24 часа
all_users = await self.db.get_all_users()
for user in all_users:
try:
user_subs = await self.db.get_user_subscriptions(user.telegram_id)
for user_sub in user_subs:
if (user_sub.is_active and
user_sub.expires_at <= tomorrow and
user_sub.expires_at > now):
await self._check_and_notify_subscription(user, user_sub, urgent=True)
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error in hourly check for user {user.telegram_id}: {e}")
continue
except Exception as e:
logger.error(f"Error in hourly check: {e}")
async def _check_and_notify_subscription(self, user: User, user_sub: UserSubscription, urgent: bool = False) -> bool:
"""
Проверить подписку и отправить уведомление если нужно
Returns: True если уведомление было отправлено
"""
try:
now = datetime.utcnow()
days_until_expiry = (user_sub.expires_at - now).days
hours_until_expiry = (user_sub.expires_at - now).total_seconds() / 3600
# Получаем информацию о подписке
subscription = await self.db.get_subscription_by_id(user_sub.subscription_id)
if not subscription:
logger.warning(f"Subscription {user_sub.subscription_id} not found")
return False
notification_type = None
# Определяем тип уведомления
if user_sub.expires_at <= now:
# Подписка истекла
notification_type = "expired"
elif days_until_expiry <= 0 and hours_until_expiry <= 24:
# Истекает сегодня
notification_type = "expires_today"
elif days_until_expiry == 1:
# Истекает завтра
notification_type = "expires_tomorrow"
elif days_until_expiry == self.WARNING_DAYS:
# Предупреждение за 2 дня
notification_type = "warning"
elif urgent and days_until_expiry <= 1:
# Срочное уведомление
notification_type = "urgent"
if notification_type:
return await self._send_notification(user, user_sub, subscription, notification_type)
return False
except Exception as e:
logger.error(f"Error checking subscription {user_sub.id}: {e}")
return False
async def _send_notification(self, user: User, user_sub: UserSubscription,
subscription: Subscription, notification_type: str) -> bool:
"""Отправить уведомление пользователю"""
try:
# Проверяем, не является ли подписка тестовой (для тестовых другая логика)
if subscription.is_trial and notification_type in ["warning", "expires_tomorrow"]:
# Для тестовых подписок не предлагаем продление
return await self._send_trial_expiry_notification(user, user_sub, subscription, notification_type)
# Формируем текст уведомления
message_text = self._format_notification_message(user, user_sub, subscription, notification_type)
# Формируем клавиатуру
keyboard = self._create_notification_keyboard(user, user_sub, subscription, notification_type)
# Отправляем уведомление
await self.bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode='Markdown'
)
# Логируем действие
log_user_action(user.telegram_id, f"notification_sent_{notification_type}", f"Sub: {subscription.name}")
logger.info(f"Sent {notification_type} notification to user {user.telegram_id} for subscription {subscription.name}")
return True
except Exception as e:
logger.error(f"Error sending notification to user {user.telegram_id}: {e}")
return False
async def _send_trial_expiry_notification(self, user: User, user_sub: UserSubscription,
subscription: Subscription, notification_type: str) -> bool:
"""Отправить уведомление об истечении тестовой подписки"""
try:
now = datetime.utcnow()
days_until_expiry = (user_sub.expires_at - now).days
hours_until_expiry = (user_sub.expires_at - now).total_seconds() / 3600
if notification_type == "expires_today" or hours_until_expiry <= 24:
message_text = (
f"⏰ *Ваша тестовая подписка истекает сегодня!*\n\n"
f"📋 Подписка: *{subscription.name}*\n"
f"⏳ Осталось: *{int(hours_until_expiry)} часов*\n\n"
f"💡 Чтобы продолжить пользоваться сервисом, приобретите полную подписку!"
)
elif notification_type == "expires_tomorrow" or days_until_expiry == 1:
message_text = (
f"⚠️ *Ваша тестовая подписка истекает завтра!*\n\n"
f"📋 Подписка: *{subscription.name}*\n"
f"📅 Истекает: *{format_datetime(user_sub.expires_at, user.language)}*\n\n"
f"💡 Не забудьте приобрести полную подписку, чтобы продолжить пользоваться сервисом!"
)
else:
return False
# Клавиатура для тестовой подписки
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(
text="💳 Купить подписку",
callback_data="buy_subscription"
)],
[InlineKeyboardButton(
text="💰 Пополнить баланс",
callback_data="topup_balance"
)],
[InlineKeyboardButton(
text="🏠 Главное меню",
callback_data="main_menu"
)]
])
await self.bot.send_message(
chat_id=user.telegram_id,
text=message_text,
reply_markup=keyboard,
parse_mode='Markdown'
)
log_user_action(user.telegram_id, f"trial_notification_sent_{notification_type}", f"Sub: {subscription.name}")
logger.info(f"Sent trial {notification_type} notification to user {user.telegram_id}")
return True
except Exception as e:
logger.error(f"Error sending trial notification to user {user.telegram_id}: {e}")
return False
def _format_notification_message(self, user: User, user_sub: UserSubscription,
subscription: Subscription, notification_type: str) -> str:
"""Форматировать текст уведомления"""
now = datetime.utcnow()
days_until_expiry = (user_sub.expires_at - now).days
hours_until_expiry = (user_sub.expires_at - now).total_seconds() / 3600
base_info = (
f"📋 Подписка: *{subscription.name}*\n"
f"📅 Истекает: *{format_datetime(user_sub.expires_at, user.language)}*\n"
f"💰 Цена продления: *{subscription.price} руб.*"
)
if notification_type == "expired":
return (
f"❌ *Ваша подписка истекла!*\n\n"
f"{base_info}\n\n"
f"🔄 Продлите подписку, чтобы продолжить пользоваться сервисом."
)
elif notification_type == "expires_today" or notification_type == "urgent":
return (
f"⏰ *Ваша подписка истекает сегодня!*\n\n"
f"{base_info}\n"
f"⏳ Осталось: *{int(hours_until_expiry)} часов*\n\n"
f"🔄 Продлите подписку прямо сейчас!"
)
elif notification_type == "expires_tomorrow":
return (
f"⚠️ *Ваша подписка истекает завтра!*\n\n"
f"{base_info}\n\n"
f"🔄 Рекомендуем продлить подписку заранее."
)
elif notification_type == "warning":
return (
f"📢 *Напоминание о подписке*\n\n"
f"{base_info}\n"
f"⏳ Осталось: *{days_until_expiry} дней*\n\n"
f"💡 Не забудьте продлить подписку вовремя!"
)
else:
return f"🔔 Уведомление о подписке *{subscription.name}*"
def _create_notification_keyboard(self, user: User, user_sub: UserSubscription,
subscription: Subscription, notification_type: str) -> InlineKeyboardMarkup:
"""Создать клавиатуру для уведомления"""
buttons = []
# Кнопка продления (только для не-тестовых подписок)
if not subscription.is_trial:
if user.balance >= subscription.price:
buttons.append([InlineKeyboardButton(
text=f"🔄 Продлить за {subscription.price} руб.",
callback_data=f"extend_sub_{user_sub.id}"
)])
else:
buttons.append([InlineKeyboardButton(
text=f"💰 Пополнить баланс (нужно {subscription.price - user.balance} руб.)",
callback_data="topup_balance"
)])
# Кнопка покупки новой подписки
buttons.append([InlineKeyboardButton(
text="💳 Купить подписку",
callback_data="buy_subscription"
)])
# Кнопка "Мои подписки"
buttons.append([InlineKeyboardButton(
text="📋 Мои подписки",
callback_data="my_subscriptions"
)])
# Кнопка главного меню
buttons.append([InlineKeyboardButton(
text="🏠 Главное меню",
callback_data="main_menu"
)])
return InlineKeyboardMarkup(inline_keyboard=buttons)
async def check_single_user(self, user_id: int) -> List[NotificationResult]:
"""Проверить подписки конкретного пользователя (для тестирования)"""
results = []
try:
user = await self.db.get_user_by_telegram_id(user_id)
if not user:
return [NotificationResult(False, user_id, "User not found")]
user_subs = await self.db.get_user_subscriptions(user_id)
for user_sub in user_subs:
if user_sub.is_active:
try:
sent = await self._check_and_notify_subscription(user, user_sub)
subscription = await self.db.get_subscription_by_id(user_sub.subscription_id)
sub_name = subscription.name if subscription else "Unknown"
results.append(NotificationResult(
success=sent,
user_id=user_id,
message=f"Subscription: {sub_name}, Sent: {sent}"
))
except Exception as e:
results.append(NotificationResult(
success=False,
user_id=user_id,
message=f"Error checking subscription {user_sub.id}",
error=str(e)
))
except Exception as e:
results.append(NotificationResult(
success=False,
user_id=user_id,
message="Error checking user",
error=str(e)
))
return results
async def get_service_status(self) -> dict:
"""Получить статус сервиса"""
return {
"is_running": self.is_running,
"check_interval": self.CHECK_INTERVAL,
"daily_check_hour": self.DAILY_CHECK_HOUR,
"warning_days": self.WARNING_DAYS,
"last_check": datetime.utcnow().isoformat() if self.is_running else None
}
async def force_daily_check(self):
"""Принудительно запустить ежедневную проверку"""
logger.info("Force starting daily check")
await self._daily_check()
async def deactivate_expired_subscriptions(self):
"""Деактивировать истекшие подписки"""
try:
now = datetime.utcnow()
all_users = await self.db.get_all_users()
deactivated_count = 0
for user in all_users:
user_subs = await self.db.get_user_subscriptions(user.telegram_id)
for user_sub in user_subs:
if user_sub.is_active and user_sub.expires_at <= now:
# Деактивируем подписку
user_sub.is_active = False
await self.db.update_user_subscription(user_sub)
# Деактивируем в RemnaWave если API доступно
if self.api and user_sub.short_uuid:
try:
remna_user_details = await self.api.get_user_by_short_uuid(user_sub.short_uuid)
if remna_user_details:
user_uuid = remna_user_details.get('uuid')
if user_uuid:
# Блокируем пользователя в RemnaWave
await self.api.update_user(user_uuid, {"enable": False})
logger.info(f"Disabled user {user_uuid} in RemnaWave")
except Exception as e:
logger.error(f"Failed to disable user in RemnaWave: {e}")
deactivated_count += 1
log_user_action(user.telegram_id, "subscription_expired", f"SubID: {user_sub.id}")
logger.info(f"Deactivated {deactivated_count} expired subscriptions")
return deactivated_count
except Exception as e:
logger.error(f"Error deactivating expired subscriptions: {e}")
return 0
# Функция для инициализации и запуска сервиса
async def create_subscription_monitor(bot: Bot, db: Database, config: Config,
api: Optional[RemnaWaveAPI] = None) -> SubscriptionMonitorService:
"""Создать и настроить сервис мониторинга подписок"""
service = SubscriptionMonitorService(bot, db, config, api)
return service
# Пример использования в основном файле бота
"""
from subscription_monitor import create_subscription_monitor
async def main():
# Инициализация бота, базы данных, конфига
bot = Bot(token=config.BOT_TOKEN)
db = Database(config.DATABASE_URL)
api = RemnaWaveAPI(config.REMNAWAVE_API_URL, config.REMNAWAVE_API_KEY)
# Создание и запуск сервиса мониторинга
monitor_service = await create_subscription_monitor(bot, db, config, api)
await monitor_service.start()
try:
# Запуск бота
await dp.start_polling(bot)
finally:
# Остановка сервиса при завершении
await monitor_service.stop()
if __name__ == "__main__":
asyncio.run(main())
"""