From dc1e96bbe9b4496e91e9dea591c7fc0ef4cc245b Mon Sep 17 00:00:00 2001 From: Fringg Date: Tue, 10 Feb 2026 05:55:48 +0300 Subject: [PATCH] fix: security and architecture fixes for webhook handlers - Add html.escape() to all untrusted webhook data in admin and device notifications (prevents HTML/Telegram injection) - Add public send_webhook_notification() and is_enabled property to AdminNotificationService (eliminates private method access) - Add dedicated NotificationType enum values for device and not_connected events (fixes incorrect semantic mapping) - Extend user resolution to handle nested user objects and userUuid for device-scope events - Replace manual __anext__() DB session with AsyncSessionLocal context manager; skip DB session for admin-only events - Replace deprecated datetime.utcnow() with datetime.now(UTC) - Use db.flush() instead of db.commit() in handlers (router commits) - Wrap _notify_user in try/except to prevent notification failures from rolling back successful DB mutations --- app/services/admin_notification_service.py | 15 +++ app/services/notification_delivery_service.py | 3 + app/services/remnawave_webhook_service.py | 111 +++++++++++------- app/webserver/remnawave_webhook.py | 37 +++--- 4 files changed, 110 insertions(+), 56 deletions(-) diff --git a/app/services/admin_notification_service.py b/app/services/admin_notification_service.py index bd0da360..297f016e 100644 --- a/app/services/admin_notification_service.py +++ b/app/services/admin_notification_service.py @@ -1216,6 +1216,21 @@ class AdminNotificationService: def _is_enabled(self) -> bool: return self.enabled and bool(self.chat_id) + @property + def is_enabled(self) -> bool: + """Public check for whether admin notifications are configured and active.""" + return self._is_enabled() + + async def send_webhook_notification(self, text: str) -> bool: + """Send a generic webhook/infrastructure notification to admin chat. + + Used by RemnaWaveWebhookService for node, service, and CRM events. + The caller is responsible for HTML-escaping all untrusted data in `text`. + """ + if not self._is_enabled(): + return False + return await self._send_message(text) + def _get_payment_method_display(self, payment_method: str | None) -> str: if not payment_method: return '💰 С баланса' diff --git a/app/services/notification_delivery_service.py b/app/services/notification_delivery_service.py index e971c74e..d8ac6895 100644 --- a/app/services/notification_delivery_service.py +++ b/app/services/notification_delivery_service.py @@ -68,6 +68,9 @@ class NotificationType(Enum): WEBHOOK_SUB_EXPIRING = 'webhook_sub_expiring' WEBHOOK_SUB_FIRST_CONNECTED = 'webhook_sub_first_connected' WEBHOOK_SUB_BANDWIDTH_THRESHOLD = 'webhook_sub_bandwidth_threshold' + WEBHOOK_USER_NOT_CONNECTED = 'webhook_user_not_connected' + WEBHOOK_DEVICE_ADDED = 'webhook_device_added' + WEBHOOK_DEVICE_DELETED = 'webhook_device_deleted' # Other BROADCAST = 'broadcast' diff --git a/app/services/remnawave_webhook_service.py b/app/services/remnawave_webhook_service.py index f1cf542c..561beb7c 100644 --- a/app/services/remnawave_webhook_service.py +++ b/app/services/remnawave_webhook_service.py @@ -8,6 +8,7 @@ Admin events (node, service, crm) send alerts to the admin notification chat. from __future__ import annotations +import html import logging import re from datetime import UTC, datetime @@ -50,9 +51,9 @@ _TEXT_KEY_TO_NOTIFICATION_TYPE: dict[str, NotificationType] = { 'WEBHOOK_SUB_EXPIRED_24H_AGO': NotificationType.WEBHOOK_SUB_EXPIRED, 'WEBHOOK_SUB_FIRST_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED, 'WEBHOOK_SUB_BANDWIDTH_THRESHOLD': NotificationType.WEBHOOK_SUB_BANDWIDTH_THRESHOLD, - 'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED, - 'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_SUB_ENABLED, - 'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_SUB_DISABLED, + 'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_USER_NOT_CONNECTED, + 'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_DEVICE_ADDED, + 'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_DEVICE_DELETED, } # Admin event display names for notification messages @@ -120,20 +121,28 @@ class RemnaWaveWebhookService: **_ADMIN_CRM_EVENTS, } - async def process_event(self, db: AsyncSession, event_name: str, data: dict) -> bool: + def is_admin_event(self, event_name: str) -> bool: + """Check if the event is admin-scoped (no DB session needed).""" + return event_name in self._admin_handlers + + async def process_event(self, db: AsyncSession | None, event_name: str, data: dict) -> bool: """Route event to the appropriate handler. Returns True if the event was processed, False if skipped/unknown. + db may be None for admin events that don't require database access. """ - # Check user-scoped handlers first - user_handler = self._user_handlers.get(event_name) - if user_handler: - return await self._process_user_event(db, event_name, data, user_handler) - - # Check admin-scoped handlers + # Check admin-scoped handlers (no DB needed) if event_name in self._admin_handlers: return await self._process_admin_event(event_name, data) + # Check user-scoped handlers (require DB session) + user_handler = self._user_handlers.get(event_name) + if user_handler: + if db is None: + logger.error('RemnaWave webhook: DB session required for user event %s', event_name) + return False + return await self._process_user_event(db, event_name, data, user_handler) + logger.debug('Unhandled RemnaWave webhook event: %s', event_name) return False @@ -162,52 +171,52 @@ class RemnaWaveWebhookService: async def _process_admin_event(self, event_name: str, data: dict) -> bool: """Format and send admin notification for infrastructure events.""" - if not self._admin_service._is_enabled(): + if not self._admin_service.is_enabled: logger.debug('Admin notifications disabled, skipping event %s', event_name) return True title = self._admin_handlers.get(event_name, event_name) - # Build message from event data + # Build message from event data (escape all untrusted values to prevent HTML injection) lines = [f'{title}'] # Extract common fields - name = data.get('name') or data.get('nodeName') or data.get('username') or '' + name = html.escape(data.get('name') or data.get('nodeName') or data.get('username') or '') if name: lines.append(f'Имя: {name}') - address = data.get('address') or data.get('ip') or '' + address = html.escape(data.get('address') or data.get('ip') or '') if address: lines.append(f'Адрес: {address}') port = data.get('port') if port: - lines.append(f'Порт: {port}') + lines.append(f'Порт: {html.escape(str(port))}') - version = data.get('version') or data.get('panelVersion') or '' + version = html.escape(data.get('version') or data.get('panelVersion') or '') if version: lines.append(f'Версия: {version}') # CRM billing fields - amount = data.get('amount') or data.get('price') or '' + amount = html.escape(str(data.get('amount') or data.get('price') or '')) if amount: lines.append(f'Сумма: {amount}') - due_date = data.get('dueDate') or data.get('paymentDate') or '' + due_date = html.escape(data.get('dueDate') or data.get('paymentDate') or '') if due_date: lines.append(f'Дата: {due_date}') # Login attempt fields - ip_addr = data.get('ipAddress') or data.get('ip') or '' + ip_addr = html.escape(data.get('ipAddress') or data.get('ip') or '') if ip_addr and not address: lines.append(f'IP: {ip_addr}') - message = data.get('message') or '' + message = html.escape(data.get('message') or '') if message: lines.append(f'Сообщение: {message}') try: - await self._admin_service._send_message('\n'.join(lines)) + await self._admin_service.send_webhook_notification('\n'.join(lines)) return True except Exception: logger.exception('Failed to send admin notification for event %s', event_name) @@ -220,9 +229,14 @@ class RemnaWaveWebhookService: async def _resolve_user_and_subscription( self, db: AsyncSession, data: dict ) -> tuple[User | None, Subscription | None]: - """Find bot user by telegramId or uuid from webhook payload.""" + """Find bot user by telegramId or uuid from webhook payload. + + Handles both user-scope events (top-level telegramId/uuid) and + device-scope events (userUuid, or nested user.telegramId/user.uuid). + """ user: User | None = None + # Try top-level telegramId first telegram_id = data.get('telegramId') if telegram_id: try: @@ -230,11 +244,27 @@ class RemnaWaveWebhookService: except (ValueError, TypeError): pass + # Try top-level uuid if not user: - uuid = data.get('uuid') + uuid = data.get('uuid') or data.get('userUuid') if uuid: user = await get_user_by_remnawave_uuid(db, uuid) + # Try nested user object (e.g. user_hwid_devices events) + if not user: + nested_user = data.get('user') + if isinstance(nested_user, dict): + nested_tid = nested_user.get('telegramId') + if nested_tid: + try: + user = await get_user_by_telegram_id(db, int(nested_tid)) + except (ValueError, TypeError): + pass + if not user: + nested_uuid = nested_user.get('uuid') + if nested_uuid: + user = await get_user_by_remnawave_uuid(db, nested_uuid) + if not user: return None, None @@ -295,14 +325,17 @@ class RemnaWaveWebhookService: context = {'text_key': text_key, **(format_kwargs or {})} - await notification_delivery_service.send_notification( - user=user, - notification_type=notification_type, - context=context, - bot=self.bot, - telegram_message=message, - telegram_markup=reply_markup, - ) + try: + await notification_delivery_service.send_notification( + user=user, + notification_type=notification_type, + context=context, + bot=self.bot, + telegram_message=message, + telegram_markup=reply_markup, + ) + except Exception: + logger.exception('Notification delivery failed for user %s, text_key %s', user.id, text_key) # ------------------------------------------------------------------ # User event handlers @@ -409,9 +442,8 @@ class RemnaWaveWebhookService: changed = True if changed: - subscription.updated_at = datetime.utcnow() - await db.commit() - await db.refresh(subscription) + subscription.updated_at = datetime.now(UTC).replace(tzinfo=None) + await db.flush() logger.info('Webhook: subscription %s modified (synced from panel) for user %s', subscription.id, user.id) async def _handle_user_deleted( @@ -426,7 +458,7 @@ class RemnaWaveWebhookService: # Clear remnawave linkage if user.remnawave_uuid: user.remnawave_uuid = None - await db.commit() + await db.flush() await self._notify_user(user, 'WEBHOOK_SUB_DELETED') @@ -450,9 +482,8 @@ class RemnaWaveWebhookService: changed = True if changed: - subscription.updated_at = datetime.utcnow() - await db.commit() - await db.refresh(subscription) + subscription.updated_at = datetime.now(UTC).replace(tzinfo=None) + await db.flush() logger.info( 'Webhook: subscription %s credentials revoked/updated for user %s', subscription.id, user.id ) @@ -523,7 +554,7 @@ class RemnaWaveWebhookService: async def _handle_device_added( self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict ) -> None: - device_name = data.get('deviceName') or data.get('hwid') or '' + device_name = html.escape(data.get('deviceName') or data.get('hwid') or '') logger.info('Webhook: device added for user %s: %s', user.id, device_name) await self._notify_user( user, @@ -534,7 +565,7 @@ class RemnaWaveWebhookService: async def _handle_device_deleted( self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict ) -> None: - device_name = data.get('deviceName') or data.get('hwid') or '' + device_name = html.escape(data.get('deviceName') or data.get('hwid') or '') logger.info('Webhook: device deleted for user %s: %s', user.id, device_name) await self._notify_user( user, diff --git a/app/webserver/remnawave_webhook.py b/app/webserver/remnawave_webhook.py index a8e1e0a7..ed51f539 100644 --- a/app/webserver/remnawave_webhook.py +++ b/app/webserver/remnawave_webhook.py @@ -17,7 +17,7 @@ from fastapi import APIRouter, Request, status from fastapi.responses import JSONResponse from app.config import settings -from app.database.database import get_db +from app.database.database import AsyncSessionLocal from app.services.remnawave_webhook_service import RemnaWaveWebhookService @@ -119,26 +119,31 @@ def create_remnawave_webhook_router(bot: Bot) -> APIRouter: # Process event — return 200 to prevent retries for application-level errors. # Only return non-200 for infrastructure failures (DB unavailable). - db_generator = get_db() + # Admin events (node/service/crm) don't need a DB session. + if webhook_service.is_admin_event(event_name): + try: + processed = await webhook_service.process_event(None, event_name, data) + return JSONResponse({'status': 'ok', 'processed': processed}) + except Exception: + logger.exception('RemnaWave webhook processing error for event %s', event_name) + return JSONResponse({'status': 'ok', 'processed': False}) + + # User events require a DB session try: - db = await db_generator.__anext__() - except StopAsyncIteration: + async with AsyncSessionLocal() as db: + try: + processed = await webhook_service.process_event(db, event_name, data) + await db.commit() + return JSONResponse({'status': 'ok', 'processed': processed}) + except Exception: + await db.rollback() + logger.exception('RemnaWave webhook processing error for event %s', event_name) + return JSONResponse({'status': 'ok', 'processed': False}) + except Exception: logger.error('RemnaWave webhook: failed to get database session') return JSONResponse( {'status': 'error', 'reason': 'database_unavailable'}, status_code=status.HTTP_503_SERVICE_UNAVAILABLE, ) - try: - processed = await webhook_service.process_event(db, event_name, data) - return JSONResponse({'status': 'ok', 'processed': processed}) - except Exception: - logger.exception('RemnaWave webhook processing error for event %s', event_name) - return JSONResponse({'status': 'ok', 'processed': False}) - finally: - try: - await db_generator.__anext__() - except StopAsyncIteration: - pass - return router