feat: Surface task error messages in status API responses (#502)

Signed-off-by: Pawel Rein <pawel.rein@prezi.com>
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Co-authored-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Paweł Rein
2026-02-24 13:50:53 +01:00
committed by GitHub
parent 3462b7731c
commit e1d8ea9278
6 changed files with 253 additions and 31 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import copy
import gc
import hashlib
import importlib.metadata
import logging
import os
@@ -344,9 +345,18 @@ def create_app(): # noqa: C901
# Load the uploaded files to Docling DocumentStream
file_sources: list[TaskSource] = []
for i, file in enumerate(files):
buf = BytesIO(file.file.read())
file_bytes = file.file.read()
buf = BytesIO(file_bytes)
suffix = "" if len(file_sources) == 1 else f"_{i}"
name = file.filename if file.filename else f"file{suffix}.pdf"
# Log file details for debugging transmission issues
file_hash = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest()[:12]
_log.info(
f"File {i}: name={name}, size={len(file_bytes)} bytes, "
f"md5={file_hash}, content_type={file.content_type}"
)
file_sources.append(DocumentStream(name=name, stream=buf))
task = await orchestrator.enqueue(
@@ -606,6 +616,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
# Convert a document from file(s) using the async api
@@ -643,6 +654,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
# Chunking endpoints
@@ -674,6 +686,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
@app.post(
@@ -737,6 +750,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
@app.post(
@@ -892,6 +906,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
# Task status websocket
@@ -937,6 +952,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
await websocket.send_text(
WebsocketMessage(
@@ -953,6 +969,7 @@ def create_app(): # noqa: C901
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
error_message=getattr(task, "error_message", None),
)
await websocket.send_text(
WebsocketMessage(

View File

@@ -53,6 +53,7 @@ class TaskStatusResponse(BaseModel):
task_status: str
task_position: Optional[int] = None
task_meta: Optional[TaskProcessingMeta] = None
error_message: Optional[str] = None
class MessageKind(str, enum.Enum):

View File

@@ -140,6 +140,7 @@ class RedisTaskStatusMixin:
task_type=data["task_type"],
task_status=TaskStatus(data["task_status"]),
processing_meta=meta,
error_message=data.get("error_message"),
)
except Exception as e:
_log.error(f"Redis get task {task_id}: {e}")
@@ -239,6 +240,7 @@ class RedisTaskStatusMixin:
),
"task_status": task.task_status.value,
"processing_meta": meta,
"error_message": getattr(task, "error_message", None),
}
async with redis.Redis(connection_pool=self._redis_pool) as r:
await r.set(

View File

@@ -1,5 +1,7 @@
"""Instrumented wrapper for RQ job functions with OpenTelemetry tracing."""
import base64
import hashlib
import logging
import shutil
from pathlib import Path
@@ -95,31 +97,52 @@ def instrumented_docling_task( # noqa: C901
with tracer.start_as_current_span("prepare_sources") as prep_span:
convert_sources: list[Union[str, DocumentStream]] = []
headers: dict[str, Any] | None = None
source_info: list[dict[str, str]] = []
for idx, source in enumerate(task.sources):
if isinstance(source, DocumentStream):
convert_sources.append(source)
prep_span.add_event(
f"source_{idx}_prepared",
{"type": "DocumentStream", "name": source.name},
)
info = {"type": "DocumentStream", "name": source.name}
source_info.append(info)
prep_span.add_event(f"source_{idx}_prepared", info)
elif isinstance(source, FileSource):
convert_sources.append(source.to_document_stream())
prep_span.add_event(
f"source_{idx}_prepared",
{"type": "FileSource", "filename": source.filename},
decoded_bytes = base64.b64decode(source.base64_string)
file_hash = hashlib.md5(
decoded_bytes, usedforsecurity=False
).hexdigest()[:12]
logger.info(
f"FileSource {idx}: filename={source.filename}, "
f"base64_len={len(source.base64_string)}, "
f"decoded_size={len(decoded_bytes)}, md5={file_hash}"
)
doc_stream = source.to_document_stream()
convert_sources.append(doc_stream)
info = {
"type": "FileSource",
"filename": source.filename,
"size": str(len(decoded_bytes)),
"md5": file_hash,
}
source_info.append(info)
prep_span.add_event(f"source_{idx}_prepared", info)
elif isinstance(source, HttpSource):
convert_sources.append(str(source.url))
info = {"type": "HttpSource", "url": str(source.url)}
source_info.append(info)
if headers is None and source.headers:
headers = source.headers
prep_span.add_event(
f"source_{idx}_prepared",
{"type": "HttpSource", "url": str(source.url)},
)
prep_span.add_event(f"source_{idx}_prepared", info)
prep_span.set_attribute("num_sources", len(convert_sources))
source_names = ", ".join(
f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}"
for s in source_info
)
logger.info(
f"Task {task_id} processing {len(convert_sources)} source(s): {source_names}"
)
if not conversion_manager:
raise RuntimeError("No converter")
if not task.convert_options:
@@ -140,20 +163,32 @@ def instrumented_docling_task( # noqa: C901
with tracer.start_as_current_span("process_results") as proc_span:
proc_span.set_attribute("task_type", str(task.task_type.value))
if task.task_type == TaskType.CONVERT:
with tracer.start_as_current_span("process_export_results"):
processed_results = process_export_results(
task=task,
conv_results=conv_results,
work_dir=workdir,
)
elif task.task_type == TaskType.CHUNK:
with tracer.start_as_current_span("process_chunk_results"):
processed_results = process_chunk_results(
task=task,
conv_results=conv_results,
work_dir=workdir,
)
try:
if task.task_type == TaskType.CONVERT:
with tracer.start_as_current_span("process_export_results"):
processed_results = process_export_results(
task=task,
conv_results=conv_results,
work_dir=workdir,
)
elif task.task_type == TaskType.CHUNK:
with tracer.start_as_current_span("process_chunk_results"):
processed_results = process_chunk_results(
task=task,
conv_results=conv_results,
work_dir=workdir,
)
except Exception as proc_error:
source_names = ", ".join(
f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}"
for s in source_info
)
logger.error(
f"Task {task_id} processing failed. "
f"Sources: {source_names}. "
f"Error: {proc_error}"
)
raise
# Serialize and store results
with tracer.start_as_current_span("serialize_and_store") as store_span:
@@ -195,6 +230,7 @@ def instrumented_docling_task( # noqa: C901
_TaskUpdate(
task_id=task_id,
task_status=TaskStatus.FAILURE,
error_message=str(e),
).model_dump_json(),
)
except Exception:
@@ -209,7 +245,16 @@ def instrumented_docling_task( # noqa: C901
pass
# Record exception and mark span as failed
logger.error(f"Docling task {task_id} failed: {e}", exc_info=True)
source_context = "N/A"
if "source_info" in locals():
source_context = ", ".join(
f"{s['type']}={s.get('name') or s.get('filename') or s.get('url', 'unknown')}"
for s in source_info
)
logger.error(
f"Docling task {task_id} failed: {e}. Sources: {source_context}",
exc_info=True,
)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise

View File

@@ -0,0 +1,157 @@
"""Tests for error_message propagation through the docling-serve API layer."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from docling_jobkit.datamodel.task import Task
from docling_jobkit.datamodel.task_meta import TaskStatus
from docling_jobkit.datamodel.task_targets import InBodyTarget
from docling_serve.datamodel.responses import TaskStatusResponse
class TestTaskStatusResponseErrorMessage:
    """Unit tests for the optional ``error_message`` field on TaskStatusResponse."""
    def test_error_message_field_exists(self):
        # A failed task's response can carry a human-readable error string.
        resp = TaskStatusResponse(
            task_id="t1",
            task_type="convert",
            task_status="failure",
            error_message="conversion failed",
        )
        assert resp.error_message == "conversion failed"
    def test_error_message_defaults_to_none(self):
        # Omitting the field must leave it None, so existing callers that
        # never set it keep working unchanged.
        resp = TaskStatusResponse(
            task_id="t1",
            task_type="convert",
            task_status="success",
        )
        assert resp.error_message is None
    def test_error_message_in_json_output(self):
        # The error text must survive a JSON round-trip of the model.
        resp = TaskStatusResponse(
            task_id="t1",
            task_type="convert",
            task_status="failure",
            error_message="OOM killed",
        )
        data = json.loads(resp.model_dump_json())
        assert data["error_message"] == "OOM killed"
    def test_error_message_none_in_json_output(self):
        # When unset, the field is still serialized (as JSON null) rather
        # than dropped from the payload.
        resp = TaskStatusResponse(
            task_id="t1",
            task_type="convert",
            task_status="success",
        )
        data = json.loads(resp.model_dump_json())
        assert data["error_message"] is None
    def test_backward_compatible_deserialization(self):
        # Payloads produced before the field existed (no "error_message" key)
        # must still validate, defaulting the field to None.
        old_json = '{"task_id": "t1", "task_type": "convert", "task_status": "failure"}'
        resp = TaskStatusResponse.model_validate_json(old_json)
        assert resp.error_message is None
class TestRedisErrorMessageStorage:
    """Tests that RedisTaskStatusMixin persists and restores ``error_message``."""
    @pytest.mark.asyncio
    async def test_store_and_retrieve_error_message(self):
        # Round-trip a failed task through the mixin's Redis store/load path
        # using an in-memory dict in place of a real Redis server.
        from docling_serve.orchestrator_factory import RedisTaskStatusMixin
        storage: dict[str, str] = {}
        async def mock_set(key: str, value: str, ex: int = 0) -> None:
            # Mirrors redis set(key, value, ex=ttl); TTL is ignored here.
            storage[key] = value
        async def mock_get(key: str) -> bytes | None:
            # Real redis-py returns bytes (or None), so encode on the way out.
            val = storage.get(key)
            return val.encode() if val else None
        mock_redis = AsyncMock()
        mock_redis.set = mock_set
        mock_redis.get = mock_get
        # The mixin uses "async with redis.Redis(...)", so the context manager
        # must yield the mock client itself.
        mock_redis.__aenter__ = AsyncMock(return_value=mock_redis)
        mock_redis.__aexit__ = AsyncMock(return_value=False)
        class FakeMixin(RedisTaskStatusMixin):
            # Bypass the real orchestrator __init__; set only the attributes
            # the mixin's store/load methods read.
            def __init__(self):
                self.redis_prefix = "docling:tasks:"
                self._redis_pool = MagicMock()
                self.tasks: dict[str, Task] = {}
                self._task_result_keys: dict[str, str] = {}
                self.config = MagicMock()
                self.config.redis_url = "redis://localhost:6379/"
        mixin = FakeMixin()
        task = Task(
            task_id="fail-task-1",
            sources=[],
            target=InBodyTarget(),
            task_status=TaskStatus.FAILURE,
            error_message="corrupt PDF: invalid xref table",
        )
        with patch(
            "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis
        ):
            await mixin._store_task_in_redis(task)
        # The serialized metadata blob must include the error text verbatim.
        raw = storage.get("docling:tasks:fail-task-1:metadata")
        assert raw is not None
        data = json.loads(raw)
        assert data["error_message"] == "corrupt PDF: invalid xref table"
        with patch(
            "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis
        ):
            restored = await mixin._get_task_from_redis("fail-task-1")
        # Deserialization restores both the error text and the failure status.
        assert restored is not None
        assert restored.error_message == "corrupt PDF: invalid xref table"
        assert restored.task_status == TaskStatus.FAILURE
    @pytest.mark.asyncio
    async def test_retrieve_without_error_message_backward_compat(self):
        # Metadata written by an older version (no "error_message" key) must
        # still deserialize, with error_message defaulting to None.
        from docling_serve.orchestrator_factory import RedisTaskStatusMixin
        old_data = json.dumps(
            {
                "task_id": "old-task-1",
                "task_type": "convert",
                "task_status": "success",
                "processing_meta": {
                    "num_docs": 1,
                    "num_processed": 1,
                    "num_succeeded": 1,
                    "num_failed": 0,
                },
            }
        )
        mock_redis = AsyncMock()
        mock_redis.get = AsyncMock(return_value=old_data.encode())
        mock_redis.__aenter__ = AsyncMock(return_value=mock_redis)
        mock_redis.__aexit__ = AsyncMock(return_value=False)
        class FakeMixin(RedisTaskStatusMixin):
            # Same minimal stand-in as above: only the attributes the mixin reads.
            def __init__(self):
                self.redis_prefix = "docling:tasks:"
                self._redis_pool = MagicMock()
                self.tasks: dict[str, Task] = {}
                self._task_result_keys: dict[str, str] = {}
                self.config = MagicMock()
                self.config.redis_url = "redis://localhost:6379/"
        mixin = FakeMixin()
        with patch(
            "docling_serve.orchestrator_factory.redis.Redis", return_value=mock_redis
        ):
            restored = await mixin._get_task_from_redis("old-task-1")
        assert restored is not None
        assert restored.error_message is None
        assert restored.task_status == TaskStatus.SUCCESS

6
uv.lock generated
View File

@@ -1379,7 +1379,7 @@ wheels = [
[[package]]
name = "docling-jobkit"
version = "1.11.0"
version = "1.12.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "boto3", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" },
@@ -1390,9 +1390,9 @@ dependencies = [
{ name = "pydantic-settings", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" },
{ name = "typer", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-cu130') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu130' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/18/12/712943c365c395ee922709cc6d7ec8201a030155c2cd0dc6ae761b25a981/docling_jobkit-1.11.0.tar.gz", hash = "sha256:8010d75c7c117c9978e688afb5b69d30b32c1c6666b6be626a64608b91b4734c", size = 70926, upload-time = "2026-02-18T12:20:16.195Z" }
sdist = { url = "https://files.pythonhosted.org/packages/9d/93/18d23e6d820d716401ee953c0c8a03a9d0f8aa3b0257940b26c09540824e/docling_jobkit-1.12.0.tar.gz", hash = "sha256:624d320c93d193eac8af0972c63a330689079a2f9366c24afbb0af3b31dec008", size = 71254, upload-time = "2026-02-24T10:43:52.932Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0f/66/fb64c094935aee9921c57d4455a7bbfd192c0dcf2cc576605483cd8ac081/docling_jobkit-1.11.0-py3-none-any.whl", hash = "sha256:f3238543a7db21139efa5c4e9167d42aba3d3f2e48f61137c26111cdd4cdeb14", size = 96276, upload-time = "2026-02-18T12:20:14.752Z" },
{ url = "https://files.pythonhosted.org/packages/99/e1/b264d8b7edf078ed8e2e187a49be206d7d29504d66bb5cad98ce673944b2/docling_jobkit-1.12.0-py3-none-any.whl", hash = "sha256:adb79ea8d3b751f49d036940d46da451177cf715c20bb2377f6515224b8008a2", size = 96652, upload-time = "2026-02-24T10:43:51.54Z" },
]
[package.optional-dependencies]