From dfec81c00a5c3ab25843fed505b5b6cf902b0868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Rein?= Date: Mon, 23 Feb 2026 18:46:03 +0100 Subject: [PATCH] fix: snapshot dict/set iterations in WebsocketNotifier to prevent RuntimeError (#511) Signed-off-by: Pawel Rein --- docling_serve/app.py | 4 +++- docling_serve/websocket_notifier.py | 11 +++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/docling_serve/app.py b/docling_serve/app.py index 0833423..e7a8825 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -962,7 +962,9 @@ def create_app(): # noqa: C901 _log.info(f"WebSocket disconnected for job {task_id}") finally: - orchestrator.notifier.task_subscribers[task_id].remove(websocket) + subs = orchestrator.notifier.task_subscribers.get(task_id) + if subs: + subs.discard(websocket) # Task result @app.get( diff --git a/docling_serve/websocket_notifier.py b/docling_serve/websocket_notifier.py index ea57d35..47a8235 100644 --- a/docling_serve/websocket_notifier.py +++ b/docling_serve/websocket_notifier.py @@ -24,12 +24,11 @@ class WebsocketNotifier(BaseNotifier): self.task_subscribers[task_id] = set() async def remove_task(self, task_id: str): - if task_id in self.task_subscribers: - for websocket in self.task_subscribers[task_id]: + subscribers = self.task_subscribers.pop(task_id, None) + if subscribers: + for websocket in list(subscribers): await websocket.close() - del self.task_subscribers[task_id] - async def notify_task_subscribers(self, task_id: str): if task_id not in self.task_subscribers: _log.debug( @@ -48,7 +47,7 @@ class WebsocketNotifier(BaseNotifier): task_position=task_queue_position, task_meta=task.processing_meta, ) - for websocket in self.task_subscribers[task_id]: + for websocket in list(self.task_subscribers.get(task_id, set())): await websocket.send_text( WebsocketMessage( message=MessageKind.UPDATE, task=msg @@ -61,7 +60,7 @@ class WebsocketNotifier(BaseNotifier): async def notify_queue_positions(self): """Notify all subscribers of pending tasks about queue position updates.""" - for task_id in self.task_subscribers.keys(): + for task_id in list(self.task_subscribers.keys()): try: # Check task status directly from Redis or RQ task = await self.orchestrator.task_status(task_id)