fix: security and architecture fixes for webhook handlers

- Add html.escape() to all untrusted webhook data in admin and device
  notifications (prevents HTML/Telegram injection)
- Add public send_webhook_notification() and is_enabled property to
  AdminNotificationService (eliminates private method access)
- Add dedicated NotificationType enum values for device and not_connected
  events (fixes incorrect semantic mapping)
- Extend user resolution to handle nested user objects and userUuid for
  device-scope events
- Replace manual __anext__() DB session with AsyncSessionLocal context
  manager; skip DB session for admin-only events
- Replace deprecated datetime.utcnow() with datetime.now(UTC)
- Use db.flush() instead of db.commit() in handlers (router commits)
- Wrap _notify_user in try/except to prevent notification failures from
  rolling back successful DB mutations
This commit is contained in:
Fringg
2026-02-10 05:55:48 +03:00
parent 1e37fd9dd2
commit dc1e96bbe9
4 changed files with 110 additions and 56 deletions

View File

@@ -1216,6 +1216,21 @@ class AdminNotificationService:
def _is_enabled(self) -> bool:
return self.enabled and bool(self.chat_id)
@property
def is_enabled(self) -> bool:
"""Public check for whether admin notifications are configured and active."""
return self._is_enabled()
async def send_webhook_notification(self, text: str) -> bool:
"""Send a generic webhook/infrastructure notification to admin chat.
Used by RemnaWaveWebhookService for node, service, and CRM events.
The caller is responsible for HTML-escaping all untrusted data in `text`.
"""
if not self._is_enabled():
return False
return await self._send_message(text)
def _get_payment_method_display(self, payment_method: str | None) -> str:
if not payment_method:
return '💰 С баланса'

View File

@@ -68,6 +68,9 @@ class NotificationType(Enum):
WEBHOOK_SUB_EXPIRING = 'webhook_sub_expiring'
WEBHOOK_SUB_FIRST_CONNECTED = 'webhook_sub_first_connected'
WEBHOOK_SUB_BANDWIDTH_THRESHOLD = 'webhook_sub_bandwidth_threshold'
WEBHOOK_USER_NOT_CONNECTED = 'webhook_user_not_connected'
WEBHOOK_DEVICE_ADDED = 'webhook_device_added'
WEBHOOK_DEVICE_DELETED = 'webhook_device_deleted'
# Other
BROADCAST = 'broadcast'

View File

@@ -8,6 +8,7 @@ Admin events (node, service, crm) send alerts to the admin notification chat.
from __future__ import annotations
import html
import logging
import re
from datetime import UTC, datetime
@@ -50,9 +51,9 @@ _TEXT_KEY_TO_NOTIFICATION_TYPE: dict[str, NotificationType] = {
'WEBHOOK_SUB_EXPIRED_24H_AGO': NotificationType.WEBHOOK_SUB_EXPIRED,
'WEBHOOK_SUB_FIRST_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED,
'WEBHOOK_SUB_BANDWIDTH_THRESHOLD': NotificationType.WEBHOOK_SUB_BANDWIDTH_THRESHOLD,
'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_SUB_FIRST_CONNECTED,
'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_SUB_ENABLED,
'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_SUB_DISABLED,
'WEBHOOK_USER_NOT_CONNECTED': NotificationType.WEBHOOK_USER_NOT_CONNECTED,
'WEBHOOK_DEVICE_ADDED': NotificationType.WEBHOOK_DEVICE_ADDED,
'WEBHOOK_DEVICE_DELETED': NotificationType.WEBHOOK_DEVICE_DELETED,
}
# Admin event display names for notification messages
@@ -120,20 +121,28 @@ class RemnaWaveWebhookService:
**_ADMIN_CRM_EVENTS,
}
async def process_event(self, db: AsyncSession, event_name: str, data: dict) -> bool:
def is_admin_event(self, event_name: str) -> bool:
"""Check if the event is admin-scoped (no DB session needed)."""
return event_name in self._admin_handlers
async def process_event(self, db: AsyncSession | None, event_name: str, data: dict) -> bool:
"""Route event to the appropriate handler.
Returns True if the event was processed, False if skipped/unknown.
db may be None for admin events that don't require database access.
"""
# Check user-scoped handlers first
user_handler = self._user_handlers.get(event_name)
if user_handler:
return await self._process_user_event(db, event_name, data, user_handler)
# Check admin-scoped handlers
# Check admin-scoped handlers (no DB needed)
if event_name in self._admin_handlers:
return await self._process_admin_event(event_name, data)
# Check user-scoped handlers (require DB session)
user_handler = self._user_handlers.get(event_name)
if user_handler:
if db is None:
logger.error('RemnaWave webhook: DB session required for user event %s', event_name)
return False
return await self._process_user_event(db, event_name, data, user_handler)
logger.debug('Unhandled RemnaWave webhook event: %s', event_name)
return False
@@ -162,52 +171,52 @@ class RemnaWaveWebhookService:
async def _process_admin_event(self, event_name: str, data: dict) -> bool:
"""Format and send admin notification for infrastructure events."""
if not self._admin_service._is_enabled():
if not self._admin_service.is_enabled:
logger.debug('Admin notifications disabled, skipping event %s', event_name)
return True
title = self._admin_handlers.get(event_name, event_name)
# Build message from event data
# Build message from event data (escape all untrusted values to prevent HTML injection)
lines = [f'<b>{title}</b>']
# Extract common fields
name = data.get('name') or data.get('nodeName') or data.get('username') or ''
name = html.escape(data.get('name') or data.get('nodeName') or data.get('username') or '')
if name:
lines.append(f'Имя: <code>{name}</code>')
address = data.get('address') or data.get('ip') or ''
address = html.escape(data.get('address') or data.get('ip') or '')
if address:
lines.append(f'Адрес: <code>{address}</code>')
port = data.get('port')
if port:
lines.append(f'Порт: <code>{port}</code>')
lines.append(f'Порт: <code>{html.escape(str(port))}</code>')
version = data.get('version') or data.get('panelVersion') or ''
version = html.escape(data.get('version') or data.get('panelVersion') or '')
if version:
lines.append(f'Версия: <code>{version}</code>')
# CRM billing fields
amount = data.get('amount') or data.get('price') or ''
amount = html.escape(str(data.get('amount') or data.get('price') or ''))
if amount:
lines.append(f'Сумма: <code>{amount}</code>')
due_date = data.get('dueDate') or data.get('paymentDate') or ''
due_date = html.escape(data.get('dueDate') or data.get('paymentDate') or '')
if due_date:
lines.append(f'Дата: <code>{due_date}</code>')
# Login attempt fields
ip_addr = data.get('ipAddress') or data.get('ip') or ''
ip_addr = html.escape(data.get('ipAddress') or data.get('ip') or '')
if ip_addr and not address:
lines.append(f'IP: <code>{ip_addr}</code>')
message = data.get('message') or ''
message = html.escape(data.get('message') or '')
if message:
lines.append(f'Сообщение: {message}')
try:
await self._admin_service._send_message('\n'.join(lines))
await self._admin_service.send_webhook_notification('\n'.join(lines))
return True
except Exception:
logger.exception('Failed to send admin notification for event %s', event_name)
@@ -220,9 +229,14 @@ class RemnaWaveWebhookService:
async def _resolve_user_and_subscription(
self, db: AsyncSession, data: dict
) -> tuple[User | None, Subscription | None]:
"""Find bot user by telegramId or uuid from webhook payload."""
"""Find bot user by telegramId or uuid from webhook payload.
Handles both user-scope events (top-level telegramId/uuid) and
device-scope events (userUuid, or nested user.telegramId/user.uuid).
"""
user: User | None = None
# Try top-level telegramId first
telegram_id = data.get('telegramId')
if telegram_id:
try:
@@ -230,11 +244,27 @@ class RemnaWaveWebhookService:
except (ValueError, TypeError):
pass
# Try top-level uuid
if not user:
uuid = data.get('uuid')
uuid = data.get('uuid') or data.get('userUuid')
if uuid:
user = await get_user_by_remnawave_uuid(db, uuid)
# Try nested user object (e.g. user_hwid_devices events)
if not user:
nested_user = data.get('user')
if isinstance(nested_user, dict):
nested_tid = nested_user.get('telegramId')
if nested_tid:
try:
user = await get_user_by_telegram_id(db, int(nested_tid))
except (ValueError, TypeError):
pass
if not user:
nested_uuid = nested_user.get('uuid')
if nested_uuid:
user = await get_user_by_remnawave_uuid(db, nested_uuid)
if not user:
return None, None
@@ -295,14 +325,17 @@ class RemnaWaveWebhookService:
context = {'text_key': text_key, **(format_kwargs or {})}
await notification_delivery_service.send_notification(
user=user,
notification_type=notification_type,
context=context,
bot=self.bot,
telegram_message=message,
telegram_markup=reply_markup,
)
try:
await notification_delivery_service.send_notification(
user=user,
notification_type=notification_type,
context=context,
bot=self.bot,
telegram_message=message,
telegram_markup=reply_markup,
)
except Exception:
logger.exception('Notification delivery failed for user %s, text_key %s', user.id, text_key)
# ------------------------------------------------------------------
# User event handlers
@@ -409,9 +442,8 @@ class RemnaWaveWebhookService:
changed = True
if changed:
subscription.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(subscription)
subscription.updated_at = datetime.now(UTC).replace(tzinfo=None)
await db.flush()
logger.info('Webhook: subscription %s modified (synced from panel) for user %s', subscription.id, user.id)
async def _handle_user_deleted(
@@ -426,7 +458,7 @@ class RemnaWaveWebhookService:
# Clear remnawave linkage
if user.remnawave_uuid:
user.remnawave_uuid = None
await db.commit()
await db.flush()
await self._notify_user(user, 'WEBHOOK_SUB_DELETED')
@@ -450,9 +482,8 @@ class RemnaWaveWebhookService:
changed = True
if changed:
subscription.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(subscription)
subscription.updated_at = datetime.now(UTC).replace(tzinfo=None)
await db.flush()
logger.info(
'Webhook: subscription %s credentials revoked/updated for user %s', subscription.id, user.id
)
@@ -523,7 +554,7 @@ class RemnaWaveWebhookService:
async def _handle_device_added(
self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict
) -> None:
device_name = data.get('deviceName') or data.get('hwid') or ''
device_name = html.escape(data.get('deviceName') or data.get('hwid') or '')
logger.info('Webhook: device added for user %s: %s', user.id, device_name)
await self._notify_user(
user,
@@ -534,7 +565,7 @@ class RemnaWaveWebhookService:
async def _handle_device_deleted(
self, db: AsyncSession, user: User, subscription: Subscription | None, data: dict
) -> None:
device_name = data.get('deviceName') or data.get('hwid') or ''
device_name = html.escape(data.get('deviceName') or data.get('hwid') or '')
logger.info('Webhook: device deleted for user %s: %s', user.id, device_name)
await self._notify_user(
user,

View File

@@ -17,7 +17,7 @@ from fastapi import APIRouter, Request, status
from fastapi.responses import JSONResponse
from app.config import settings
from app.database.database import get_db
from app.database.database import AsyncSessionLocal
from app.services.remnawave_webhook_service import RemnaWaveWebhookService
@@ -119,26 +119,31 @@ def create_remnawave_webhook_router(bot: Bot) -> APIRouter:
# Process event — return 200 to prevent retries for application-level errors.
# Only return non-200 for infrastructure failures (DB unavailable).
db_generator = get_db()
# Admin events (node/service/crm) don't need a DB session.
if webhook_service.is_admin_event(event_name):
try:
processed = await webhook_service.process_event(None, event_name, data)
return JSONResponse({'status': 'ok', 'processed': processed})
except Exception:
logger.exception('RemnaWave webhook processing error for event %s', event_name)
return JSONResponse({'status': 'ok', 'processed': False})
# User events require a DB session
try:
db = await db_generator.__anext__()
except StopAsyncIteration:
async with AsyncSessionLocal() as db:
try:
processed = await webhook_service.process_event(db, event_name, data)
await db.commit()
return JSONResponse({'status': 'ok', 'processed': processed})
except Exception:
await db.rollback()
logger.exception('RemnaWave webhook processing error for event %s', event_name)
return JSONResponse({'status': 'ok', 'processed': False})
except Exception:
logger.error('RemnaWave webhook: failed to get database session')
return JSONResponse(
{'status': 'error', 'reason': 'database_unavailable'},
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
)
try:
processed = await webhook_service.process_event(db, event_name, data)
return JSONResponse({'status': 'ok', 'processed': processed})
except Exception:
logger.exception('RemnaWave webhook processing error for event %s', event_name)
return JSONResponse({'status': 'ok', 'processed': False})
finally:
try:
await db_generator.__anext__()
except StopAsyncIteration:
pass
return router