diff --git a/docling_serve/app.py b/docling_serve/app.py index 755b2a3..0833423 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -922,7 +922,7 @@ def create_app(): # noqa: C901 return # Track active WebSocket connections for this job - orchestrator.notifier.task_subscribers[task_id].add(websocket) + orchestrator.notifier.task_subscribers.setdefault(task_id, set()).add(websocket) try: task_queue_position = await orchestrator.get_queue_position(task_id=task_id) diff --git a/docling_serve/websocket_notifier.py b/docling_serve/websocket_notifier.py index 779c5af..ea57d35 100644 --- a/docling_serve/websocket_notifier.py +++ b/docling_serve/websocket_notifier.py @@ -1,3 +1,5 @@ +import logging + from fastapi import WebSocket from docling_jobkit.datamodel.task_meta import TaskStatus @@ -10,6 +12,8 @@ from docling_serve.datamodel.responses import ( WebsocketMessage, ) +_log = logging.getLogger(__name__) + class WebsocketNotifier(BaseNotifier): def __init__(self, orchestrator: BaseOrchestrator): @@ -28,7 +32,10 @@ class WebsocketNotifier(BaseNotifier): async def notify_task_subscribers(self, task_id: str): if task_id not in self.task_subscribers: - raise RuntimeError(f"Task {task_id} does not have a subscribers list.") + _log.debug( + f"Task {task_id} has no websocket subscribers, skipping notification." + ) + return try: # Get task status from Redis or RQ directly instead of in-memory registry @@ -50,10 +57,6 @@ class WebsocketNotifier(BaseNotifier): if task.is_completed(): await websocket.close() except Exception as e: - # Log the error but don't crash the notifier - import logging - - _log = logging.getLogger(__name__) _log.error(f"Error notifying subscribers for task {task_id}: {e}") async def notify_queue_positions(self): @@ -67,10 +70,6 @@ class WebsocketNotifier(BaseNotifier): if task.task_status == TaskStatus.PENDING: await self.notify_task_subscribers(task_id) except Exception as e: - # Log the error but don't crash the notifier - import logging - - _log = logging.getLogger(__name__) _log.error( f"Error checking task {task_id} status for queue position notification: {e}" )