diff --git a/docling_serve/app.py b/docling_serve/app.py index 5188611..108fb1b 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -1,6 +1,7 @@ import asyncio import copy import gc +import hashlib import importlib.metadata import logging import os @@ -344,9 +345,18 @@ def create_app(): # noqa: C901 # Load the uploaded files to Docling DocumentStream file_sources: list[TaskSource] = [] for i, file in enumerate(files): - buf = BytesIO(file.file.read()) + file_bytes = file.file.read() + buf = BytesIO(file_bytes) suffix = "" if len(file_sources) == 1 else f"_{i}" name = file.filename if file.filename else f"file{suffix}.pdf" + + # Log file details for debugging transmission issues + file_hash = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest()[:12] + _log.info( + f"File {i}: name={name}, size={len(file_bytes)} bytes, " + f"md5={file_hash}, content_type={file.content_type}" + ) + file_sources.append(DocumentStream(name=name, stream=buf)) task = await orchestrator.enqueue( @@ -606,6 +616,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) # Convert a document from file(s) using the async api @@ -643,6 +654,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) # Chunking endpoints @@ -674,6 +686,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) @app.post( @@ -737,6 +750,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) @app.post( @@ -892,6 +906,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) # Task status websocket @@ -937,6 +952,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) await websocket.send_text( WebsocketMessage( @@ -953,6 +969,7 @@ def create_app(): # noqa: C901 task_status=task.task_status, task_position=task_queue_position, task_meta=task.processing_meta, + error_message=getattr(task, "error_message", None), ) await websocket.send_text( WebsocketMessage( diff --git a/docling_serve/datamodel/responses.py b/docling_serve/datamodel/responses.py index e512439..4aca578 100644 --- a/docling_serve/datamodel/responses.py +++ b/docling_serve/datamodel/responses.py @@ -53,6 +53,7 @@ class TaskStatusResponse(BaseModel): task_status: str task_position: Optional[int] = None task_meta: Optional[TaskProcessingMeta] = None + error_message: Optional[str] = None class MessageKind(str, enum.Enum): diff --git a/docling_serve/orchestrator_factory.py b/docling_serve/orchestrator_factory.py index cca89c6..ade23a3 100644 --- a/docling_serve/orchestrator_factory.py +++ b/docling_serve/orchestrator_factory.py @@ -140,6 +140,7 @@ class RedisTaskStatusMixin: task_type=data["task_type"], task_status=TaskStatus(data["task_status"]), processing_meta=meta, + error_message=data.get("error_message"), ) except Exception as e: _log.error(f"Redis get task {task_id}: {e}") @@ -239,6 +240,7 @@ class RedisTaskStatusMixin: ), "task_status": task.task_status.value, "processing_meta": meta, + "error_message": getattr(task, "error_message", None), } async with redis.Redis(connection_pool=self._redis_pool) as r: await r.set( diff --git a/docling_serve/rq_job_wrapper.py b/docling_serve/rq_job_wrapper.py index a4cf1a8..c5f8d49 100644 --- a/docling_serve/rq_job_wrapper.py +++ b/docling_serve/rq_job_wrapper.py @@ -1,5 +1,7 @@ """Instrumented wrapper for RQ job functions with OpenTelemetry tracing.""" +import base64 +import hashlib import logging import shutil from pathlib import Path @@ -95,31 +97,52 @@ def instrumented_docling_task( # noqa: C901 with tracer.start_as_current_span("prepare_sources") as prep_span: convert_sources: list[Union[str, DocumentStream]] = [] headers: dict[str, Any] | None = None + source_info: list[dict[str, str]] = [] for idx, source in enumerate(task.sources): if isinstance(source, DocumentStream): convert_sources.append(source) - prep_span.add_event( - f"source_{idx}_prepared", - {"type": "DocumentStream", "name": source.name}, - ) + info = {"type": "DocumentStream", "name": source.name} + source_info.append(info) + prep_span.add_event(f"source_{idx}_prepared", info) elif isinstance(source, FileSource): - convert_sources.append(source.to_document_stream()) - prep_span.add_event( - f"source_{idx}_prepared", - {"type": "FileSource", "filename": source.filename}, + decoded_bytes = base64.b64decode(source.base64_string) + file_hash = hashlib.md5( + decoded_bytes, usedforsecurity=False + ).hexdigest()[:12] + logger.info( + f"FileSource {idx}: filename={source.filename}, " + f"base64_len={len(source.base64_string)}, " + f"decoded_size={len(decoded_bytes)}, md5={file_hash}" ) + doc_stream = source.to_document_stream() + convert_sources.append(doc_stream) + info = { + "type": "FileSource", + "filename": source.filename, + "size": str(len(decoded_bytes)), + "md5": file_hash, + } + source_info.append(info) + prep_span.add_event(f"source_{idx}_prepared", info) elif isinstance(source, HttpSource): convert_sources.append(str(source.url)) + info = {"type": "HttpSource", "url": str(source.url)} + source_info.append(info) if headers is None and source.headers: headers = source.headers - prep_span.add_event( - f"source_{idx}_prepared", - {"type": "HttpSource", "url": str(source.url)}, - ) + prep_span.add_event(f"source_{idx}_prepared", info) prep_span.set_attribute("num_sources", len(convert_sources)) + source_names = ", ".join( + f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}" + for s in source_info + ) + logger.info( + f"Task {task_id} processing {len(convert_sources)} source(s): {source_names}" + ) + if not conversion_manager: raise RuntimeError("No converter") if not task.convert_options: @@ -140,20 +163,32 @@ def instrumented_docling_task( # noqa: C901 with tracer.start_as_current_span("process_results") as proc_span: proc_span.set_attribute("task_type", str(task.task_type.value)) - if task.task_type == TaskType.CONVERT: - with tracer.start_as_current_span("process_export_results"): - processed_results = process_export_results( - task=task, - conv_results=conv_results, - work_dir=workdir, - ) - elif task.task_type == TaskType.CHUNK: - with tracer.start_as_current_span("process_chunk_results"): - processed_results = process_chunk_results( - task=task, - conv_results=conv_results, - work_dir=workdir, - ) + try: + if task.task_type == TaskType.CONVERT: + with tracer.start_as_current_span("process_export_results"): + processed_results = process_export_results( + task=task, + conv_results=conv_results, + work_dir=workdir, + ) + elif task.task_type == TaskType.CHUNK: + with tracer.start_as_current_span("process_chunk_results"): + processed_results = process_chunk_results( + task=task, + conv_results=conv_results, + work_dir=workdir, + ) + except Exception as proc_error: + source_names = ", ".join( + f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}" + for s in source_info + ) + logger.error( + f"Task {task_id} processing failed. " + f"Sources: {source_names}. " + f"Error: {proc_error}" + ) + raise # Serialize and store results with tracer.start_as_current_span("serialize_and_store") as store_span: @@ -195,6 +230,7 @@ def instrumented_docling_task( # noqa: C901 _TaskUpdate( task_id=task_id, task_status=TaskStatus.FAILURE, + error_message=str(e), ).model_dump_json(), ) except Exception: @@ -209,7 +245,16 @@ def instrumented_docling_task( # noqa: C901 pass # Record exception and mark span as failed - logger.error(f"Docling task {task_id} failed: {e}", exc_info=True) + source_context = "N/A" + if "source_info" in locals(): + source_context = ", ".join( + f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}" + for s in source_info + ) + logger.error( + f"Docling task {task_id} failed: {e}. Sources: {source_context}", + exc_info=True, + ) span.record_exception(e) span.set_status(Status(StatusCode.ERROR, str(e))) raise diff --git a/tests/test_error_message_propagation.py b/tests/test_error_message_propagation.py new file mode 100644 index 0000000..8a26740 --- /dev/null +++ b/tests/test_error_message_propagation.py @@ -0,0 +1,157 @@ +"""Tests for error_message propagation through the docling-serve API layer.""" + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from docling_jobkit.datamodel.task import Task +from docling_jobkit.datamodel.task_meta import TaskStatus +from docling_jobkit.datamodel.task_targets import InBodyTarget + +from docling_serve.datamodel.responses import TaskStatusResponse + + +class TestTaskStatusResponseErrorMessage: + def test_error_message_field_exists(self): + resp = TaskStatusResponse( + task_id="t1", + task_type="convert", + task_status="failure", + error_message="conversion failed", + ) + assert resp.error_message == "conversion failed" + + def test_error_message_defaults_to_none(self): + resp = TaskStatusResponse( + task_id="t1", + task_type="convert", + task_status="success", + ) + assert resp.error_message is None + + def test_error_message_in_json_output(self): + resp = TaskStatusResponse( + task_id="t1", + task_type="convert", + task_status="failure", + error_message="OOM killed", + ) + data = json.loads(resp.model_dump_json()) + assert data["error_message"] == "OOM killed" + + def test_error_message_none_in_json_output(self): + resp = TaskStatusResponse( + task_id="t1", + task_type="convert", + task_status="success", + ) + data = json.loads(resp.model_dump_json()) + assert data["error_message"] is None + + def test_backward_compatible_deserialization(self): + old_json = '{"task_id": "t1", "task_type": "convert", "task_status": "failure"}' + resp = TaskStatusResponse.model_validate_json(old_json) + assert resp.error_message is None + + +class TestRedisErrorMessageStorage: + @pytest.mark.asyncio + async def test_store_and_retrieve_error_message(self): + from docling_serve.orchestrator_factory import RedisTaskStatusMixin + + storage: dict[str, str] = {} + + async def mock_set(key: str, value: str, ex: int = 0) -> None: + storage[key] = value + + async def mock_get(key: str) -> bytes | None: + val = storage.get(key) + return val.encode() if val else None + + mock_redis = AsyncMock() + mock_redis.set = mock_set + mock_redis.get = mock_get + mock_redis.__aenter__ = AsyncMock(return_value=mock_redis) + mock_redis.__aexit__ = AsyncMock(return_value=False) + + class FakeMixin(RedisTaskStatusMixin): + def __init__(self): + self.redis_prefix = "docling:tasks:" + self._redis_pool = MagicMock() + self.tasks: dict[str, Task] = {} + self._task_result_keys: dict[str, str] = {} + self.config = MagicMock() + self.config.redis_url = "redis://localhost:6379/" + + mixin = FakeMixin() + + task = Task( + task_id="fail-task-1", + sources=[], + target=InBodyTarget(), + task_status=TaskStatus.FAILURE, + error_message="corrupt PDF: invalid xref table", + ) + + with patch( + "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis + ): + await mixin._store_task_in_redis(task) + + raw = storage.get("docling:tasks:fail-task-1:metadata") + assert raw is not None + data = json.loads(raw) + assert data["error_message"] == "corrupt PDF: invalid xref table" + + with patch( + "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis + ): + restored = await mixin._get_task_from_redis("fail-task-1") + + assert restored is not None + assert restored.error_message == "corrupt PDF: invalid xref table" + assert restored.task_status == TaskStatus.FAILURE + + @pytest.mark.asyncio + async def test_retrieve_without_error_message_backward_compat(self): + from docling_serve.orchestrator_factory import RedisTaskStatusMixin + + old_data = json.dumps( + { + "task_id": "old-task-1", + "task_type": "convert", + "task_status": "success", + "processing_meta": { + "num_docs": 1, + "num_processed": 1, + "num_succeeded": 1, + "num_failed": 0, + }, + } + ) + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=old_data.encode()) + mock_redis.__aenter__ = AsyncMock(return_value=mock_redis) + mock_redis.__aexit__ = AsyncMock(return_value=False) + + class FakeMixin(RedisTaskStatusMixin): + def __init__(self): + self.redis_prefix = "docling:tasks:" + self._redis_pool = MagicMock() + self.tasks: dict[str, Task] = {} + self._task_result_keys: dict[str, str] = {} + self.config = MagicMock() + self.config.redis_url = "redis://localhost:6379/" + + mixin = FakeMixin() + + with patch( + "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis + ): + restored = await mixin._get_task_from_redis("old-task-1") + + assert restored is not None + assert restored.error_message is None + assert restored.task_status == TaskStatus.SUCCESS diff --git a/uv.lock b/uv.lock index c8299d5..dcae9ed 100644 --- a/uv.lock +++ b/uv.lock @@ -1379,7 +1379,7 @@ wheels = [ [[package]] name = "docling-jobkit" -version = "1.11.0" +version = "1.12.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "boto3", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, @@ -1390,9 +1390,9 @@ dependencies = [ { name = "pydantic-settings", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "typer", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/18/12/712943c365c395ee922709cc6d7ec8201a030155c2cd0dc6ae761b25a981/docling_jobkit-1.11.0.tar.gz", hash = "sha256:8010d75c7c117c9978e688afb5b69d30b32c1c6666b6be626a64608b91b4734c", size = 70926, upload-time = "2026-02-18T12:20:16.195Z" } +sdist = { url = "https://files.pythonhosted.org/packages/9d/93/18d23e6d820d716401ee953c0c8a03a9d0f8aa3b0257940b26c09540824e/docling_jobkit-1.12.0.tar.gz", hash = "sha256:624d320c93d193eac8af0972c63a330689079a2f9366c24afbb0af3b31dec008", size = 71254, upload-time = "2026-02-24T10:43:52.932Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0f/66/fb64c094935aee9921c57d4455a7bbfd192c0dcf2cc576605483cd8ac081/docling_jobkit-1.11.0-py3-none-any.whl", hash = "sha256:f3238543a7db21139efa5c4e9167d42aba3d3f2e48f61137c26111cdd4cdeb14", size = 96276, upload-time = "2026-02-18T12:20:14.752Z" }, + { url = "https://files.pythonhosted.org/packages/99/e1/b264d8b7edf078ed8e2e187a49be206d7d29504d66bb5cad98ce673944b2/docling_jobkit-1.12.0-py3-none-any.whl", hash = "sha256:adb79ea8d3b751f49d036940d46da451177cf715c20bb2377f6515224b8008a2", size = 96652, upload-time = "2026-02-24T10:43:51.54Z" }, ] [package.optional-dependencies]