diff --git a/app/services/admin_notification_service.py b/app/services/admin_notification_service.py
index bd0da360..297f016e 100644
--- a/app/services/admin_notification_service.py
+++ b/app/services/admin_notification_service.py
@@ -1216,6 +1216,21 @@ class AdminNotificationService:
def _is_enabled(self) -> bool:
return self.enabled and bool(self.chat_id)
+ @property
+ def is_enabled(self) -> bool:
+ """Public check for whether admin notifications are configured and active."""
+ return self._is_enabled()
+
+ async def send_webhook_notification(self, text: str) -> bool:
+ """Send a generic webhook/infrastructure notification to admin chat.
+
+ Used by RemnaWaveWebhookService for node, service, and CRM events.
+ The caller is responsible for HTML-escaping all untrusted data in `text`.
+ """
+ if not self._is_enabled():
+ return False
+ return await self._send_message(text)
+
def _get_payment_method_display(self, payment_method: str | None) -> str:
if not payment_method:
return '💰 С баланса'
diff --git a/app/services/notification_delivery_service.py b/app/services/notification_delivery_service.py
index e971c74e..d8ac6895 100644
--- a/app/services/notification_delivery_service.py
+++ b/app/services/notification_delivery_service.py
@@ -68,6 +68,9 @@ class NotificationType(Enum):
WEBHOOK_SUB_EXPIRING = 'webhook_sub_expiring'
WEBHOOK_SUB_FIRST_CONNECTED = 'webhook_sub_first_connected'
WEBHOOK_SUB_BANDWIDTH_THRESHOLD = 'webhook_sub_bandwidth_threshold'
+ WEBHOOK_USER_NOT_CONNECTED = 'webhook_user_not_connected'
+ WEBHOOK_DEVICE_ADDED = 'webhook_device_added'
+ WEBHOOK_DEVICE_DELETED = 'webhook_device_deleted'
# Other
BROADCAST = 'broadcast'
diff --git a/app/services/remnawave_webhook_service.py b/app/services/remnawave_webhook_service.py
index f1cf542c..561beb7c 100644
--- a/app/services/remnawave_webhook_service.py
+++ b/app/services/remnawave_webhook_service.py
@@ -8,6 +8,7 @@ Admin events (node, service, crm) send alerts to the admin notification chat.
from __future__ import annotations
+import html
import logging
import re
from datetime import UTC, datetime
@@ -50,9 +51,9 @@ _TEXT_KEY_TO_NOTIFICATION_TYPE: dict[str, NotificationType] = {
'WEBHOOK_SUB_EXPIRED_24H_AGO': NotificationType.WEBHOOK_SUB_EXPIRED,
'WEBHOOK_SUB_FIRST_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED,
'WEBHOOK_SUB_BANDWIDTH_THRESHOLD': NotificationType.WEBHOOK_SUB_BANDWIDTH_THRESHOLD,
- 'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED,
- 'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_SUB_ENABLED,
- 'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_SUB_DISABLED,
+ 'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_USER_NOT_CONNECTED,
+ 'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_DEVICE_ADDED,
+ 'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_DEVICE_DELETED,
}
# Admin event display names for notification messages
@@ -120,20 +121,28 @@ class RemnaWaveWebhookService:
**_ADMIN_CRM_EVENTS,
}
- async def process_event(self, db: AsyncSession, event_name: str, data: dict) -> bool:
+ def is_admin_event(self, event_name: str) -> bool:
+ """Check if the event is admin-scoped (no DB session needed)."""
+ return event_name in self._admin_handlers
+
+ async def process_event(self, db: AsyncSession | None, event_name: str, data: dict) -> bool:
"""Route event to the appropriate handler.
Returns True if the event was processed, False if skipped/unknown.
+ db may be None for admin events that don't require database access.
"""
- # Check user-scoped handlers first
- user_handler = self._user_handlers.get(event_name)
- if user_handler:
- return await self._process_user_event(db, event_name, data, user_handler)
-
- # Check admin-scoped handlers
+ # Check admin-scoped handlers (no DB needed)
if event_name in self._admin_handlers:
return await self._process_admin_event(event_name, data)
+ # Check user-scoped handlers (require DB session)
+ user_handler = self._user_handlers.get(event_name)
+ if user_handler:
+ if db is None:
+ logger.error('RemnaWave webhook: DB session required for user event %s', event_name)
+ return False
+ return await self._process_user_event(db, event_name, data, user_handler)
+
logger.debug('Unhandled RemnaWave webhook event: %s', event_name)
return False
@@ -162,52 +171,52 @@ class RemnaWaveWebhookService:
async def _process_admin_event(self, event_name: str, data: dict) -> bool:
"""Format and send admin notification for infrastructure events."""
- if not self._admin_service._is_enabled():
+ if not self._admin_service.is_enabled:
logger.debug('Admin notifications disabled, skipping event %s', event_name)
return True
title = self._admin_handlers.get(event_name, event_name)
- # Build message from event data
+ # Build message from event data (escape all untrusted values to prevent HTML injection)
lines = [f'{title}']
# Extract common fields
- name = data.get('name') or data.get('nodeName') or data.get('username') or ''
+ name = html.escape(data.get('name') or data.get('nodeName') or data.get('username') or '')
if name:
lines.append(f'Имя: {name}')
- address = data.get('address') or data.get('ip') or ''
+ address = html.escape(data.get('address') or data.get('ip') or '')
if address:
lines.append(f'Адрес: {address}')
port = data.get('port')
if port:
- lines.append(f'Порт: {port}')
+ lines.append(f'Порт: {html.escape(str(port))}')
- version = data.get('version') or data.get('panelVersion') or ''
+ version = html.escape(data.get('version') or data.get('panelVersion') or '')
if version:
lines.append(f'Версия: {version}')
# CRM billing fields
- amount = data.get('amount') or data.get('price') or ''
+ amount = html.escape(str(data.get('amount') or data.get('price') or ''))
if amount:
lines.append(f'Сумма: {amount}')
- due_date = data.get('dueDate') or data.get('paymentDate') or ''
+ due_date = html.escape(data.get('dueDate') or data.get('paymentDate') or '')
if due_date:
lines.append(f'Дата: {due_date}')
# Login attempt fields
- ip_addr = data.get('ipAddress') or data.get('ip') or ''
+ ip_addr = html.escape(data.get('ipAddress') or data.get('ip') or '')
if ip_addr and not address:
lines.append(f'IP: {ip_addr}')
- message = data.get('message') or ''
+ message = html.escape(data.get('message') or '')
if message:
lines.append(f'Сообщение: {message}')
try:
- await self._admin_service._send_message('\n'.join(lines))
+ await self._admin_service.send_webhook_notification('\n'.join(lines))
return True
except Exception:
logger.exception('Failed to send admin notification for event %s', event_name)
@@ -220,9 +229,14 @@ class RemnaWaveWebhookService:
async def _resolve_user_and_subscription(
self, db: AsyncSession, data: dict
) -> tuple[User | None, Subscription | None]:
- """Find bot user by telegramId or uuid from webhook payload."""
+ """Find bot user by telegramId or uuid from webhook payload.
+
+ Handles both user-scope events (top-level telegramId/uuid) and
+ device-scope events (userUuid, or nested user.telegramId/user.uuid).
+ """
user: User | None = None
+ # Try top-level telegramId first
telegram_id = data.get('telegramId')
if telegram_id:
try:
@@ -230,11 +244,27 @@ class RemnaWaveWebhookService:
except (ValueError, TypeError):
pass
+ # Try top-level uuid
if not user:
- uuid = data.get('uuid')
+ uuid = data.get('uuid') or data.get('userUuid')
if uuid:
user = await get_user_by_remnawave_uuid(db, uuid)
+ # Try nested user object (e.g. user_hwid_devices events)
+ if not user:
+ nested_user = data.get('user')
+ if isinstance(nested_user, dict):
+ nested_tid = nested_user.get('telegramId')
+ if nested_tid:
+ try:
+ user = await get_user_by_telegram_id(db, int(nested_tid))
+ except (ValueError, TypeError):
+ pass
+ if not user:
+ nested_uuid = nested_user.get('uuid')
+ if nested_uuid:
+ user = await get_user_by_remnawave_uuid(db, nested_uuid)
+
if not user:
return None, None
@@ -295,14 +325,17 @@ class RemnaWaveWebhookService:
context = {'text_key': text_key, **(format_kwargs or {})}
- await notification_delivery_service.send_notification(
- user=user,
- notification_type=notification_type,
- context=context,
- bot=self.bot,
- telegram_message=message,
- telegram_markup=reply_markup,
- )
+ try:
+ await notification_delivery_service.send_notification(
+ user=user,
+ notification_type=notification_type,
+ context=context,
+ bot=self.bot,
+ telegram_message=message,
+ telegram_markup=reply_markup,
+ )
+ except Exception:
+ logger.exception('Notification delivery failed for user %s, text_key %s', user.id, text_key)
# ------------------------------------------------------------------
# User event handlers
@@ -409,9 +442,8 @@ class RemnaWaveWebhookService:
changed = True
if changed:
- subscription.updated_at = datetime.utcnow()
- await db.commit()
- await db.refresh(subscription)
+ subscription.updated_at = datetime.now(UTC).replace(tzinfo=None)
+ await db.flush()
logger.info('Webhook: subscription %s modified (synced from panel) for user %s', subscription.id, user.id)
async def _handle_user_deleted(
@@ -426,7 +458,7 @@ class RemnaWaveWebhookService:
# Clear remnawave linkage
if user.remnawave_uuid:
user.remnawave_uuid = None
- await db.commit()
+ await db.flush()
await self._notify_user(user, 'WEBHOOK_SUB_DELETED')
@@ -450,9 +482,8 @@ class RemnaWaveWebhookService:
changed = True
if changed:
- subscription.updated_at = datetime.utcnow()
- await db.commit()
- await db.refresh(subscription)
+ subscription.updated_at = datetime.now(UTC).replace(tzinfo=None)
+ await db.flush()
logger.info(
'Webhook: subscription %s credentials revoked/updated for user %s', subscription.id, user.id
)
@@ -523,7 +554,7 @@ class RemnaWaveWebhookService:
async def _handle_device_added(
self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict
) -> None:
- device_name = data.get('deviceName') or data.get('hwid') or ''
+ device_name = html.escape(data.get('deviceName') or data.get('hwid') or '')
logger.info('Webhook: device added for user %s: %s', user.id, device_name)
await self._notify_user(
user,
@@ -534,7 +565,7 @@ class RemnaWaveWebhookService:
async def _handle_device_deleted(
self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict
) -> None:
- device_name = data.get('deviceName') or data.get('hwid') or ''
+ device_name = html.escape(data.get('deviceName') or data.get('hwid') or '')
logger.info('Webhook: device deleted for user %s: %s', user.id, device_name)
await self._notify_user(
user,
diff --git a/app/webserver/remnawave_webhook.py b/app/webserver/remnawave_webhook.py
index a8e1e0a7..ed51f539 100644
--- a/app/webserver/remnawave_webhook.py
+++ b/app/webserver/remnawave_webhook.py
@@ -17,7 +17,7 @@ from fastapi import APIRouter, Request, status
from fastapi.responses import JSONResponse
from app.config import settings
-from app.database.database import get_db
+from app.database.database import AsyncSessionLocal
from app.services.remnawave_webhook_service import RemnaWaveWebhookService
@@ -119,26 +119,31 @@ def create_remnawave_webhook_router(bot: Bot) -> APIRouter:
# Process event — return 200 to prevent retries for application-level errors.
# Only return non-200 for infrastructure failures (DB unavailable).
- db_generator = get_db()
+ # Admin events (node/service/crm) don't need a DB session.
+ if webhook_service.is_admin_event(event_name):
+ try:
+ processed = await webhook_service.process_event(None, event_name, data)
+ return JSONResponse({'status': 'ok', 'processed': processed})
+ except Exception:
+ logger.exception('RemnaWave webhook processing error for event %s', event_name)
+ return JSONResponse({'status': 'ok', 'processed': False})
+
+ # User events require a DB session
try:
- db = await db_generator.__anext__()
- except StopAsyncIteration:
+ async with AsyncSessionLocal() as db:
+ try:
+ processed = await webhook_service.process_event(db, event_name, data)
+ await db.commit()
+ return JSONResponse({'status': 'ok', 'processed': processed})
+ except Exception:
+ await db.rollback()
+ logger.exception('RemnaWave webhook processing error for event %s', event_name)
+ return JSONResponse({'status': 'ok', 'processed': False})
+ except Exception:
logger.error('RemnaWave webhook: failed to get database session')
return JSONResponse(
{'status': 'error', 'reason': 'database_unavailable'},
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
)
- try:
- processed = await webhook_service.process_event(db, event_name, data)
- return JSONResponse({'status': 'ok', 'processed': processed})
- except Exception:
- logger.exception('RemnaWave webhook processing error for event %s', event_name)
- return JSONResponse({'status': 'ok', 'processed': False})
- finally:
- try:
- await db_generator.__anext__()
- except StopAsyncIteration:
- pass
-
return router