mirror of
https://github.com/docling-project/docling-serve.git
synced 2026-03-07 22:33:44 +00:00
fix: prevent WebsocketNotifier crash when task has no subscribers (#498)
Signed-off-by: Pawel Rein <pawel.rein@prezi.com>
This commit is contained in:
@@ -922,7 +922,7 @@ def create_app(): # noqa: C901
|
||||
return
|
||||
|
||||
# Track active WebSocket connections for this job
|
||||
orchestrator.notifier.task_subscribers[task_id].add(websocket)
|
||||
orchestrator.notifier.task_subscribers.setdefault(task_id, set()).add(websocket)
|
||||
|
||||
try:
|
||||
task_queue_position = await orchestrator.get_queue_position(task_id=task_id)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
from docling_jobkit.datamodel.task_meta import TaskStatus
|
||||
@@ -10,6 +12,8 @@ from docling_serve.datamodel.responses import (
|
||||
WebsocketMessage,
|
||||
)
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebsocketNotifier(BaseNotifier):
|
||||
def __init__(self, orchestrator: BaseOrchestrator):
|
||||
@@ -28,7 +32,10 @@ class WebsocketNotifier(BaseNotifier):
|
||||
|
||||
async def notify_task_subscribers(self, task_id: str):
|
||||
if task_id not in self.task_subscribers:
|
||||
raise RuntimeError(f"Task {task_id} does not have a subscribers list.")
|
||||
_log.debug(
|
||||
f"Task {task_id} has no websocket subscribers, skipping notification."
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Get task status from Redis or RQ directly instead of in-memory registry
|
||||
@@ -50,10 +57,6 @@ class WebsocketNotifier(BaseNotifier):
|
||||
if task.is_completed():
|
||||
await websocket.close()
|
||||
except Exception as e:
|
||||
# Log the error but don't crash the notifier
|
||||
import logging
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
_log.error(f"Error notifying subscribers for task {task_id}: {e}")
|
||||
|
||||
async def notify_queue_positions(self):
|
||||
@@ -67,10 +70,6 @@ class WebsocketNotifier(BaseNotifier):
|
||||
if task.task_status == TaskStatus.PENDING:
|
||||
await self.notify_task_subscribers(task_id)
|
||||
except Exception as e:
|
||||
# Log the error but don't crash the notifier
|
||||
import logging
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
_log.error(
|
||||
f"Error checking task {task_id} status for queue position notification: {e}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user