Merge branch 'main' into ui-revamp

Signed-off-by: DKL <dkl@zurich.ibm.com>
This commit is contained in:
DKL
2026-02-12 10:50:14 +01:00
26 changed files with 4657 additions and 3777 deletions

View File

@@ -1,3 +1,6 @@
TESSDATA_PREFIX=/usr/share/tesseract/tessdata/
UVICORN_WORKERS=2
UVICORN_RELOAD=True
UVICORN_RELOAD=True
# Logging configuration (case-insensitive)
# DOCLING_SERVE_LOG_LEVEL=WARNING # Options: WARNING, INFO, DEBUG (or warning, info, debug)

View File

@@ -38,3 +38,7 @@ Panos
Vagenas
Staar
Livathinos
(?i)redis
(?i)prometheus
(?i)grafana
(?i)tempo

View File

@@ -160,13 +160,10 @@ jobs:
pip install uv
uv venv --allow-existing
source .venv/bin/activate
uv sync --all-extras --no-extra flash-attn
uv sync --only-dev
# Run pytest tests
echo "Running tests..."
# Test import
python -c 'from docling_serve.app import create_app; create_app()'
# Run pytest and check result directly
if ! pytest -sv -k "test_convert_url" tests/test_1-url-async.py \
--disable-warnings; then

View File

@@ -1,3 +1,78 @@
## [v1.12.0](https://github.com/docling-project/docling-serve/releases/tag/v1.12.0) - 2026-02-06
### Feature
* Updates for chart extraction functionality ([#491](https://github.com/docling-project/docling-serve/issues/491)) ([`7e461b1`](https://github.com/docling-project/docling-serve/commit/7e461b115bb66ddd1628372e09b40669138bd3e4))
### Docling libraries included in this release:
- docling 2.72.0
- docling-core 2.63.0
- docling-ibm-models 3.11.0
- docling-jobkit 1.10.1
- docling-mcp 1.3.4
- docling-parse 4.7.3
- docling-serve 1.12.0
## [v1.11.0](https://github.com/docling-project/docling-serve/releases/tag/v1.11.0) - 2026-01-28
### Feature
* Updated Docling with new features and dependencies updates ([#476](https://github.com/docling-project/docling-serve/issues/476)) ([`cfe747f`](https://github.com/docling-project/docling-serve/commit/cfe747fbfcb44fe74502dc4d6a7662265a9567da))
### Fix
* New docling-jobkit with memory fixes for RQ ([#479](https://github.com/docling-project/docling-serve/issues/479)) ([`8885993`](https://github.com/docling-project/docling-serve/commit/8885993a89e28416c096f63cc96f314b0ebdfe04))
### Docling libraries included in this release:
- docling 2.70.0
- docling-core 2.61.0
- docling-ibm-models 3.11.0
- docling-jobkit 1.9.0
- docling-mcp 1.3.4
- docling-parse 4.7.3
- docling-serve 1.11.0
## [v1.10.0](https://github.com/docling-project/docling-serve/releases/tag/v1.10.0) - 2026-01-13
### Feature
* OpenTelemetry support for traces and metrics ([#456](https://github.com/docling-project/docling-serve/issues/456)) ([`416312a`](https://github.com/docling-project/docling-serve/commit/416312a41b04f184a9b8bd37d8a4db9a2dfa1014))
* Make RQ results_ttl configurable ([#460](https://github.com/docling-project/docling-serve/issues/460)) ([`c57dd51`](https://github.com/docling-project/docling-serve/commit/c57dd51c4d5c05515e0fe160b237197bac0d668b))
### Fix
* Cleanup error prints ([#470](https://github.com/docling-project/docling-serve/issues/470)) ([`c59b771`](https://github.com/docling-project/docling-serve/commit/c59b77151d265d54c6c1f65ebfb16ca185a04b7f))
* Update dependencies ([#469](https://github.com/docling-project/docling-serve/issues/469)) ([`8eddd58`](https://github.com/docling-project/docling-serve/commit/8eddd589bb82787130015a976592fff2dddc77c6))
* Race condition in Gradio UI task result retrieval ([#454](https://github.com/docling-project/docling-serve/issues/454)) ([`a179338`](https://github.com/docling-project/docling-serve/commit/a179338c785ef9b84696f41b7ab2f2cafe80973d))
### Docling libraries included in this release:
- docling 2.67.0
- docling-core 2.59.0
- docling-ibm-models 3.10.3
- docling-jobkit 1.8.1
- docling-mcp 1.3.3
- docling-parse 4.7.2
- docling-serve 1.10.0
## [v1.9.0](https://github.com/docling-project/docling-serve/releases/tag/v1.9.0) - 2025-11-24
### Feature
* Version endpoint ([#442](https://github.com/docling-project/docling-serve/issues/442)) ([`2c23f65`](https://github.com/docling-project/docling-serve/commit/2c23f65507d7699694debd7faa0de840ef2d2cb7))
### Fix
* Dependencies updates Docling 2.63.0 ([#443](https://github.com/docling-project/docling-serve/issues/443)) ([`e437e83`](https://github.com/docling-project/docling-serve/commit/e437e830c956f9a76cd0c62faf9add0231992548))
### Docling libraries included in this release:
- docling 2.63.0
- docling-core 2.52.0
- docling-ibm-models 3.10.2
- docling-jobkit 1.8.0
- docling-mcp 1.3.3
- docling-parse 4.7.1
- docling-serve 1.9.0
## [v1.8.0](https://github.com/docling-project/docling-serve/releases/tag/v1.8.0) - 2025-10-31
### Feature

View File

@@ -4,7 +4,7 @@ import platform
import sys
import warnings
from pathlib import Path
from typing import Annotated, Any, Optional, Union
from typing import Annotated, Any, Union
import typer
import uvicorn
@@ -66,12 +66,21 @@ def callback(
),
] = 0,
) -> None:
if verbose == 0:
# Priority: CLI flag > ENV variable > default (WARNING)
if verbose > 0:
# CLI flag takes precedence
if verbose == 1:
logging.basicConfig(level=logging.INFO)
elif verbose >= 2:
logging.basicConfig(level=logging.DEBUG)
elif docling_serve_settings.log_level:
# Use ENV variable if CLI flag not provided
logging.basicConfig(
level=getattr(logging, docling_serve_settings.log_level.value)
)
else:
# Default to WARNING
logging.basicConfig(level=logging.WARNING)
elif verbose == 1:
logging.basicConfig(level=logging.INFO)
elif verbose == 2:
logging.basicConfig(level=logging.DEBUG)
def _run(
@@ -205,17 +214,17 @@ def dev(
int, typer.Option(help="Timeout for the server response.")
] = uvicorn_settings.timeout_keep_alive,
ssl_certfile: Annotated[
Optional[Path], typer.Option(help="SSL certificate file")
Path | None, typer.Option(help="SSL certificate file")
] = uvicorn_settings.ssl_certfile,
ssl_keyfile: Annotated[
Optional[Path], typer.Option(help="SSL key file")
Path | None, typer.Option(help="SSL key file")
] = uvicorn_settings.ssl_keyfile,
ssl_keyfile_password: Annotated[
Optional[str], typer.Option(help="SSL keyfile password")
str | None, typer.Option(help="SSL keyfile password")
] = uvicorn_settings.ssl_keyfile_password,
# docling options
artifacts_path: Annotated[
Optional[Path],
Path | None,
typer.Option(
help=(
"If set to a valid directory, "
@@ -312,17 +321,17 @@ def run(
int, typer.Option(help="Timeout for the server response.")
] = uvicorn_settings.timeout_keep_alive,
ssl_certfile: Annotated[
Optional[Path], typer.Option(help="SSL certificate file")
Path | None, typer.Option(help="SSL certificate file")
] = uvicorn_settings.ssl_certfile,
ssl_keyfile: Annotated[
Optional[Path], typer.Option(help="SSL key file")
Path | None, typer.Option(help="SSL key file")
] = uvicorn_settings.ssl_keyfile,
ssl_keyfile_password: Annotated[
Optional[str], typer.Option(help="SSL keyfile password")
str | None, typer.Option(help="SSL keyfile password")
] = uvicorn_settings.ssl_keyfile_password,
# docling options
artifacts_path: Annotated[
Optional[Path],
Path | None,
typer.Option(
help=(
"If set to a valid directory, "
@@ -367,15 +376,38 @@ def rq_worker() -> Any:
"""
Run the [bold]Docling JobKit[/bold] RQ worker.
"""
from docling_jobkit.convert.manager import DoclingConverterManagerConfig
from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
from docling_jobkit.orchestrators.rq.worker import run_worker
import tempfile
from pathlib import Path
from docling_jobkit.convert.manager import (
DoclingConverterManagerConfig,
)
from docling_jobkit.orchestrators.rq.orchestrator import (
RQOrchestrator,
RQOrchestratorConfig,
)
from docling_serve.rq_instrumentation import setup_rq_worker_instrumentation
from docling_serve.rq_worker_instrumented import InstrumentedRQWorker
# Configure logging for RQ worker
if docling_serve_settings.log_level:
logging.basicConfig(
level=getattr(logging, docling_serve_settings.log_level.value)
)
else:
logging.basicConfig(level=logging.WARNING)
# Set up OpenTelemetry for the worker process
if docling_serve_settings.otel_enable_traces:
setup_rq_worker_instrumentation()
rq_config = RQOrchestratorConfig(
redis_url=docling_serve_settings.eng_rq_redis_url,
results_prefix=docling_serve_settings.eng_rq_results_prefix,
sub_channel=docling_serve_settings.eng_rq_sub_channel,
scratch_dir=get_scratch(),
results_ttl=docling_serve_settings.eng_rq_results_ttl,
)
cm_config = DoclingConverterManagerConfig(
@@ -392,11 +424,20 @@ def rq_worker() -> Any:
batch_polling_interval_seconds=docling_serve_settings.batch_polling_interval_seconds,
)
run_worker(
rq_config=rq_config,
# Create worker with instrumentation
scratch_dir = rq_config.scratch_dir or Path(tempfile.mkdtemp(prefix="docling_"))
redis_conn, rq_queue = RQOrchestrator.make_rq_queue(rq_config)
worker = InstrumentedRQWorker(
[rq_queue],
connection=redis_conn,
orchestrator_config=rq_config,
cm_config=cm_config,
scratch_dir=scratch_dir,
)
worker.work()
def main() -> None:
app()

View File

@@ -78,8 +78,12 @@ from docling_serve.datamodel.responses import (
)
from docling_serve.helper_functions import DOCLING_VERSIONS, FormDepends
from docling_serve.orchestrator_factory import get_async_orchestrator
from docling_serve.otel_instrumentation import (
get_metrics_endpoint_content,
setup_otel_instrumentation,
)
from docling_serve.response_preparation import prepare_response
from docling_serve.settings import docling_serve_settings
from docling_serve.settings import AsyncEngine, docling_serve_settings
from docling_serve.storage import get_scratch
from docling_serve.websocket_notifier import WebsocketNotifier
@@ -176,6 +180,22 @@ def create_app(): # noqa: C901
version=version,
)
# Setup OpenTelemetry instrumentation
redis_url = (
docling_serve_settings.eng_rq_redis_url
if docling_serve_settings.eng_kind == AsyncEngine.RQ
else None
)
setup_otel_instrumentation(
app,
service_name=docling_serve_settings.otel_service_name,
enable_metrics=docling_serve_settings.otel_enable_metrics,
enable_traces=docling_serve_settings.otel_enable_traces,
enable_prometheus=docling_serve_settings.otel_enable_prometheus,
enable_otlp_metrics=docling_serve_settings.otel_enable_otlp_metrics,
redis_url=redis_url,
)
origins = docling_serve_settings.cors_origins
methods = docling_serve_settings.cors_methods
headers = docling_serve_settings.cors_headers
@@ -414,6 +434,16 @@ def create_app(): # noqa: C901
)
return DOCLING_VERSIONS
# Prometheus metrics endpoint
@app.get("/metrics", tags=["health"], include_in_schema=False)
def metrics():
from fastapi.responses import PlainTextResponse
return PlainTextResponse(
content=get_metrics_endpoint_content(),
media_type="text/plain; version=0.0.4",
)
# Convert a document from URL(s)
@app.post(
"/v1/convert/source",
@@ -633,7 +663,7 @@ def create_app(): # noqa: C901
chunking_options: Annotated[
opt_cls,
FormDepends(
HybridChunkerOptions,
opt_cls,
prefix="chunking_",
excluded_fields=["chunker"],
),
@@ -745,7 +775,7 @@ def create_app(): # noqa: C901
chunking_options: Annotated[
opt_cls,
FormDepends(
HybridChunkerOptions,
opt_cls,
prefix="chunking_",
excluded_fields=["chunker"],
),

View File

@@ -1,7 +1,7 @@
import json
import logging
from functools import lru_cache
from typing import Any, Optional
from typing import Any
import redis.asyncio as redis
@@ -23,7 +23,7 @@ class RedisTaskStatusMixin:
_task_result_keys: dict[str, str]
config: Any
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.redis_prefix = "docling:tasks:"
self._redis_pool = redis.ConnectionPool.from_url(
@@ -91,7 +91,7 @@ class RedisTaskStatusMixin:
_log.warning(f"Task {task_id} not found")
raise
async def _get_task_from_redis(self, task_id: str) -> Optional[Task]:
async def _get_task_from_redis(self, task_id: str) -> Task | None:
try:
async with redis.Redis(connection_pool=self._redis_pool) as r:
task_data = await r.get(f"{self.redis_prefix}{task_id}:metadata")
@@ -115,7 +115,7 @@ class RedisTaskStatusMixin:
_log.error(f"Redis get task {task_id}: {e}")
return None
async def _get_task_from_rq_direct(self, task_id: str) -> Optional[Task]:
async def _get_task_from_rq_direct(self, task_id: str) -> Task | None:
try:
_log.debug(f"Checking RQ for task {task_id}")
@@ -202,9 +202,11 @@ class RedisTaskStatusMixin:
data: dict[str, Any] = {
"task_id": task.task_id,
"task_type": task.task_type.value
if hasattr(task.task_type, "value")
else str(task.task_type),
"task_type": (
task.task_type.value
if hasattr(task.task_type, "value")
else str(task.task_type)
),
"task_status": task.task_status.value,
"processing_meta": meta,
}
@@ -304,14 +306,99 @@ def get_async_orchestrator() -> BaseOrchestrator:
RQOrchestratorConfig,
)
from docling_serve.rq_instrumentation import wrap_rq_queue_for_tracing
class RedisAwareRQOrchestrator(RedisTaskStatusMixin, RQOrchestrator): # type: ignore[misc]
pass
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
# Wrap RQ queue to inject trace context into jobs
if docling_serve_settings.otel_enable_traces:
wrap_rq_queue_for_tracing(self._rq_queue)
async def enqueue(self, **kwargs: Any) -> Task: # type: ignore[override]
"""Override enqueue to use instrumented job function when tracing is enabled."""
import base64
import uuid
import warnings
from docling.datamodel.base_models import DocumentStream
from docling_jobkit.datamodel.chunking import ChunkingExportOptions
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.task import Task, TaskSource, TaskTarget
from docling_jobkit.datamodel.task_meta import TaskType
# Extract parameters
sources: list[TaskSource] = kwargs.get("sources", [])
target: TaskTarget = kwargs["target"]
task_type: TaskType = kwargs.get("task_type", TaskType.CONVERT)
options = kwargs.get("options")
convert_options = kwargs.get("convert_options")
chunking_options = kwargs.get("chunking_options")
chunking_export_options = kwargs.get("chunking_export_options")
if options is not None and convert_options is None:
convert_options = options
warnings.warn(
"'options' is deprecated and will be removed in a future version. "
"Use 'conversion_options' instead.",
DeprecationWarning,
stacklevel=2,
)
task_id = str(uuid.uuid4())
rq_sources: list[HttpSource | FileSource] = []
for source in sources:
if isinstance(source, DocumentStream):
encoded_doc = base64.b64encode(source.stream.read()).decode()
rq_sources.append(
FileSource(filename=source.name, base64_string=encoded_doc)
)
elif isinstance(source, (HttpSource | FileSource)):
rq_sources.append(source)
chunking_export_options = (
chunking_export_options or ChunkingExportOptions()
)
task = Task(
task_id=task_id,
task_type=task_type,
sources=rq_sources,
convert_options=convert_options,
chunking_options=chunking_options,
chunking_export_options=chunking_export_options,
target=target,
)
self.tasks.update({task.task_id: task})
task_data = task.model_dump(mode="json", serialize_as_any=True)
# Use instrumented job function if tracing is enabled
if docling_serve_settings.otel_enable_traces:
job_func = "docling_serve.rq_job_wrapper.instrumented_docling_task"
else:
job_func = "docling_jobkit.orchestrators.rq.worker.docling_task"
self._rq_queue.enqueue(
job_func,
kwargs={"task_data": task_data},
job_id=task_id,
timeout=14400,
)
await self.init_task_tracking(task)
# Store in Redis
await self._store_task_in_redis(task)
return task
rq_config = RQOrchestratorConfig(
redis_url=docling_serve_settings.eng_rq_redis_url,
results_prefix=docling_serve_settings.eng_rq_results_prefix,
sub_channel=docling_serve_settings.eng_rq_sub_channel,
scratch_dir=get_scratch(),
results_ttl=docling_serve_settings.eng_rq_results_ttl,
)
return RedisAwareRQOrchestrator(config=rq_config)

View File

@@ -0,0 +1,138 @@
"""OpenTelemetry instrumentation for docling-serve with metrics and traces."""
import logging
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
from opentelemetry.trace import SpanKind
from opentelemetry.util.types import Attributes
from prometheus_client import REGISTRY
from redis import Redis
from docling_serve.rq_metrics_collector import RQCollector
logger = logging.getLogger(__name__)
FILTERED_PATHS = {"/metrics", "/health", "/healthz", "/readyz", "/livez"}
class HealthMetricsFilterSampler(Sampler):
    """OTEL sampler that suppresses traces for operational endpoints.

    Spans whose request path is one of /metrics, /health, /healthz,
    /readyz or /livez are dropped (query strings are ignored when
    matching); every other span is always recorded and sampled.
    """

    def should_sample(
        self,
        parent_context,
        trace_id: int,
        name: str,
        kind: SpanKind | None = None,
        attributes: Attributes | None = None,
        links=None,
        trace_state=None,
    ) -> SamplingResult:
        # The request path may arrive under the legacy ("http.target")
        # or the newer ("url.path") semantic-convention attribute.
        target = None
        if attributes:
            target = attributes.get("http.target") or attributes.get("url.path")
        if target:
            path, _, _ = str(target).partition("?")
            if path in FILTERED_PATHS:
                return SamplingResult(Decision.DROP)
        return SamplingResult(Decision.RECORD_AND_SAMPLE)

    def get_description(self) -> str:
        """Return description of the sampler."""
        return (
            "HealthMetricsFilterSampler: drops traces for health and metrics endpoints"
        )
def setup_otel_instrumentation(
    app,
    service_name: str = "docling-serve",
    enable_metrics: bool = True,
    enable_traces: bool = True,
    enable_prometheus: bool = True,
    enable_otlp_metrics: bool = False,
    redis_url: str | None = None,
):
    """
    Set up OpenTelemetry instrumentation for a FastAPI app.

    Args:
        app: FastAPI application instance
        service_name: Service name for the OTEL resource
        enable_metrics: Enable OTEL metrics
        enable_traces: Enable OTEL traces
        enable_prometheus: Enable Prometheus metrics export
        enable_otlp_metrics: Enable OTLP metrics export (for OTEL collector)
        redis_url: Redis URL for RQ metrics (if using the RQ engine)
    """
    otel_resource = Resource(attributes={SERVICE_NAME: service_name})

    # Traces: always-on sampling except for health/metrics endpoints.
    if enable_traces:
        logger.info("Setting up OpenTelemetry traces with health/metrics filtering")
        tracer_provider = TracerProvider(
            resource=otel_resource, sampler=HealthMetricsFilterSampler()
        )
        tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
        trace.set_tracer_provider(tracer_provider)

    # Metrics: build the reader list, then install a provider only if at
    # least one exporter was requested.
    if enable_metrics:
        logger.info("Setting up OpenTelemetry metrics")
        readers: list = []
        if enable_prometheus:
            logger.info("Enabling Prometheus metrics export")
            readers.append(PrometheusMetricReader())
        if enable_otlp_metrics:
            logger.info("Enabling OTLP metrics export")
            readers.append(PeriodicExportingMetricReader(OTLPMetricExporter()))
        if readers:
            metrics.set_meter_provider(
                MeterProvider(resource=otel_resource, metric_readers=readers)
            )

    # HTTP-level spans/metrics for every route.
    logger.info("Instrumenting FastAPI with OpenTelemetry")
    FastAPIInstrumentor.instrument_app(app)

    # RQ queue/worker gauges, exposed via the Prometheus registry.
    if redis_url and enable_prometheus:
        logger.info(f"Registering RQ metrics collector for Redis at {redis_url}")
        REGISTRY.register(RQCollector(Redis.from_url(redis_url)))
def get_metrics_endpoint_content():
    """
    Render the global Prometheus registry for the /metrics endpoint.

    Returns:
        Prometheus-formatted metrics content
    """
    # Local import keeps this helper usable even when callers only need
    # the metrics endpoint.
    from prometheus_client import REGISTRY, generate_latest

    return generate_latest(REGISTRY)

View File

@@ -0,0 +1,197 @@
"""RQ instrumentation for distributed tracing across API and workers."""
import functools
import logging
from collections.abc import Callable
from typing import Any
from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import SpanKind, Status, StatusCode
logger = logging.getLogger(__name__)
# Global tracer instance
_tracer = trace.get_tracer(__name__)
def get_rq_tracer() -> trace.Tracer:
    """Return the module-level tracer used for RQ spans."""
    return _tracer
def inject_trace_context(job_kwargs: dict[str, Any]) -> dict[str, Any]:
    """
    Propagate the active trace context into RQ job kwargs.

    Called at enqueue time so the worker can continue the trace that was
    started by the API request.

    Args:
        job_kwargs: Job keyword arguments (mutated in place)

    Returns:
        The same kwargs dict, with the trace context stored under
        ``meta["otel_context"]`` when a context is active
    """
    headers: dict[str, str] = {}
    inject(headers)
    if headers:
        meta = job_kwargs.setdefault("meta", {})
        meta["otel_context"] = headers
        logger.debug(f"Injected trace context into job: {headers}")
    return job_kwargs
def extract_trace_context(job) -> Context | None:
    """
    Recover the trace context stored in an RQ job's metadata.

    Called in the worker process to continue the trace begun by the API.

    Args:
        job: RQ Job instance

    Returns:
        OpenTelemetry context extracted from job metadata, or None when
        the job carries no trace information
    """
    carrier: dict = {}
    meta = getattr(job, "meta", None)
    if meta:
        carrier = meta.get("otel_context", {})
        logger.debug(f"Extracted trace context from job: {carrier}")
    return extract(carrier) if carrier else None
def instrument_rq_job(func: Callable) -> Callable:
    """
    Decorator that runs an RQ job function inside a CONSUMER span.

    The span is parented to the trace context carried in the job's
    metadata (if any), linking worker execution back to the originating
    API request.

    Args:
        func: Job function to instrument

    Returns:
        Instrumented function
    """

    @functools.wraps(func)
    def traced(*args, **kwargs):
        # RQ passes the job instance as a keyword arg in some contexts.
        job = kwargs.get("job")
        parent = extract_trace_context(job) if job else None
        with get_rq_tracer().start_as_current_span(
            f"rq.job.{func.__name__}",
            context=parent,
            kind=SpanKind.CONSUMER,
        ) as span:
            try:
                # Surface job identity on the span for debugging.
                if job:
                    span.set_attribute("rq.job.id", job.id)
                    span.set_attribute("rq.job.func_name", job.func_name)
                    span.set_attribute("rq.queue.name", job.origin)
                    if hasattr(job, "description"):
                        span.set_attribute("rq.job.description", job.description)
                result = func(*args, **kwargs)
                span.set_status(Status(StatusCode.OK))
                return result
            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

    return traced
def setup_rq_worker_instrumentation():
    """
    Initialize OpenTelemetry tracing inside an RQ worker process.

    Idempotent: if an SDK TracerProvider is already installed, the
    existing configuration is left untouched.
    """
    from opentelemetry import trace
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    # Guard clause: a real (SDK) provider means we already configured OTEL.
    if isinstance(trace.get_tracer_provider(), TracerProvider):
        logger.debug("OpenTelemetry already configured for RQ worker")
        return

    logger.info("Setting up OpenTelemetry for RQ worker")
    provider = TracerProvider(
        resource=Resource(attributes={SERVICE_NAME: "docling-serve-worker"})
    )
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(provider)

    # Refresh the module-level tracer so spans use the new provider.
    global _tracer
    _tracer = trace.get_tracer(__name__)
    logger.info("OpenTelemetry setup complete for RQ worker")
def wrap_rq_queue_for_tracing(rq_queue: Any) -> None:
    """
    Wrap RQ queue's enqueue method to inject trace context into jobs.

    This monkey-patches the queue's enqueue method to automatically inject
    the current trace context into job metadata.

    Args:
        rq_queue: RQ Queue instance to wrap
    """
    original_enqueue = rq_queue.enqueue

    def traced_enqueue(*args: Any, **kwargs: Any) -> Any:
        """Wrapped enqueue that injects trace context."""
        # Copy the caller's meta dict instead of mutating it: the caller
        # may reuse the same dict for several jobs, and the previous
        # in-place update leaked "otel_context" into unrelated enqueues.
        # dict(... or {}) also tolerates an explicit meta=None.
        meta = dict(kwargs.get("meta") or {})
        # Inject the active trace context into the job metadata.
        carrier: dict[str, str] = {}
        inject(carrier)
        if carrier:
            meta["otel_context"] = carrier
            kwargs["meta"] = meta
            logger.debug(f"Injected trace context into RQ job: {carrier}")
        # Call original enqueue
        return original_enqueue(*args, **kwargs)

    # Replace enqueue method
    rq_queue.enqueue = traced_enqueue
    logger.info("RQ queue wrapped for distributed tracing")

View File

@@ -0,0 +1,215 @@
"""Instrumented wrapper for RQ job functions with OpenTelemetry tracing."""
import logging
import shutil
from pathlib import Path
from typing import Any, Union
import msgpack
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode
from rq import get_current_job
from docling.datamodel.base_models import DocumentStream
from docling_jobkit.convert.chunking import process_chunk_results
from docling_jobkit.convert.manager import DoclingConverterManager
from docling_jobkit.convert.results import process_export_results
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.task import Task
from docling_jobkit.datamodel.task_meta import TaskStatus, TaskType
from docling_jobkit.orchestrators.rq.orchestrator import (
RQOrchestratorConfig,
_TaskUpdate,
)
from docling_jobkit.orchestrators.rq.worker import make_msgpack_safe
from docling_serve.rq_instrumentation import extract_trace_context
logger = logging.getLogger(__name__)
def instrumented_docling_task(  # noqa: C901
    task_data: dict,
    conversion_manager: DoclingConverterManager,
    orchestrator_config: RQOrchestratorConfig,
    scratch_dir: Path,
):
    """
    Instrumented wrapper for docling_task that extracts and activates trace context.

    This function extracts the OpenTelemetry trace context from the RQ job metadata
    and activates it before calling the actual task function, enabling end-to-end
    distributed tracing from API to worker. It also adds detailed sub-spans for each
    processing stage to identify performance bottlenecks.

    Args:
        task_data: Serialized Task payload (validated via Task.model_validate)
        conversion_manager: Converter manager used to run the conversion
        orchestrator_config: RQ orchestrator config (Redis channel, result TTL)
        scratch_dir: Base directory for the per-task working directory

    Returns:
        The Redis key under which the packed results were stored.

    Raises:
        RuntimeError: if no converter/options are available or the task
            type is not supported.
    """
    job = get_current_job()
    assert job is not None
    conn = job.connection
    task = Task.model_validate(task_data)
    task_id = task.task_id

    # Extract parent trace context from job metadata
    parent_context = extract_trace_context(job) if job else None

    # Get tracer
    tracer = trace.get_tracer(__name__)

    # Create main job span with parent context (this creates the link to the API trace)
    with tracer.start_as_current_span(
        "rq.job.docling_task",
        context=parent_context,
        kind=SpanKind.CONSUMER,
    ) as span:
        try:
            # Add job attributes
            span.set_attribute("rq.job.id", job.id)
            if job.func_name:
                span.set_attribute("rq.job.func_name", job.func_name)
            span.set_attribute("rq.queue.name", job.origin)

            # Add task attributes
            span.set_attribute("docling.task.id", task_id)
            span.set_attribute("docling.task.type", str(task.task_type.value))
            span.set_attribute("docling.task.num_sources", len(task.sources))

            logger.info(
                f"Executing docling_task {task_id} with "
                f"trace_id={span.get_span_context().trace_id:032x} "
                f"span_id={span.get_span_context().span_id:016x}"
            )

            # Notify task started
            with tracer.start_as_current_span("notify.task_started"):
                conn.publish(
                    orchestrator_config.sub_channel,
                    _TaskUpdate(
                        task_id=task_id,
                        task_status=TaskStatus.STARTED,
                    ).model_dump_json(),
                )

            workdir = scratch_dir / task_id

            # Prepare sources with detailed tracing
            with tracer.start_as_current_span("prepare_sources") as prep_span:
                convert_sources: list[Union[str, DocumentStream]] = []
                # First HttpSource with headers wins for the whole batch.
                headers: dict[str, Any] | None = None
                for idx, source in enumerate(task.sources):
                    if isinstance(source, DocumentStream):
                        convert_sources.append(source)
                        prep_span.add_event(
                            f"source_{idx}_prepared",
                            {"type": "DocumentStream", "name": source.name},
                        )
                    elif isinstance(source, FileSource):
                        convert_sources.append(source.to_document_stream())
                        prep_span.add_event(
                            f"source_{idx}_prepared",
                            {"type": "FileSource", "filename": source.filename},
                        )
                    elif isinstance(source, HttpSource):
                        convert_sources.append(str(source.url))
                        if headers is None and source.headers:
                            headers = source.headers
                        prep_span.add_event(
                            f"source_{idx}_prepared",
                            {"type": "HttpSource", "url": str(source.url)},
                        )
                prep_span.set_attribute("num_sources", len(convert_sources))

            if not conversion_manager:
                raise RuntimeError("No converter")
            if not task.convert_options:
                raise RuntimeError("No conversion options")

            # Document conversion with detailed tracing
            with tracer.start_as_current_span("convert_documents") as conv_span:
                conv_span.set_attribute("num_sources", len(convert_sources))
                conv_span.set_attribute("has_headers", headers is not None)
                conv_results = conversion_manager.convert_documents(
                    sources=convert_sources,
                    options=task.convert_options,
                    headers=headers,
                )

            # Result processing with detailed tracing
            with tracer.start_as_current_span("process_results") as proc_span:
                proc_span.set_attribute("task_type", str(task.task_type.value))
                if task.task_type == TaskType.CONVERT:
                    with tracer.start_as_current_span("process_export_results"):
                        processed_results = process_export_results(
                            task=task,
                            conv_results=conv_results,
                            work_dir=workdir,
                        )
                elif task.task_type == TaskType.CHUNK:
                    with tracer.start_as_current_span("process_chunk_results"):
                        processed_results = process_chunk_results(
                            task=task,
                            conv_results=conv_results,
                            work_dir=workdir,
                        )
                else:
                    # Fix: previously an unknown task type fell through and
                    # crashed later with an opaque NameError on
                    # 'processed_results'; fail fast with a clear message.
                    raise RuntimeError(
                        f"Unsupported task type: {task.task_type}"
                    )

            # Serialize and store results
            with tracer.start_as_current_span("serialize_and_store") as store_span:
                safe_data = make_msgpack_safe(processed_results.model_dump())
                packed = msgpack.packb(safe_data, use_bin_type=True)
                store_span.set_attribute("result_size_bytes", len(packed))
                result_key = f"{orchestrator_config.results_prefix}:{task_id}"
                conn.setex(result_key, orchestrator_config.results_ttl, packed)
                store_span.set_attribute("result_key", result_key)

            # Notify task success
            with tracer.start_as_current_span("notify.task_success"):
                conn.publish(
                    orchestrator_config.sub_channel,
                    _TaskUpdate(
                        task_id=task_id,
                        task_status=TaskStatus.SUCCESS,
                        result_key=result_key,
                    ).model_dump_json(),
                )

            # Clean up
            with tracer.start_as_current_span("cleanup"):
                if workdir.exists():
                    shutil.rmtree(workdir)

            # Mark span as successful
            span.set_status(Status(StatusCode.OK))
            logger.info(f"Docling task {task_id} completed successfully")
            return result_key

        except Exception as e:
            # Notify task failure (best effort: the channel may be down)
            try:
                conn.publish(
                    orchestrator_config.sub_channel,
                    _TaskUpdate(
                        task_id=task_id,
                        task_status=TaskStatus.FAILURE,
                    ).model_dump_json(),
                )
            except Exception:
                pass

            # Clean up on error (best effort)
            workdir = scratch_dir / task_id
            if workdir.exists():
                try:
                    shutil.rmtree(workdir)
                except Exception:
                    pass

            # Record exception and mark span as failed
            logger.error(f"Docling task {task_id} failed: {e}", exc_info=True)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

View File

@@ -0,0 +1,132 @@
# Heavily based on https://github.com/mdawar/rq-exporter
import logging
from prometheus_client import Summary
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
from prometheus_client.registry import Collector
from redis import Redis
from rq import Queue, Worker
from rq.job import JobStatus
logger = logging.getLogger(__name__)
def get_redis_connection(url: str):
    """Create a Redis client from a connection URL (e.g. ``redis://host:6379``)."""
    return Redis.from_url(url)
def get_workers_stats(connection):
    """Return a list of stats dicts, one per RQ worker registered on *connection*."""
    stats = []
    for worker in Worker.all(connection):
        stats.append(
            {
                "name": worker.name,
                "queues": worker.queue_names(),
                "state": worker.get_state(),
                "successful_job_count": worker.successful_job_count,
                "failed_job_count": worker.failed_job_count,
                "total_working_time": worker.total_working_time,
            }
        )
    return stats
def get_queue_jobs(connection, queue_name):
    """Return the job counts of the named queue, keyed by job status."""
    queue = Queue(connection=connection, name=queue_name)
    counts = {}
    counts[JobStatus.QUEUED] = queue.count
    counts[JobStatus.STARTED] = queue.started_job_registry.count
    counts[JobStatus.FINISHED] = queue.finished_job_registry.count
    counts[JobStatus.FAILED] = queue.failed_job_registry.count
    counts[JobStatus.DEFERRED] = queue.deferred_job_registry.count
    counts[JobStatus.SCHEDULED] = queue.scheduled_job_registry.count
    return counts
def get_jobs_by_queue(connection):
    """Return a mapping of queue name to its per-status job counts."""
    jobs_by_queue = {}
    for queue in Queue.all(connection):
        jobs_by_queue[queue.name] = get_queue_jobs(connection, queue.name)
    return jobs_by_queue
class RQCollector(Collector):
    """Prometheus collector exposing RQ worker and queue statistics."""

    def __init__(self, connection=None):
        """Keep the Redis connection used to query RQ state.

        Args:
            connection: Redis connection shared with the RQ queues/workers.
        """
        self.connection = connection
        # Counts collection runs and records the time spent in each scrape.
        self.summary = Summary(
            "rq_request_processing_seconds", "Time spent collecting RQ data"
        )

    def collect(self):
        """Yield the RQ metric families for the Prometheus registry."""
        logger.debug("Collecting the RQ metrics...")
        with self.summary.time():
            # One gauge/counter family per worker statistic, plus one gauge
            # for per-queue job counts.
            worker_gauge = GaugeMetricFamily(
                "rq_workers",
                "RQ workers",
                labels=["name", "state", "queues"],
            )
            success_counter = CounterMetricFamily(
                "rq_workers_success",
                "RQ workers success count",
                labels=["name", "queues"],
            )
            failed_counter = CounterMetricFamily(
                "rq_workers_failed",
                "RQ workers fail count",
                labels=["name", "queues"],
            )
            working_time_counter = CounterMetricFamily(
                "rq_workers_working_time",
                "RQ workers spent seconds",
                labels=["name", "queues"],
            )
            jobs_gauge = GaugeMetricFamily(
                "rq_jobs",
                "RQ jobs by state",
                labels=["queue", "status"],
            )

            for stats in get_workers_stats(self.connection):
                queues_label = ",".join(stats["queues"])
                worker_gauge.add_metric(
                    [stats["name"], stats["state"], queues_label], 1
                )
                success_counter.add_metric(
                    [stats["name"], queues_label], stats["successful_job_count"]
                )
                failed_counter.add_metric(
                    [stats["name"], queues_label], stats["failed_job_count"]
                )
                working_time_counter.add_metric(
                    [stats["name"], queues_label], stats["total_working_time"]
                )

            yield worker_gauge
            yield success_counter
            yield failed_counter
            yield working_time_counter

            for queue_name, status_counts in get_jobs_by_queue(
                self.connection
            ).items():
                for status, count in status_counts.items():
                    jobs_gauge.add_metric([queue_name, status], count)
            yield jobs_gauge
            logger.debug("RQ metrics collection finished")

View File

@@ -0,0 +1,106 @@
"""Instrumented RQ worker with OpenTelemetry tracing support."""
import logging
from pathlib import Path
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode
from docling_jobkit.convert.manager import (
DoclingConverterManagerConfig,
)
from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
from docling_jobkit.orchestrators.rq.worker import CustomRQWorker
from docling_serve.rq_instrumentation import extract_trace_context
logger = logging.getLogger(__name__)
class InstrumentedRQWorker(CustomRQWorker):
    """RQ Worker with OpenTelemetry tracing instrumentation."""

    def __init__(
        self,
        *args,
        orchestrator_config: RQOrchestratorConfig,
        cm_config: DoclingConverterManagerConfig,
        scratch_dir: Path,
        **kwargs,
    ):
        """Initialize the underlying worker and create a tracer for job spans."""
        super().__init__(
            *args,
            orchestrator_config=orchestrator_config,
            cm_config=cm_config,
            scratch_dir=scratch_dir,
            **kwargs,
        )
        self.tracer = trace.get_tracer(__name__)

    def perform_job(self, job, queue):
        """
        Perform job with distributed tracing support.

        This extracts the trace context from the job metadata and creates
        a span that continues the trace from the API request.
        """
        # Continue the trace started by the API request, if any.
        parent_context = extract_trace_context(job)
        func_name = getattr(job, "func_name", "unknown")
        with self.tracer.start_as_current_span(
            f"rq.job.{func_name}",
            context=parent_context,
            kind=SpanKind.CONSUMER,
        ) as span:
            try:
                # Record job metadata on the span.
                span.set_attribute("rq.job.id", job.id)
                span.set_attribute("rq.job.func_name", func_name)
                span.set_attribute("rq.queue.name", queue.name)
                if getattr(job, "description", None):
                    span.set_attribute("rq.job.description", job.description)
                if getattr(job, "timeout", None):
                    span.set_attribute("rq.job.timeout", job.timeout)
                # NOTE(review): injection is gated on job.kwargs being
                # non-empty; a job enqueued with no kwargs would not receive
                # the conversion manager — confirm this is intended.
                if getattr(job, "kwargs", None):
                    # Hand the worker-held resources to the task function
                    # before it runs.
                    job.kwargs["conversion_manager"] = self.conversion_manager
                    job.kwargs["orchestrator_config"] = self.orchestrator_config
                    job.kwargs["scratch_dir"] = self.scratch_dir
                    task_type = job.kwargs.get("task_type")
                    if task_type:
                        span.set_attribute("docling.task.type", str(task_type))
                    sources = job.kwargs.get("sources", [])
                    if sources:
                        span.set_attribute("docling.task.num_sources", len(sources))
                logger.info(
                    f"Executing job {job.id} with trace_id={span.get_span_context().trace_id:032x}"
                )
                # Deliberately skip CustomRQWorker.perform_job and dispatch to
                # the next class in the MRO after it.
                result = super(CustomRQWorker, self).perform_job(job, queue)
                span.set_status(Status(StatusCode.OK))
                logger.debug(f"Job {job.id} completed successfully")
                return result
            except Exception as e:
                # Surface the failure on the span, then let RQ handle it.
                logger.error(f"Job {job.id} failed: {e}", exc_info=True)
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

View File

@@ -3,7 +3,7 @@ import sys
from pathlib import Path
from typing import Optional, Union
from pydantic import AnyUrl, model_validator
from pydantic import AnyUrl, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing_extensions import Self
@@ -25,6 +25,12 @@ class UvicornSettings(BaseSettings):
workers: Union[int, None] = None
class LogLevel(str, enum.Enum):
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
class AsyncEngine(str, enum.Enum):
LOCAL = "local"
KFP = "kfp"
@@ -41,6 +47,7 @@ class DoclingServeSettings(BaseSettings):
enable_ui: bool = False
api_host: str = "localhost"
log_level: Optional[LogLevel] = None
artifacts_path: Optional[Path] = None
static_path: Optional[Path] = None
scratch_path: Optional[Path] = None
@@ -80,6 +87,7 @@ class DoclingServeSettings(BaseSettings):
eng_rq_redis_url: str = ""
eng_rq_results_prefix: str = "docling:results"
eng_rq_sub_channel: str = "docling:updates"
eng_rq_results_ttl: int = 3_600 * 4 # 4 hours default
# KFP engine
eng_kfp_endpoint: Optional[AnyUrl] = None
eng_kfp_token: Optional[str] = None
@@ -90,6 +98,23 @@ class DoclingServeSettings(BaseSettings):
eng_kfp_experimental: bool = False
# OpenTelemetry settings
otel_enable_metrics: bool = True
otel_enable_traces: bool = False
otel_enable_prometheus: bool = True
otel_enable_otlp_metrics: bool = False
otel_service_name: str = "docling-serve"
@field_validator("log_level", mode="before")
@classmethod
def validate_log_level(cls, v: Optional[str]) -> Optional[str]:
"""Validate and normalize log level to uppercase for case-insensitive support."""
if v is None:
return v
if isinstance(v, str):
return v.upper()
return v
@model_validator(mode="after")
def engine_settings(self) -> Self:
# Validate KFP engine settings

View File

@@ -77,9 +77,9 @@ def create_ui_app(process_file, process_url, task_result, task_status_poll) -> F
ConvertDocumentsRequestOptions, FormDepends(ConvertDocumentsRequestOptions)
],
files: Annotated[list[UploadFile], Form()],
url: Annotated[str, Form()],
page_min: Annotated[str, Form()],
page_max: Annotated[str, Form()],
url: Annotated[str, Form()] = "",
page_min: Annotated[str, Form()] = "",
page_max: Annotated[str, Form()] = "",
):
# Refined model options and behavior.
if len(page_min) > 0:

View File

@@ -35,6 +35,7 @@ THe following table describes the options to configure the Docling Serve app.
| CLI option | ENV | Default | Description |
| -----------|-----|---------|-------------|
| `-v, --verbose` | `DOCLING_SERVE_LOG_LEVEL` | `WARNING` | Set the verbosity level. CLI: `-v` for INFO, `-vv` for DEBUG. ENV: `WARNING`, `INFO`, or `DEBUG` (case-insensitive). CLI flag takes precedence over ENV. |
| `--artifacts-path` | `DOCLING_SERVE_ARTIFACTS_PATH` | unset | If set to a valid directory, the model weights will be loaded from this path |
| | `DOCLING_SERVE_STATIC_PATH` | unset | If set to a valid directory, the static assets for the docs and UI will be loaded from this path |
| | `DOCLING_SERVE_SCRATCH_PATH` | | If set, this directory will be used as a scratch workspace, e.g. for storing the results before they get requested. If unset, a temporary directory is created for this purpose. |
@@ -98,6 +99,7 @@ The following table describes the options to configure the Docling Serve RQ engi
| `DOCLING_SERVE_ENG_RQ_REDIS_URL` | (required) | The connection Redis url, e.g. `redis://localhost:6373/` |
| `DOCLING_SERVE_ENG_RQ_RESULTS_PREFIX` | `docling:results` | The prefix used for storing the results in Redis. |
| `DOCLING_SERVE_ENG_RQ_SUB_CHANNEL` | `docling:updates` | The channel key name used for storing communicating updates between the workers and the orchestrator. |
| `DOCLING_SERVE_ENG_RQ_RESULTS_TTL` | `14400` (4 hours) | Time To Live (in seconds) for RQ job results in Redis. This controls how long job results are kept before being automatically deleted. |
#### KFP engine
@@ -112,9 +114,22 @@ The following table describes the options to configure the Docling Serve KFP eng
| `DOCLING_SERVE_ENG_KFP_SELF_CALLBACK_TOKEN_PATH` | | The token used for authenticating the progress callback. For cluster-internal workloads, use `/run/secrets/kubernetes.io/serviceaccount/token`. |
| `DOCLING_SERVE_ENG_KFP_SELF_CALLBACK_CA_CERT_PATH` | | The CA certificate for the progress callback. For cluster-internal workloads, use `/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt`. |
#### Gradio UI
### Gradio UI
When using the Gradio UI with the option to output the conversion as a file, Gradio uses a cache to prevent files from being overwritten ([more info here](https://www.gradio.app/guides/file-access#the-gradio-cache)); the cache is cleaned every hour, removing files older than 10 hours. If files need to remain available for download from the UI for longer than 10 hours, there are two options:
- Increase the older age of files to clean [here](https://github.com/docling-project/docling-serve/blob/main/docling_serve/gradio_ui.py#L483) to suffice the age desired;
- Or set the clean up manually by defining the temporary dir of Gradio to use the same as `DOCLING_SERVE_SCRATCH_PATH` absolute path. This can be achieved by setting the environment variable `GRADIO_TEMP_DIR`, that can be done via command line `export GRADIO_TEMP_DIR="<same_path_as_scratch>"` or in `Dockerfile` using `ENV GRADIO_TEMP_DIR="<same_path_as_scratch>"`. After this, set the clean of cache to `None` [here](https://github.com/docling-project/docling-serve/blob/main/docling_serve/gradio_ui.py#L483). Now, the clean up of `DOCLING_SERVE_SCRATCH_PATH` will also clean the Gradio temporary dir. (If you use this option, please remember when reversing changes to remove the environment variable `GRADIO_TEMP_DIR`, otherwise may lead to files not be available to download).
### Telemetry
The following table describes the telemetry options for the Docling Serve app. Some deployment examples are available in [examples/OTEL.md](../examples/OTEL.md).
| ENV | Default | Description |
|-----|---------|-------------|
| `DOCLING_SERVE_OTEL_ENABLE_METRICS` | true | Enable metrics collection. |
| `DOCLING_SERVE_OTEL_ENABLE_TRACES` | false | Enable trace collection. Requires a valid value for `OTEL_EXPORTER_OTLP_ENDPOINT`. |
| `DOCLING_SERVE_OTEL_ENABLE_PROMETHEUS` | true | Enable Prometheus /metrics endpoint. |
| `DOCLING_SERVE_OTEL_ENABLE_OTLP_METRICS` | `false` | Enable OTLP metrics export. |
| `DOCLING_SERVE_OTEL_SERVICE_NAME` | docling-serve | Service identification. |
| `OTEL_EXPORTER_OTLP_ENDPOINT` | | OTLP endpoint (for traces and optional metrics). |

View File

@@ -12,7 +12,7 @@ On top of the source of file (see below), both endpoints support the same parame
| Field Name | Type | Description |
|------------|------|-------------|
| `from_formats` | List[InputFormat] | Input format(s) to convert from. String or list of strings. Allowed values: `docx`, `pptx`, `html`, `image`, `pdf`, `asciidoc`, `md`, `csv`, `xlsx`, `xml_uspto`, `xml_jats`, `mets_gbs`, `json_docling`, `audio`, `vtt`. Optional, defaults to all formats. |
| `to_formats` | List[OutputFormat] | Output format(s) to convert to. String or list of strings. Allowed values: `md`, `json`, `html`, `html_split_page`, `text`, `doctags`. Optional, defaults to Markdown. |
| `to_formats` | List[OutputFormat] | Output format(s) to convert to. String or list of strings. Allowed values: `md`, `json`, `yaml`, `html`, `html_split_page`, `text`, `doctags`. Optional, defaults to Markdown. |
| `image_export_mode` | ImageRefMode | Image export mode for the document (in case of JSON, Markdown or HTML). Allowed values: `placeholder`, `embedded`, `referenced`. Optional, defaults to Embedded. |
| `do_ocr` | bool | If enabled, the bitmap content will be processed using OCR. Boolean. Optional, defaults to true |
| `force_ocr` | bool | If enabled, replace existing text with OCR-generated text over content. Boolean. Optional, defaults to false. |
@@ -32,6 +32,7 @@ On top of the source of file (see below), both endpoints support the same parame
| `do_code_enrichment` | bool | If enabled, perform OCR code enrichment. Boolean. Optional, defaults to false. |
| `do_formula_enrichment` | bool | If enabled, perform formula OCR, return LaTeX code. Boolean. Optional, defaults to false. |
| `do_picture_classification` | bool | If enabled, classify pictures in documents. Boolean. Optional, defaults to false. |
| `do_chart_extraction` | bool | If enabled, extract numeric data from charts. Boolean. Optional, defaults to false. |
| `do_picture_description` | bool | If enabled, describe pictures in documents. Boolean. Optional, defaults to false. |
| `picture_description_area_threshold` | float | Minimum percentage of the area for a picture to be processed with the models. |
| `picture_description_local` | PictureDescriptionLocal or NoneType | Options for running a local vision-language model in the picture description. The parameters refer to a model hosted on Hugging Face. This parameter is mutually exclusive with `picture_description_api`. |

144
examples/OTEL.md Normal file
View File

@@ -0,0 +1,144 @@
# OpenTelemetry Integration for Docling Serve
Docling Serve includes built-in OpenTelemetry instrumentation for metrics and distributed tracing.
## Features
- **Metrics**: Prometheus-compatible metrics endpoint at `/metrics`
- **Traces**: OTLP trace export to OpenTelemetry collectors
- **FastAPI Auto-instrumentation**: HTTP request metrics and traces
- **RQ Metrics**: Worker and job queue metrics (when using RQ engine)
## Configuration
All settings are controlled via environment variables:
```bash
# Enable/disable features
DOCLING_SERVE_OTEL_ENABLE_METRICS=true # Enable metrics collection
DOCLING_SERVE_OTEL_ENABLE_TRACES=true # Enable trace collection
DOCLING_SERVE_OTEL_ENABLE_PROMETHEUS=true # Enable Prometheus /metrics endpoint
DOCLING_SERVE_OTEL_ENABLE_OTLP_METRICS=false # Enable OTLP metrics export
# Service identification
DOCLING_SERVE_OTEL_SERVICE_NAME=docling-serve
# OTLP endpoint (for traces and optional metrics)
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
```
## Quick Start
### Option 1: Direct Prometheus Scraping
1. Start docling-serve with default settings:
```bash
uv run docling-serve
```
2. Add to your `prometheus.yml`:
```yaml
scrape_configs:
- job_name: 'docling-serve'
static_configs:
- targets: ['localhost:5001']
```
3. Access metrics at `http://localhost:5001/metrics`
### Option 2: Full OTEL Stack with Docker Compose
1. Use the provided compose file:
```bash
cd examples
mkdir tempo-data
docker-compose -f docker-compose-otel.yaml up
```
2. This starts:
- **docling-serve**: API server with UI
- **docling-worker**: RQ worker for distributed processing (scales independently)
- **redis**: Message queue for RQ
- **otel-collector**: Receives and routes telemetry
- **prometheus**: Metrics storage
- **tempo**: Trace storage
- **grafana**: Visualization UI
3. Access:
- Docling Serve UI: `http://localhost:5001/ui`
- Metrics endpoint: `http://localhost:5001/metrics`
- Grafana: `http://localhost:3000` (pre-configured with Prometheus & Tempo)
- Prometheus: `http://localhost:9090`
- Tempo: `http://localhost:3200`
4. Scale workers (optional):
```bash
docker-compose -f docker-compose-otel.yaml up --scale docling-worker=3
```
## Available Metrics
### HTTP Metrics (from OpenTelemetry FastAPI instrumentation)
- `http_server_request_duration` - Request duration histogram
- `http_server_active_requests` - Active requests gauge
- `http_server_request_size` - Request size histogram
- `http_server_response_size` - Response size histogram
### RQ Metrics (when using RQ engine)
- `rq_workers` - Number of workers by state
- `rq_workers_success` - Successful job count per worker
- `rq_workers_failed` - Failed job count per worker
- `rq_workers_working_time` - Total working time per worker
- `rq_jobs` - Job counts by queue and status
- `rq_request_processing_seconds` - RQ metrics collection time
## Traces
Traces are automatically generated for:
- All HTTP requests to FastAPI endpoints
- Document conversion operations
- **RQ job execution (distributed tracing)**: When using RQ engine, traces propagate from API requests to worker jobs, providing end-to-end visibility across the distributed system
View traces in Grafana Tempo or any OTLP-compatible backend.
### Distributed Tracing in RQ Mode
When running with the RQ engine (`DOCLING_SERVE_ENG_KIND=rq`), traces automatically propagate from the API to RQ workers:
1. **API Request**: FastAPI creates a trace when a document conversion request arrives
2. **Job Enqueue**: The trace context is injected into the RQ job metadata
3. **Worker Execution**: The RQ worker extracts the trace context and continues the trace
4. **End-to-End View**: You can see the complete request flow from API to worker in Grafana
This allows you to:
- Track document processing latency across API and workers
- Identify bottlenecks in the conversion pipeline
- Debug distributed processing issues
- Monitor queue wait times and processing times separately
## Example Files
See the `examples/` directory:
- `prometheus-scrape.yaml` - Prometheus scrape configuration examples
- `docker-compose-otel.yaml` - Full observability stack
- `otel-collector-config.yaml` - OTEL collector configuration
- `prometheus.yaml` - Prometheus configuration
- `tempo.yaml` - Tempo trace storage configuration
- `grafana-datasources.yaml` - Grafana data source provisioning
## Production Considerations
1. **Security**: Add authentication to the `/metrics` endpoint if needed
2. **Performance**: Metrics collection has minimal overhead (<1ms per scrape)
3. **Storage**: Configure retention policies in Prometheus/Tempo
4. **Sampling**: Configure trace sampling for high-volume services
5. **Labels**: Keep cardinality low to avoid metric explosion
## Disabling OTEL
To disable all OTEL features:
```bash
DOCLING_SERVE_OTEL_ENABLE_METRICS=false
DOCLING_SERVE_OTEL_ENABLE_TRACES=false
```

View File

@@ -0,0 +1,152 @@
# Docker Compose configuration for Docling Serve with OpenTelemetry
# Includes OTEL collector, Prometheus, Grafana, and Tempo for full observability
services:
# Docling Serve application
docling-serve:
build:
context: ..
dockerfile: Containerfile
platform: "linux/arm64"
ports:
- "5001:5001"
environment:
# Basic settings
- DOCLING_SERVE_ENABLE_UI=1
# OpenTelemetry settings
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
- OTEL_SERVICE_NAME=docling-serve
- DOCLING_SERVE_OTEL_ENABLE_METRICS=true
- DOCLING_SERVE_OTEL_ENABLE_TRACES=true
- DOCLING_SERVE_OTEL_ENABLE_PROMETHEUS=true
- DOCLING_SERVE_OTEL_ENABLE_OTLP_METRICS=true
# RQ settings for distributed processing
- DOCLING_SERVE_ENG_KIND=rq
- DOCLING_SERVE_ENG_RQ_REDIS_URL=redis://redis:6379/0
depends_on:
- otel-collector
- redis
networks:
- docling-otel
# Docling Serve RQ Worker
docling-worker:
build:
context: ..
dockerfile: Containerfile
platform: "linux/arm64"
command: ["docling-serve", "rq-worker"]
environment:
# OpenTelemetry settings
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
- OTEL_SERVICE_NAME=docling-serve-worker
- DOCLING_SERVE_OTEL_ENABLE_METRICS=false
- DOCLING_SERVE_OTEL_ENABLE_TRACES=true
# RQ settings
- DOCLING_SERVE_ENG_KIND=rq
- DOCLING_SERVE_ENG_RQ_REDIS_URL=redis://redis:6379/0
depends_on:
- otel-collector
- redis
networks:
- docling-otel
# Scale workers as needed
deploy:
replicas: 1
# OpenTelemetry Collector
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "8888:8888" # Prometheus metrics (collector's own metrics)
- "8889:8889" # Prometheus exporter
networks:
- docling-otel
# Prometheus for metrics storage
prometheus:
image: prom/prometheus:latest
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
ports:
- "9090:9090"
networks:
- docling-otel
# Tempo for trace storage
tempo-init:
image: busybox:latest
user: root
entrypoint:
- "chown"
- "10001:10001"
- "/var/tempo"
volumes:
- ./tempo-data:/var/tempo
tempo:
image: grafana/tempo:latest
command: ["-config.file=/etc/tempo.yaml"]
volumes:
- ./tempo.yaml:/etc/tempo.yaml
- ./tempo-data:/tmp/tempo
ports:
- "3200:3200" # Tempo
- "4319:4317" # OTLP gRPC (alternate port to avoid conflict)
networks:
- docling-otel
depends_on:
- tempo-init
# Grafana for visualization
grafana:
image: grafana/grafana:latest
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
volumes:
- grafana-data:/var/lib/grafana
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
ports:
- "3000:3000"
depends_on:
- prometheus
- tempo
networks:
- docling-otel
# Redis for RQ distributed processing
redis:
image: redis:7-alpine
ports:
- "6379:6379"
networks:
- docling-otel
volumes:
prometheus-data:
grafana-data:
networks:
docling-otel:
driver: bridge

View File

@@ -0,0 +1,33 @@
# Grafana datasources configuration
# Automatically provisions Prometheus and Tempo as data sources
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: true
jsonData:
timeInterval: 15s
httpMethod: POST
version: 1
- name: Tempo
type: tempo
access: proxy
url: http://tempo:3200
editable: true
jsonData:
httpMethod: GET
tracesToLogs:
datasourceUid: Loki
tags: ['job', 'instance', 'pod']
tracesToMetrics:
datasourceUid: Prometheus
tags: [{ key: 'service.name', value: 'service' }]
serviceMap:
datasourceUid: Prometheus
version: 1

View File

@@ -0,0 +1,58 @@
# OpenTelemetry Collector Configuration
# Receives traces and metrics from docling-serve and exports to Prometheus/Tempo
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 10s
send_batch_size: 1024
memory_limiter:
check_interval: 1s
limit_mib: 512
# Add resource attributes
resource:
attributes:
- key: service.namespace
value: docling
action: insert
exporters:
# Export traces to Tempo
otlp/tempo:
endpoint: tempo:4317
tls:
insecure: true
# Export metrics to Prometheus
prometheus:
endpoint: "0.0.0.0:8889"
namespace: docling
const_labels:
environment: production
# Debug logging (optional, remove in production)
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch, resource]
exporters: [otlp/tempo, debug]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch, resource]
exporters: [prometheus, debug]

View File

@@ -0,0 +1,106 @@
# Prometheus Scrape Configuration for Docling Serve
# Add this to your Prometheus configuration file (prometheus.yml)
scrape_configs:
- job_name: 'docling-serve'
# Scrape interval
scrape_interval: 15s
scrape_timeout: 10s
# Metrics path (default is /metrics)
metrics_path: /metrics
# Static targets configuration
static_configs:
- targets:
# Replace with your docling-serve instance(s)
- 'localhost:5001'
- 'docling-serve-1.example.com:5001'
- 'docling-serve-2.example.com:5001'
# Optional labels to add to all metrics from this job
labels:
environment: 'production'
service: 'docling-serve'
# Optional: Add authentication if API key is required
# basic_auth:
# username: ''
# password: 'your-api-key'
# Optional: TLS configuration
# tls_config:
# ca_file: /path/to/ca.crt
# cert_file: /path/to/client.crt
# key_file: /path/to/client.key
# insecure_skip_verify: false
---
# VictoriaMetrics Scrape Configuration
# For use with VictoriaMetrics, configuration is compatible with Prometheus
# vmagent configuration example:
# vmagent \
# -promscrape.config=prometheus-scrape.yaml \
# -remoteWrite.url=http://victoriametrics:8428/api/v1/write
---
# Kubernetes Service Discovery Example
# For auto-discovery of docling-serve pods in Kubernetes
scrape_configs:
- job_name: 'docling-serve-k8s'
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- default
- docling-serve
relabel_configs:
# Only scrape pods with label app=docling-serve
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: docling-serve
# Use pod name as instance label
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
# Use namespace as label
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
# Set metrics path
- target_label: __metrics_path__
replacement: /metrics
# Set port to scrape
- source_labels: [__address__]
action: replace
regex: ([^:]+)(?::\d+)?
replacement: $1:5001
target_label: __address__
---
# Available Metrics from Docling Serve:
#
# FastAPI/HTTP Metrics (from OpenTelemetry):
# - http.server.request.duration - HTTP request duration histogram
# - http.server.active_requests - Active HTTP requests gauge
# - http.server.request.size - HTTP request size histogram
# - http.server.response.size - HTTP response size histogram
#
# RQ Metrics (when using RQ engine):
# - rq_workers - Number of RQ workers by state
# - rq_workers_success - Successful job count per worker
# - rq_workers_failed - Failed job count per worker
# - rq_workers_working_time - Total working time per worker
# - rq_jobs - Job counts by queue and status
# - rq_request_processing_seconds - Time spent collecting RQ metrics
#
# System Metrics (via Python OpenTelemetry):
# - process.runtime.cpython.cpu.utilization - CPU utilization
# - process.runtime.cpython.memory - Memory usage
# - process.runtime.cpython.gc.count - Garbage collection count

30
examples/prometheus.yaml Normal file
View File

@@ -0,0 +1,30 @@
# Prometheus configuration for scraping metrics
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
cluster: 'docling-cluster'
replica: '1'
scrape_configs:
# Scrape docling-serve metrics endpoint directly
- job_name: 'docling-serve'
static_configs:
- targets: ['docling-serve:5001']
labels:
service: 'docling-serve'
# Scrape OTEL collector's own metrics
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:8888']
labels:
service: 'otel-collector'
# Scrape metrics exported by OTEL collector
- job_name: 'otel-metrics'
static_configs:
- targets: ['otel-collector:8889']
labels:
service: 'docling-otel-metrics'

46
examples/tempo.yaml Normal file
View File

@@ -0,0 +1,46 @@
# Tempo configuration for trace storage
server:
http_listen_port: 3200
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
ingester:
trace_idle_period: 10s
max_block_bytes: 1048576
max_block_duration: 5m
compactor:
compaction:
compaction_window: 1h
max_compaction_objects: 1000000
block_retention: 1h
compacted_block_retention: 10m
storage:
trace:
backend: local
local:
path: /tmp/tempo/blocks
wal:
path: /tmp/tempo/wal
pool:
max_workers: 100
queue_depth: 10000
metrics_generator:
registry:
external_labels:
source: tempo
storage:
path: /tmp/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true

View File

@@ -1,6 +1,6 @@
[project]
name = "docling-serve"
version = "1.8.0" # DO NOT EDIT, updated automatically
version = "1.12.0" # DO NOT EDIT, updated automatically
description = "Running Docling as a service"
license = {text = "MIT"}
authors = [
@@ -36,7 +36,7 @@ dependencies = [
"docling~=2.38",
"docling-core>=2.45.0",
"docling-jobkit[kfp,rq,vlm]>=1.8.0,<2.0.0",
"fastapi[standard]<0.119.0", # ~=0.115
"fastapi[standard]<0.129.0", # ~=0.115
"httpx~=0.28",
"pydantic~=2.10",
"pydantic-settings~=2.4",
@@ -46,11 +46,17 @@ dependencies = [
"websockets~=14.0",
"scalar-fastapi>=1.0.3",
"docling-mcp>=1.0.0",
"opentelemetry-api>=1.36.0,<2.0.0",
"opentelemetry-sdk>=1.36.0,<2.0.0",
"opentelemetry-exporter-otlp>=1.15.0",
"opentelemetry-instrumentation-fastapi>=0.57b0,<0.58",
"opentelemetry-exporter-prometheus>=0.57b0",
"prometheus-client>=0.21.0",
]
[project.optional-dependencies]
ui = [
"python-jsx==0.4.0",
"python-jsx>=0.4.0",
]
tesserocr = [
"tesserocr~=2.7"
@@ -69,6 +75,9 @@ flash-attn = [
[dependency-groups]
dev = [
"asgi-lifespan~=2.0",
"httpx",
"pydantic",
"pydantic-settings",
"mypy~=1.11",
"pre-commit-uv~=4.1",
"pypdf>=6.0.0",
@@ -87,6 +96,9 @@ pypi = [
cpu = [
"torch>=2.7.1",
"torchvision>=0.22.1",
# Linux x86_64
# torchvision/uv issue https://github.com/astral-sh/uv/issues/16386#issuecomment-3726000085
"torchvision!=0.24.1,!=0.24.0; sys_platform == 'linux' and platform_machine == 'x86_64'",
]
# cu124 = [
@@ -284,6 +296,8 @@ module = [
"mlx_vlm.*",
"mlx.*",
"scalar_fastapi.*",
"gradio.*",
"msgpack.*",
]
ignore_missing_imports = true

View File

@@ -1,3 +1,4 @@
import inspect
import re
from typing import Annotated, Any, Union, get_args, get_origin
@@ -152,7 +153,9 @@ def generate_model_doc(model: type[BaseModel]) -> str:
doc += f"| `{field_name}` | {field_type} | {description} |\n"
for field_type in _unroll_types(base_type):
if issubclass(field_type, BaseModel):
if inspect.isclass(field_type) and issubclass(
field_type, BaseModel
):
models_stack.append(field_type)
# stop iterating the base classes

6692
uv.lock generated

File diff suppressed because one or more lines are too long