mirror of
https://github.com/docling-project/docling-serve.git
synced 2026-03-21 16:40:06 +00:00
feat: OpenTelemetry support for traces and metrics (#456)
Signed-off-by: Pawel Rein <pawel.rein@prezi.com>
This commit is contained in:
@@ -38,3 +38,7 @@ Panos
|
||||
Vagenas
|
||||
Staar
|
||||
Livathinos
|
||||
(?i)redis
|
||||
(?i)prometheus
|
||||
(?i)grafana
|
||||
(?i)tempo
|
||||
|
||||
@@ -4,7 +4,7 @@ import platform
|
||||
import sys
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Any, Optional, Union
|
||||
from typing import Annotated, Any, Union
|
||||
|
||||
import typer
|
||||
import uvicorn
|
||||
@@ -205,17 +205,17 @@ def dev(
|
||||
int, typer.Option(help="Timeout for the server response.")
|
||||
] = uvicorn_settings.timeout_keep_alive,
|
||||
ssl_certfile: Annotated[
|
||||
Optional[Path], typer.Option(help="SSL certificate file")
|
||||
Path | None, typer.Option(help="SSL certificate file")
|
||||
] = uvicorn_settings.ssl_certfile,
|
||||
ssl_keyfile: Annotated[
|
||||
Optional[Path], typer.Option(help="SSL key file")
|
||||
Path | None, typer.Option(help="SSL key file")
|
||||
] = uvicorn_settings.ssl_keyfile,
|
||||
ssl_keyfile_password: Annotated[
|
||||
Optional[str], typer.Option(help="SSL keyfile password")
|
||||
str | None, typer.Option(help="SSL keyfile password")
|
||||
] = uvicorn_settings.ssl_keyfile_password,
|
||||
# docling options
|
||||
artifacts_path: Annotated[
|
||||
Optional[Path],
|
||||
Path | None,
|
||||
typer.Option(
|
||||
help=(
|
||||
"If set to a valid directory, "
|
||||
@@ -312,17 +312,17 @@ def run(
|
||||
int, typer.Option(help="Timeout for the server response.")
|
||||
] = uvicorn_settings.timeout_keep_alive,
|
||||
ssl_certfile: Annotated[
|
||||
Optional[Path], typer.Option(help="SSL certificate file")
|
||||
Path | None, typer.Option(help="SSL certificate file")
|
||||
] = uvicorn_settings.ssl_certfile,
|
||||
ssl_keyfile: Annotated[
|
||||
Optional[Path], typer.Option(help="SSL key file")
|
||||
Path | None, typer.Option(help="SSL key file")
|
||||
] = uvicorn_settings.ssl_keyfile,
|
||||
ssl_keyfile_password: Annotated[
|
||||
Optional[str], typer.Option(help="SSL keyfile password")
|
||||
str | None, typer.Option(help="SSL keyfile password")
|
||||
] = uvicorn_settings.ssl_keyfile_password,
|
||||
# docling options
|
||||
artifacts_path: Annotated[
|
||||
Optional[Path],
|
||||
Path | None,
|
||||
typer.Option(
|
||||
help=(
|
||||
"If set to a valid directory, "
|
||||
@@ -367,9 +367,23 @@ def rq_worker() -> Any:
|
||||
"""
|
||||
Run the [bold]Docling JobKit[/bold] RQ worker.
|
||||
"""
|
||||
from docling_jobkit.convert.manager import DoclingConverterManagerConfig
|
||||
from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
|
||||
from docling_jobkit.orchestrators.rq.worker import run_worker
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from docling_jobkit.convert.manager import (
|
||||
DoclingConverterManagerConfig,
|
||||
)
|
||||
from docling_jobkit.orchestrators.rq.orchestrator import (
|
||||
RQOrchestrator,
|
||||
RQOrchestratorConfig,
|
||||
)
|
||||
|
||||
from docling_serve.rq_instrumentation import setup_rq_worker_instrumentation
|
||||
from docling_serve.rq_worker_instrumented import InstrumentedRQWorker
|
||||
|
||||
# Set up OpenTelemetry for the worker process
|
||||
if docling_serve_settings.otel_enable_traces:
|
||||
setup_rq_worker_instrumentation()
|
||||
|
||||
rq_config = RQOrchestratorConfig(
|
||||
redis_url=docling_serve_settings.eng_rq_redis_url,
|
||||
@@ -393,11 +407,20 @@ def rq_worker() -> Any:
|
||||
batch_polling_interval_seconds=docling_serve_settings.batch_polling_interval_seconds,
|
||||
)
|
||||
|
||||
run_worker(
|
||||
rq_config=rq_config,
|
||||
# Create worker with instrumentation
|
||||
scratch_dir = rq_config.scratch_dir or Path(tempfile.mkdtemp(prefix="docling_"))
|
||||
redis_conn, rq_queue = RQOrchestrator.make_rq_queue(rq_config)
|
||||
|
||||
worker = InstrumentedRQWorker(
|
||||
[rq_queue],
|
||||
connection=redis_conn,
|
||||
orchestrator_config=rq_config,
|
||||
cm_config=cm_config,
|
||||
scratch_dir=scratch_dir,
|
||||
)
|
||||
|
||||
worker.work()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
app()
|
||||
|
||||
@@ -78,8 +78,12 @@ from docling_serve.datamodel.responses import (
|
||||
)
|
||||
from docling_serve.helper_functions import DOCLING_VERSIONS, FormDepends
|
||||
from docling_serve.orchestrator_factory import get_async_orchestrator
|
||||
from docling_serve.otel_instrumentation import (
|
||||
get_metrics_endpoint_content,
|
||||
setup_otel_instrumentation,
|
||||
)
|
||||
from docling_serve.response_preparation import prepare_response
|
||||
from docling_serve.settings import docling_serve_settings
|
||||
from docling_serve.settings import AsyncEngine, docling_serve_settings
|
||||
from docling_serve.storage import get_scratch
|
||||
from docling_serve.websocket_notifier import WebsocketNotifier
|
||||
|
||||
@@ -176,6 +180,22 @@ def create_app(): # noqa: C901
|
||||
version=version,
|
||||
)
|
||||
|
||||
# Setup OpenTelemetry instrumentation
|
||||
redis_url = (
|
||||
docling_serve_settings.eng_rq_redis_url
|
||||
if docling_serve_settings.eng_kind == AsyncEngine.RQ
|
||||
else None
|
||||
)
|
||||
setup_otel_instrumentation(
|
||||
app,
|
||||
service_name=docling_serve_settings.otel_service_name,
|
||||
enable_metrics=docling_serve_settings.otel_enable_metrics,
|
||||
enable_traces=docling_serve_settings.otel_enable_traces,
|
||||
enable_prometheus=docling_serve_settings.otel_enable_prometheus,
|
||||
enable_otlp_metrics=docling_serve_settings.otel_enable_otlp_metrics,
|
||||
redis_url=redis_url,
|
||||
)
|
||||
|
||||
origins = docling_serve_settings.cors_origins
|
||||
methods = docling_serve_settings.cors_methods
|
||||
headers = docling_serve_settings.cors_headers
|
||||
@@ -447,6 +467,16 @@ def create_app(): # noqa: C901
|
||||
)
|
||||
return DOCLING_VERSIONS
|
||||
|
||||
# Prometheus metrics endpoint
|
||||
@app.get("/metrics", tags=["health"], include_in_schema=False)
|
||||
def metrics():
|
||||
from fastapi.responses import PlainTextResponse
|
||||
|
||||
return PlainTextResponse(
|
||||
content=get_metrics_endpoint_content(),
|
||||
media_type="text/plain; version=0.0.4",
|
||||
)
|
||||
|
||||
# Convert a document from URL(s)
|
||||
@app.post(
|
||||
"/v1/convert/source",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
from typing import Any, Optional
|
||||
from typing import Any
|
||||
|
||||
import redis.asyncio as redis
|
||||
|
||||
@@ -23,7 +23,7 @@ class RedisTaskStatusMixin:
|
||||
_task_result_keys: dict[str, str]
|
||||
config: Any
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.redis_prefix = "docling:tasks:"
|
||||
self._redis_pool = redis.ConnectionPool.from_url(
|
||||
@@ -91,7 +91,7 @@ class RedisTaskStatusMixin:
|
||||
_log.warning(f"Task {task_id} not found")
|
||||
raise
|
||||
|
||||
async def _get_task_from_redis(self, task_id: str) -> Optional[Task]:
|
||||
async def _get_task_from_redis(self, task_id: str) -> Task | None:
|
||||
try:
|
||||
async with redis.Redis(connection_pool=self._redis_pool) as r:
|
||||
task_data = await r.get(f"{self.redis_prefix}{task_id}:metadata")
|
||||
@@ -115,7 +115,7 @@ class RedisTaskStatusMixin:
|
||||
_log.error(f"Redis get task {task_id}: {e}")
|
||||
return None
|
||||
|
||||
async def _get_task_from_rq_direct(self, task_id: str) -> Optional[Task]:
|
||||
async def _get_task_from_rq_direct(self, task_id: str) -> Task | None:
|
||||
try:
|
||||
_log.debug(f"Checking RQ for task {task_id}")
|
||||
|
||||
@@ -306,8 +306,92 @@ def get_async_orchestrator() -> BaseOrchestrator:
|
||||
RQOrchestratorConfig,
|
||||
)
|
||||
|
||||
from docling_serve.rq_instrumentation import wrap_rq_queue_for_tracing
|
||||
|
||||
class RedisAwareRQOrchestrator(RedisTaskStatusMixin, RQOrchestrator): # type: ignore[misc]
|
||||
pass
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
# Wrap RQ queue to inject trace context into jobs
|
||||
if docling_serve_settings.otel_enable_traces:
|
||||
wrap_rq_queue_for_tracing(self._rq_queue)
|
||||
|
||||
async def enqueue(self, **kwargs: Any) -> Task: # type: ignore[override]
|
||||
"""Override enqueue to use instrumented job function when tracing is enabled."""
|
||||
import base64
|
||||
import uuid
|
||||
import warnings
|
||||
|
||||
from docling.datamodel.base_models import DocumentStream
|
||||
from docling_jobkit.datamodel.chunking import ChunkingExportOptions
|
||||
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
|
||||
from docling_jobkit.datamodel.task import Task, TaskSource, TaskTarget
|
||||
from docling_jobkit.datamodel.task_meta import TaskType
|
||||
|
||||
# Extract parameters
|
||||
sources: list[TaskSource] = kwargs.get("sources", [])
|
||||
target: TaskTarget = kwargs["target"]
|
||||
task_type: TaskType = kwargs.get("task_type", TaskType.CONVERT)
|
||||
options = kwargs.get("options")
|
||||
convert_options = kwargs.get("convert_options")
|
||||
chunking_options = kwargs.get("chunking_options")
|
||||
chunking_export_options = kwargs.get("chunking_export_options")
|
||||
|
||||
if options is not None and convert_options is None:
|
||||
convert_options = options
|
||||
warnings.warn(
|
||||
"'options' is deprecated and will be removed in a future version. "
|
||||
"Use 'conversion_options' instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
task_id = str(uuid.uuid4())
|
||||
rq_sources: list[HttpSource | FileSource] = []
|
||||
for source in sources:
|
||||
if isinstance(source, DocumentStream):
|
||||
encoded_doc = base64.b64encode(source.stream.read()).decode()
|
||||
rq_sources.append(
|
||||
FileSource(filename=source.name, base64_string=encoded_doc)
|
||||
)
|
||||
elif isinstance(source, (HttpSource | FileSource)):
|
||||
rq_sources.append(source)
|
||||
|
||||
chunking_export_options = (
|
||||
chunking_export_options or ChunkingExportOptions()
|
||||
)
|
||||
|
||||
task = Task(
|
||||
task_id=task_id,
|
||||
task_type=task_type,
|
||||
sources=rq_sources,
|
||||
convert_options=convert_options,
|
||||
chunking_options=chunking_options,
|
||||
chunking_export_options=chunking_export_options,
|
||||
target=target,
|
||||
)
|
||||
|
||||
self.tasks.update({task.task_id: task})
|
||||
task_data = task.model_dump(mode="json", serialize_as_any=True)
|
||||
|
||||
# Use instrumented job function if tracing is enabled
|
||||
if docling_serve_settings.otel_enable_traces:
|
||||
job_func = "docling_serve.rq_job_wrapper.instrumented_docling_task"
|
||||
else:
|
||||
job_func = "docling_jobkit.orchestrators.rq.worker.docling_task"
|
||||
|
||||
self._rq_queue.enqueue(
|
||||
job_func,
|
||||
kwargs={"task_data": task_data},
|
||||
job_id=task_id,
|
||||
timeout=14400,
|
||||
)
|
||||
|
||||
await self.init_task_tracking(task)
|
||||
|
||||
# Store in Redis
|
||||
await self._store_task_in_redis(task)
|
||||
|
||||
return task
|
||||
|
||||
rq_config = RQOrchestratorConfig(
|
||||
redis_url=docling_serve_settings.eng_rq_redis_url,
|
||||
|
||||
138
docling_serve/otel_instrumentation.py
Normal file
138
docling_serve/otel_instrumentation.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""OpenTelemetry instrumentation for docling-serve with metrics and traces."""
|
||||
|
||||
import logging
|
||||
|
||||
from opentelemetry import metrics, trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.exporter.prometheus import PrometheusMetricReader
|
||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||
from opentelemetry.sdk.metrics import MeterProvider
|
||||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
|
||||
from opentelemetry.trace import SpanKind
|
||||
from opentelemetry.util.types import Attributes
|
||||
from prometheus_client import REGISTRY
|
||||
from redis import Redis
|
||||
|
||||
from docling_serve.rq_metrics_collector import RQCollector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
FILTERED_PATHS = {"/metrics", "/health", "/healthz", "/readyz", "/livez"}
|
||||
|
||||
|
||||
class HealthMetricsFilterSampler(Sampler):
|
||||
"""
|
||||
Sampler that filters out traces for health and metrics endpoints.
|
||||
|
||||
Drops spans for /metrics, /health, /healthz, /readyz, /livez regardless
|
||||
of query parameters. All other endpoints are sampled normally (always on).
|
||||
"""
|
||||
|
||||
def should_sample(
|
||||
self,
|
||||
parent_context,
|
||||
trace_id: int,
|
||||
name: str,
|
||||
kind: SpanKind | None = None,
|
||||
attributes: Attributes | None = None,
|
||||
links=None,
|
||||
trace_state=None,
|
||||
) -> SamplingResult:
|
||||
if attributes:
|
||||
http_target = attributes.get("http.target") or attributes.get("url.path")
|
||||
if http_target:
|
||||
path = str(http_target).split("?")[0]
|
||||
if path in FILTERED_PATHS:
|
||||
return SamplingResult(Decision.DROP)
|
||||
|
||||
return SamplingResult(Decision.RECORD_AND_SAMPLE)
|
||||
|
||||
def get_description(self) -> str:
|
||||
"""Return description of the sampler."""
|
||||
return (
|
||||
"HealthMetricsFilterSampler: drops traces for health and metrics endpoints"
|
||||
)
|
||||
|
||||
|
||||
def setup_otel_instrumentation(
|
||||
app,
|
||||
service_name: str = "docling-serve",
|
||||
enable_metrics: bool = True,
|
||||
enable_traces: bool = True,
|
||||
enable_prometheus: bool = True,
|
||||
enable_otlp_metrics: bool = False,
|
||||
redis_url: str | None = None,
|
||||
):
|
||||
"""
|
||||
Set up OpenTelemetry instrumentation for FastAPI app.
|
||||
|
||||
Args:
|
||||
app: FastAPI application instance
|
||||
service_name: Service name for OTEL resource
|
||||
enable_metrics: Enable OTEL metrics
|
||||
enable_traces: Enable OTEL traces
|
||||
enable_prometheus: Enable Prometheus metrics export
|
||||
enable_otlp_metrics: Enable OTLP metrics export (for OTEL collector)
|
||||
redis_url: Redis URL for RQ metrics (if using RQ engine)
|
||||
"""
|
||||
resource = Resource(attributes={SERVICE_NAME: service_name})
|
||||
|
||||
# Setup traces
|
||||
if enable_traces:
|
||||
logger.info("Setting up OpenTelemetry traces with health/metrics filtering")
|
||||
sampler = HealthMetricsFilterSampler()
|
||||
trace_provider = TracerProvider(resource=resource, sampler=sampler)
|
||||
trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
|
||||
trace.set_tracer_provider(trace_provider)
|
||||
|
||||
# Setup metrics
|
||||
if enable_metrics:
|
||||
logger.info("Setting up OpenTelemetry metrics")
|
||||
metric_readers: list = []
|
||||
|
||||
# Prometheus metrics reader (for scraping endpoint)
|
||||
if enable_prometheus:
|
||||
logger.info("Enabling Prometheus metrics export")
|
||||
prometheus_reader = PrometheusMetricReader()
|
||||
metric_readers.append(prometheus_reader)
|
||||
|
||||
# OTLP metrics exporter (for OTEL collector)
|
||||
if enable_otlp_metrics:
|
||||
logger.info("Enabling OTLP metrics export")
|
||||
otlp_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
|
||||
metric_readers.append(otlp_reader)
|
||||
|
||||
if metric_readers:
|
||||
meter_provider = MeterProvider(
|
||||
resource=resource,
|
||||
metric_readers=metric_readers,
|
||||
)
|
||||
metrics.set_meter_provider(meter_provider)
|
||||
|
||||
# Instrument FastAPI
|
||||
logger.info("Instrumenting FastAPI with OpenTelemetry")
|
||||
FastAPIInstrumentor.instrument_app(app)
|
||||
|
||||
# Register RQ metrics if Redis URL is provided
|
||||
if redis_url and enable_prometheus:
|
||||
logger.info(f"Registering RQ metrics collector for Redis at {redis_url}")
|
||||
connection = Redis.from_url(redis_url)
|
||||
REGISTRY.register(RQCollector(connection))
|
||||
|
||||
|
||||
def get_metrics_endpoint_content():
|
||||
"""
|
||||
Get Prometheus metrics content for /metrics endpoint.
|
||||
|
||||
Returns:
|
||||
Prometheus-formatted metrics content
|
||||
"""
|
||||
from prometheus_client import REGISTRY, generate_latest
|
||||
|
||||
return generate_latest(REGISTRY)
|
||||
197
docling_serve/rq_instrumentation.py
Normal file
197
docling_serve/rq_instrumentation.py
Normal file
@@ -0,0 +1,197 @@
|
||||
"""RQ instrumentation for distributed tracing across API and workers."""
|
||||
|
||||
import functools
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.propagate import extract, inject
|
||||
from opentelemetry.trace import SpanKind, Status, StatusCode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global tracer instance
|
||||
_tracer = trace.get_tracer(__name__)
|
||||
|
||||
|
||||
def get_rq_tracer() -> trace.Tracer:
|
||||
"""Get the RQ tracer instance."""
|
||||
return _tracer
|
||||
|
||||
|
||||
def inject_trace_context(job_kwargs: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
Inject current trace context into RQ job kwargs.
|
||||
|
||||
This is called when enqueueing a job to propagate the trace context
|
||||
from the API request to the RQ worker.
|
||||
|
||||
Args:
|
||||
job_kwargs: Job keyword arguments
|
||||
|
||||
Returns:
|
||||
Modified job kwargs with trace context in metadata
|
||||
"""
|
||||
# Create carrier dict to hold trace context
|
||||
carrier: dict[str, str] = {}
|
||||
|
||||
# Inject current context into carrier
|
||||
inject(carrier)
|
||||
|
||||
# Store carrier in job metadata
|
||||
if carrier:
|
||||
job_kwargs.setdefault("meta", {})
|
||||
job_kwargs["meta"]["otel_context"] = carrier
|
||||
logger.debug(f"Injected trace context into job: {carrier}")
|
||||
|
||||
return job_kwargs
|
||||
|
||||
|
||||
def extract_trace_context(job) -> Context | None:
|
||||
"""
|
||||
Extract trace context from RQ job metadata.
|
||||
|
||||
This is called in the worker to continue the trace that was started
|
||||
in the API.
|
||||
|
||||
Args:
|
||||
job: RQ Job instance
|
||||
|
||||
Returns:
|
||||
OpenTelemetry context extracted from job metadata, or None if not found
|
||||
"""
|
||||
carrier = {}
|
||||
|
||||
# Extract carrier from job metadata
|
||||
if hasattr(job, "meta") and job.meta:
|
||||
carrier = job.meta.get("otel_context", {})
|
||||
logger.debug(f"Extracted trace context from job: {carrier}")
|
||||
|
||||
# Extract context from carrier
|
||||
return extract(carrier) if carrier else None
|
||||
|
||||
|
||||
def instrument_rq_job(func: Callable) -> Callable:
|
||||
"""
|
||||
Decorator to instrument RQ job functions with tracing.
|
||||
|
||||
This creates a span for the job execution and links it to the parent
|
||||
trace from the API request.
|
||||
|
||||
Args:
|
||||
func: Job function to instrument
|
||||
|
||||
Returns:
|
||||
Instrumented function
|
||||
"""
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
# Get job instance (RQ passes it as first arg in some contexts)
|
||||
job = kwargs.get("job")
|
||||
|
||||
# Extract trace context from job
|
||||
parent_context = extract_trace_context(job) if job else None
|
||||
|
||||
# Create span with parent context
|
||||
tracer = get_rq_tracer()
|
||||
span_name = f"rq.job.{func.__name__}"
|
||||
|
||||
with tracer.start_as_current_span(
|
||||
span_name,
|
||||
context=parent_context,
|
||||
kind=SpanKind.CONSUMER,
|
||||
) as span:
|
||||
try:
|
||||
# Add job metadata as span attributes
|
||||
if job:
|
||||
span.set_attribute("rq.job.id", job.id)
|
||||
span.set_attribute("rq.job.func_name", job.func_name)
|
||||
span.set_attribute("rq.queue.name", job.origin)
|
||||
if hasattr(job, "description"):
|
||||
span.set_attribute("rq.job.description", job.description)
|
||||
|
||||
# Execute the actual job function
|
||||
result = func(*args, **kwargs)
|
||||
|
||||
# Mark span as successful
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Record exception and mark span as failed
|
||||
span.record_exception(e)
|
||||
span.set_status(Status(StatusCode.ERROR, str(e)))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def setup_rq_worker_instrumentation():
|
||||
"""
|
||||
Set up OpenTelemetry instrumentation for RQ workers.
|
||||
|
||||
This should be called when starting an RQ worker to ensure traces
|
||||
are properly initialized in the worker process.
|
||||
"""
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
|
||||
# Check if tracer provider is already set
|
||||
if not isinstance(trace.get_tracer_provider(), TracerProvider):
|
||||
logger.info("Setting up OpenTelemetry for RQ worker")
|
||||
|
||||
# Create resource with service name
|
||||
resource = Resource(attributes={SERVICE_NAME: "docling-serve-worker"})
|
||||
|
||||
# Set up trace provider
|
||||
trace_provider = TracerProvider(resource=resource)
|
||||
trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
|
||||
trace.set_tracer_provider(trace_provider)
|
||||
|
||||
# Update global tracer
|
||||
global _tracer
|
||||
_tracer = trace.get_tracer(__name__)
|
||||
|
||||
logger.info("OpenTelemetry setup complete for RQ worker")
|
||||
else:
|
||||
logger.debug("OpenTelemetry already configured for RQ worker")
|
||||
|
||||
|
||||
def wrap_rq_queue_for_tracing(rq_queue: Any) -> None:
|
||||
"""
|
||||
Wrap RQ queue's enqueue method to inject trace context into jobs.
|
||||
|
||||
This monkey-patches the queue's enqueue method to automatically inject
|
||||
the current trace context into job metadata.
|
||||
|
||||
Args:
|
||||
rq_queue: RQ Queue instance to wrap
|
||||
"""
|
||||
original_enqueue = rq_queue.enqueue
|
||||
|
||||
def traced_enqueue(*args: Any, **kwargs: Any) -> Any:
|
||||
"""Wrapped enqueue that injects trace context."""
|
||||
# Get or create meta dict for the job
|
||||
meta = kwargs.get("meta", {})
|
||||
|
||||
# Inject trace context into meta
|
||||
carrier: dict[str, str] = {}
|
||||
inject(carrier)
|
||||
|
||||
if carrier:
|
||||
meta["otel_context"] = carrier
|
||||
kwargs["meta"] = meta
|
||||
logger.debug(f"Injected trace context into RQ job: {carrier}")
|
||||
|
||||
# Call original enqueue
|
||||
return original_enqueue(*args, **kwargs)
|
||||
|
||||
# Replace enqueue method
|
||||
rq_queue.enqueue = traced_enqueue
|
||||
logger.info("RQ queue wrapped for distributed tracing")
|
||||
215
docling_serve/rq_job_wrapper.py
Normal file
215
docling_serve/rq_job_wrapper.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""Instrumented wrapper for RQ job functions with OpenTelemetry tracing."""
|
||||
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
import msgpack
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import SpanKind, Status, StatusCode
|
||||
from rq import get_current_job
|
||||
|
||||
from docling.datamodel.base_models import DocumentStream
|
||||
from docling_jobkit.convert.chunking import process_chunk_results
|
||||
from docling_jobkit.convert.manager import DoclingConverterManager
|
||||
from docling_jobkit.convert.results import process_export_results
|
||||
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
|
||||
from docling_jobkit.datamodel.task import Task
|
||||
from docling_jobkit.datamodel.task_meta import TaskStatus, TaskType
|
||||
from docling_jobkit.orchestrators.rq.orchestrator import (
|
||||
RQOrchestratorConfig,
|
||||
_TaskUpdate,
|
||||
)
|
||||
from docling_jobkit.orchestrators.rq.worker import make_msgpack_safe
|
||||
|
||||
from docling_serve.rq_instrumentation import extract_trace_context
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def instrumented_docling_task( # noqa: C901
|
||||
task_data: dict,
|
||||
conversion_manager: DoclingConverterManager,
|
||||
orchestrator_config: RQOrchestratorConfig,
|
||||
scratch_dir: Path,
|
||||
):
|
||||
"""
|
||||
Instrumented wrapper for docling_task that extracts and activates trace context.
|
||||
|
||||
This function extracts the OpenTelemetry trace context from the RQ job metadata
|
||||
and activates it before calling the actual task function, enabling end-to-end
|
||||
distributed tracing from API to worker. It also adds detailed sub-spans for each
|
||||
processing stage to identify performance bottlenecks.
|
||||
"""
|
||||
job = get_current_job()
|
||||
assert job is not None
|
||||
|
||||
conn = job.connection
|
||||
task = Task.model_validate(task_data)
|
||||
task_id = task.task_id
|
||||
|
||||
# Extract parent trace context from job metadata
|
||||
parent_context = extract_trace_context(job) if job else None
|
||||
|
||||
# Get tracer
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
# Create main job span with parent context (this creates the link to the API trace)
|
||||
with tracer.start_as_current_span(
|
||||
"rq.job.docling_task",
|
||||
context=parent_context,
|
||||
kind=SpanKind.CONSUMER,
|
||||
) as span:
|
||||
try:
|
||||
# Add job attributes
|
||||
span.set_attribute("rq.job.id", job.id)
|
||||
if job.func_name:
|
||||
span.set_attribute("rq.job.func_name", job.func_name)
|
||||
span.set_attribute("rq.queue.name", job.origin)
|
||||
|
||||
# Add task attributes
|
||||
span.set_attribute("docling.task.id", task_id)
|
||||
span.set_attribute("docling.task.type", str(task.task_type.value))
|
||||
span.set_attribute("docling.task.num_sources", len(task.sources))
|
||||
|
||||
logger.info(
|
||||
f"Executing docling_task {task_id} with "
|
||||
f"trace_id={span.get_span_context().trace_id:032x} "
|
||||
f"span_id={span.get_span_context().span_id:016x}"
|
||||
)
|
||||
|
||||
# Notify task started
|
||||
with tracer.start_as_current_span("notify.task_started"):
|
||||
conn.publish(
|
||||
orchestrator_config.sub_channel,
|
||||
_TaskUpdate(
|
||||
task_id=task_id,
|
||||
task_status=TaskStatus.STARTED,
|
||||
).model_dump_json(),
|
||||
)
|
||||
|
||||
workdir = scratch_dir / task_id
|
||||
|
||||
# Prepare sources with detailed tracing
|
||||
with tracer.start_as_current_span("prepare_sources") as prep_span:
|
||||
convert_sources: list[Union[str, DocumentStream]] = []
|
||||
headers: dict[str, Any] | None = None
|
||||
|
||||
for idx, source in enumerate(task.sources):
|
||||
if isinstance(source, DocumentStream):
|
||||
convert_sources.append(source)
|
||||
prep_span.add_event(
|
||||
f"source_{idx}_prepared",
|
||||
{"type": "DocumentStream", "name": source.name},
|
||||
)
|
||||
elif isinstance(source, FileSource):
|
||||
convert_sources.append(source.to_document_stream())
|
||||
prep_span.add_event(
|
||||
f"source_{idx}_prepared",
|
||||
{"type": "FileSource", "filename": source.filename},
|
||||
)
|
||||
elif isinstance(source, HttpSource):
|
||||
convert_sources.append(str(source.url))
|
||||
if headers is None and source.headers:
|
||||
headers = source.headers
|
||||
prep_span.add_event(
|
||||
f"source_{idx}_prepared",
|
||||
{"type": "HttpSource", "url": str(source.url)},
|
||||
)
|
||||
|
||||
prep_span.set_attribute("num_sources", len(convert_sources))
|
||||
|
||||
if not conversion_manager:
|
||||
raise RuntimeError("No converter")
|
||||
if not task.convert_options:
|
||||
raise RuntimeError("No conversion options")
|
||||
|
||||
# Document conversion with detailed tracing
|
||||
with tracer.start_as_current_span("convert_documents") as conv_span:
|
||||
conv_span.set_attribute("num_sources", len(convert_sources))
|
||||
conv_span.set_attribute("has_headers", headers is not None)
|
||||
|
||||
conv_results = conversion_manager.convert_documents(
|
||||
sources=convert_sources,
|
||||
options=task.convert_options,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
# Result processing with detailed tracing
|
||||
with tracer.start_as_current_span("process_results") as proc_span:
|
||||
proc_span.set_attribute("task_type", str(task.task_type.value))
|
||||
|
||||
if task.task_type == TaskType.CONVERT:
|
||||
with tracer.start_as_current_span("process_export_results"):
|
||||
processed_results = process_export_results(
|
||||
task=task,
|
||||
conv_results=conv_results,
|
||||
work_dir=workdir,
|
||||
)
|
||||
elif task.task_type == TaskType.CHUNK:
|
||||
with tracer.start_as_current_span("process_chunk_results"):
|
||||
processed_results = process_chunk_results(
|
||||
task=task,
|
||||
conv_results=conv_results,
|
||||
work_dir=workdir,
|
||||
)
|
||||
|
||||
# Serialize and store results
|
||||
with tracer.start_as_current_span("serialize_and_store") as store_span:
|
||||
safe_data = make_msgpack_safe(processed_results.model_dump())
|
||||
packed = msgpack.packb(safe_data, use_bin_type=True)
|
||||
store_span.set_attribute("result_size_bytes", len(packed))
|
||||
|
||||
result_key = f"{orchestrator_config.results_prefix}:{task_id}"
|
||||
conn.setex(result_key, orchestrator_config.results_ttl, packed)
|
||||
store_span.set_attribute("result_key", result_key)
|
||||
|
||||
# Notify task success
|
||||
with tracer.start_as_current_span("notify.task_success"):
|
||||
conn.publish(
|
||||
orchestrator_config.sub_channel,
|
||||
_TaskUpdate(
|
||||
task_id=task_id,
|
||||
task_status=TaskStatus.SUCCESS,
|
||||
result_key=result_key,
|
||||
).model_dump_json(),
|
||||
)
|
||||
|
||||
# Clean up
|
||||
with tracer.start_as_current_span("cleanup"):
|
||||
if workdir.exists():
|
||||
shutil.rmtree(workdir)
|
||||
|
||||
# Mark span as successful
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
logger.info(f"Docling task {task_id} completed successfully")
|
||||
|
||||
return result_key
|
||||
|
||||
except Exception as e:
|
||||
# Notify task failure
|
||||
try:
|
||||
conn.publish(
|
||||
orchestrator_config.sub_channel,
|
||||
_TaskUpdate(
|
||||
task_id=task_id,
|
||||
task_status=TaskStatus.FAILURE,
|
||||
).model_dump_json(),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Clean up on error
|
||||
workdir = scratch_dir / task_id
|
||||
if workdir.exists():
|
||||
try:
|
||||
shutil.rmtree(workdir)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Record exception and mark span as failed
|
||||
logger.error(f"Docling task {task_id} failed: {e}", exc_info=True)
|
||||
span.record_exception(e)
|
||||
span.set_status(Status(StatusCode.ERROR, str(e)))
|
||||
raise
|
||||
132
docling_serve/rq_metrics_collector.py
Normal file
132
docling_serve/rq_metrics_collector.py
Normal file
@@ -0,0 +1,132 @@
|
||||
# Heavily based on https://github.com/mdawar/rq-exporter
|
||||
import logging
|
||||
|
||||
from prometheus_client import Summary
|
||||
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
|
||||
from prometheus_client.registry import Collector
|
||||
from redis import Redis
|
||||
from rq import Queue, Worker
|
||||
from rq.job import JobStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_redis_connection(url: str):
|
||||
return Redis.from_url(url)
|
||||
|
||||
|
||||
def get_workers_stats(connection):
    """Return a stats dictionary for every RQ worker registered on *connection*."""

    stats = []
    for worker in Worker.all(connection):
        stats.append(
            {
                "name": worker.name,
                "queues": worker.queue_names(),
                "state": worker.get_state(),
                "successful_job_count": worker.successful_job_count,
                "failed_job_count": worker.failed_job_count,
                "total_working_time": worker.total_working_time,
            }
        )
    return stats
|
||||
|
||||
|
||||
def get_queue_jobs(connection, queue_name):
    """Return the job counts of a single queue, keyed by job status."""

    queue = Queue(name=queue_name, connection=connection)

    counts = {}
    counts[JobStatus.QUEUED] = queue.count
    counts[JobStatus.STARTED] = queue.started_job_registry.count
    counts[JobStatus.FINISHED] = queue.finished_job_registry.count
    counts[JobStatus.FAILED] = queue.failed_job_registry.count
    counts[JobStatus.DEFERRED] = queue.deferred_job_registry.count
    counts[JobStatus.SCHEDULED] = queue.scheduled_job_registry.count
    return counts
|
||||
|
||||
|
||||
def get_jobs_by_queue(connection):
    """Map each known queue name to its per-status job counts."""

    jobs_by_queue = {}
    for queue in Queue.all(connection):
        jobs_by_queue[queue.name] = get_queue_jobs(connection, queue.name)
    return jobs_by_queue
|
||||
|
||||
|
||||
class RQCollector(Collector):
    """Prometheus collector exposing RQ worker and queue statistics."""

    def __init__(self, connection=None):
        self.connection = connection

        # Tracks how often RQ data is collected and how long each pass takes.
        self.summary = Summary(
            "rq_request_processing_seconds", "Time spent collecting RQ data"
        )

    def collect(self):
        """Yield the current RQ metric families (workers, counters, jobs)."""
        logger.debug("Collecting the RQ metrics...")

        with self.summary.time():
            worker_gauge = GaugeMetricFamily(
                "rq_workers",
                "RQ workers",
                labels=["name", "state", "queues"],
            )
            success_counter = CounterMetricFamily(
                "rq_workers_success",
                "RQ workers success count",
                labels=["name", "queues"],
            )
            failed_counter = CounterMetricFamily(
                "rq_workers_failed",
                "RQ workers fail count",
                labels=["name", "queues"],
            )
            working_time_counter = CounterMetricFamily(
                "rq_workers_working_time",
                "RQ workers spent seconds",
                labels=["name", "queues"],
            )
            job_gauge = GaugeMetricFamily(
                "rq_jobs",
                "RQ jobs by state",
                labels=["queue", "status"],
            )

            # One sample per worker; the gauge value is a constant 1 so the
            # per-worker state/queues show up as labels.
            for info in get_workers_stats(self.connection):
                queues_label = ",".join(info["queues"])
                worker_gauge.add_metric(
                    [info["name"], info["state"], queues_label],
                    1,
                )
                success_counter.add_metric(
                    [info["name"], queues_label],
                    info["successful_job_count"],
                )
                failed_counter.add_metric(
                    [info["name"], queues_label],
                    info["failed_job_count"],
                )
                working_time_counter.add_metric(
                    [info["name"], queues_label],
                    info["total_working_time"],
                )

            yield worker_gauge
            yield success_counter
            yield failed_counter
            yield working_time_counter

            for queue_name, status_counts in get_jobs_by_queue(self.connection).items():
                for status, count in status_counts.items():
                    job_gauge.add_metric([queue_name, status], count)

            yield job_gauge

            logger.debug("RQ metrics collection finished")
|
||||
106
docling_serve/rq_worker_instrumented.py
Normal file
106
docling_serve/rq_worker_instrumented.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Instrumented RQ worker with OpenTelemetry tracing support."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import SpanKind, Status, StatusCode
|
||||
|
||||
from docling_jobkit.convert.manager import (
|
||||
DoclingConverterManagerConfig,
|
||||
)
|
||||
from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
|
||||
from docling_jobkit.orchestrators.rq.worker import CustomRQWorker
|
||||
|
||||
from docling_serve.rq_instrumentation import extract_trace_context
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstrumentedRQWorker(CustomRQWorker):
    """RQ Worker with OpenTelemetry tracing instrumentation.

    Wraps job execution in a CONSUMER span whose parent context is read
    from the job metadata, so traces started in the API process continue
    into the worker process.
    """

    def __init__(
        self,
        *args,
        orchestrator_config: RQOrchestratorConfig,
        cm_config: DoclingConverterManagerConfig,
        scratch_dir: Path,
        **kwargs,
    ):
        super().__init__(
            *args,
            orchestrator_config=orchestrator_config,
            cm_config=cm_config,
            scratch_dir=scratch_dir,
            **kwargs,
        )
        # Module-scoped tracer; spans created here attach to the context
        # extracted from the job (see perform_job below).
        self.tracer = trace.get_tracer(__name__)

    def perform_job(self, job, queue):
        """
        Perform job with distributed tracing support.

        This extracts the trace context from the job metadata and creates
        a span that continues the trace from the API request.
        """
        # Extract parent trace context from job metadata
        parent_context = extract_trace_context(job)

        # Create span name from job function
        func_name = job.func_name if hasattr(job, "func_name") else "unknown"
        span_name = f"rq.job.{func_name}"

        # Start span with parent context
        with self.tracer.start_as_current_span(
            span_name,
            context=parent_context,
            kind=SpanKind.CONSUMER,
        ) as span:
            try:
                # Add job attributes to span
                span.set_attribute("rq.job.id", job.id)
                span.set_attribute("rq.job.func_name", func_name)
                span.set_attribute("rq.queue.name", queue.name)

                if hasattr(job, "description") and job.description:
                    span.set_attribute("rq.job.description", job.description)

                if hasattr(job, "timeout") and job.timeout:
                    span.set_attribute("rq.job.timeout", job.timeout)

                # Add job kwargs info
                if hasattr(job, "kwargs") and job.kwargs:
                    # Add conversion manager before executing.
                    # NOTE(review): this mutates job.kwargs in place before the
                    # actual job function runs — it appears to replicate the
                    # injection normally done by CustomRQWorker; confirm against
                    # docling_jobkit's worker implementation.
                    job.kwargs["conversion_manager"] = self.conversion_manager
                    job.kwargs["orchestrator_config"] = self.orchestrator_config
                    job.kwargs["scratch_dir"] = self.scratch_dir

                    # Log task info if available
                    task_type = job.kwargs.get("task_type")
                    if task_type:
                        span.set_attribute("docling.task.type", str(task_type))

                    sources = job.kwargs.get("sources", [])
                    if sources:
                        span.set_attribute("docling.task.num_sources", len(sources))

                logger.info(
                    f"Executing job {job.id} with trace_id={span.get_span_context().trace_id:032x}"
                )

                # Execute the actual job.
                # NOTE(review): super(CustomRQWorker, self) deliberately skips
                # CustomRQWorker.perform_job and calls the grandparent's version
                # — presumably because the kwargs injection above already covers
                # what CustomRQWorker would do; verify this stays in sync.
                result = super(CustomRQWorker, self).perform_job(job, queue)

                # Mark span as successful
                span.set_status(Status(StatusCode.OK))
                logger.debug(f"Job {job.id} completed successfully")

                return result

            except Exception as e:
                # Record exception and mark span as failed
                logger.error(f"Job {job.id} failed: {e}", exc_info=True)
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise
|
||||
@@ -91,6 +91,13 @@ class DoclingServeSettings(BaseSettings):
|
||||
|
||||
eng_kfp_experimental: bool = False
|
||||
|
||||
# OpenTelemetry settings
|
||||
otel_enable_metrics: bool = True
|
||||
otel_enable_traces: bool = True
|
||||
otel_enable_prometheus: bool = True
|
||||
otel_enable_otlp_metrics: bool = False
|
||||
otel_service_name: str = "docling-serve"
|
||||
|
||||
@model_validator(mode="after")
|
||||
def engine_settings(self) -> Self:
|
||||
# Validate KFP engine settings
|
||||
|
||||
144
examples/OTEL.md
Normal file
144
examples/OTEL.md
Normal file
@@ -0,0 +1,144 @@
|
||||
# OpenTelemetry Integration for Docling Serve
|
||||
|
||||
Docling Serve includes built-in OpenTelemetry instrumentation for metrics and distributed tracing.
|
||||
|
||||
## Features
|
||||
|
||||
- **Metrics**: Prometheus-compatible metrics endpoint at `/metrics`
|
||||
- **Traces**: OTLP trace export to OpenTelemetry collectors
|
||||
- **FastAPI Auto-instrumentation**: HTTP request metrics and traces
|
||||
- **RQ Metrics**: Worker and job queue metrics (when using RQ engine)
|
||||
|
||||
## Configuration
|
||||
|
||||
All settings are controlled via environment variables:
|
||||
|
||||
```bash
|
||||
# Enable/disable features
|
||||
DOCLING_SERVE_OTEL_ENABLE_METRICS=true # Enable metrics collection
|
||||
DOCLING_SERVE_OTEL_ENABLE_TRACES=true # Enable trace collection
|
||||
DOCLING_SERVE_OTEL_ENABLE_PROMETHEUS=true # Enable Prometheus /metrics endpoint
|
||||
DOCLING_SERVE_OTEL_ENABLE_OTLP_METRICS=false # Enable OTLP metrics export
|
||||
|
||||
# Service identification
|
||||
DOCLING_SERVE_OTEL_SERVICE_NAME=docling-serve
|
||||
|
||||
# OTLP endpoint (for traces and optional metrics)
|
||||
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Option 1: Direct Prometheus Scraping
|
||||
|
||||
1. Start docling-serve with default settings:
|
||||
```bash
|
||||
uv run docling-serve
|
||||
```
|
||||
|
||||
2. Add to your `prometheus.yml`:
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- job_name: 'docling-serve'
|
||||
static_configs:
|
||||
- targets: ['localhost:5001']
|
||||
```
|
||||
|
||||
3. Access metrics at `http://localhost:5001/metrics`
|
||||
|
||||
### Option 2: Full OTEL Stack with Docker Compose
|
||||
|
||||
1. Use the provided compose file:
|
||||
```bash
|
||||
cd examples
|
||||
mkdir tempo-data
|
||||
docker-compose -f docker-compose-otel.yaml up
|
||||
```
|
||||
|
||||
2. This starts:
|
||||
- **docling-serve**: API server with UI
|
||||
- **docling-worker**: RQ worker for distributed processing (scales independently)
|
||||
- **redis**: Message queue for RQ
|
||||
- **otel-collector**: Receives and routes telemetry
|
||||
- **prometheus**: Metrics storage
|
||||
- **tempo**: Trace storage
|
||||
- **grafana**: Visualization UI
|
||||
|
||||
3. Access:
|
||||
- Docling Serve UI: `http://localhost:5001/ui`
|
||||
- Metrics endpoint: `http://localhost:5001/metrics`
|
||||
- Grafana: `http://localhost:3000` (pre-configured with Prometheus & Tempo)
|
||||
- Prometheus: `http://localhost:9090`
|
||||
- Tempo: `http://localhost:3200`
|
||||
|
||||
4. Scale workers (optional):
|
||||
```bash
|
||||
docker-compose -f docker-compose-otel.yaml up --scale docling-worker=3
|
||||
```
|
||||
|
||||
## Available Metrics
|
||||
|
||||
### HTTP Metrics (from OpenTelemetry FastAPI instrumentation)
|
||||
- `http_server_request_duration` - Request duration histogram
|
||||
- `http_server_active_requests` - Active requests gauge
|
||||
- `http_server_request_size` - Request size histogram
|
||||
- `http_server_response_size` - Response size histogram
|
||||
|
||||
### RQ Metrics (when using RQ engine)
|
||||
- `rq_workers` - Number of workers by state
|
||||
- `rq_workers_success` - Successful job count per worker
|
||||
- `rq_workers_failed` - Failed job count per worker
|
||||
- `rq_workers_working_time` - Total working time per worker
|
||||
- `rq_jobs` - Job counts by queue and status
|
||||
- `rq_request_processing_seconds` - RQ metrics collection time
|
||||
|
||||
## Traces
|
||||
|
||||
Traces are automatically generated for:
|
||||
- All HTTP requests to FastAPI endpoints
|
||||
- Document conversion operations
|
||||
- **RQ job execution (distributed tracing)**: When using RQ engine, traces propagate from API requests to worker jobs, providing end-to-end visibility across the distributed system
|
||||
|
||||
View traces in Grafana Tempo or any OTLP-compatible backend.
|
||||
|
||||
### Distributed Tracing in RQ Mode
|
||||
|
||||
When running with the RQ engine (`DOCLING_SERVE_ENG_KIND=rq`), traces automatically propagate from the API to RQ workers:
|
||||
|
||||
1. **API Request**: FastAPI creates a trace when a document conversion request arrives
|
||||
2. **Job Enqueue**: The trace context is injected into the RQ job metadata
|
||||
3. **Worker Execution**: The RQ worker extracts the trace context and continues the trace
|
||||
4. **End-to-End View**: You can see the complete request flow from API to worker in Grafana
|
||||
|
||||
This allows you to:
|
||||
- Track document processing latency across API and workers
|
||||
- Identify bottlenecks in the conversion pipeline
|
||||
- Debug distributed processing issues
|
||||
- Monitor queue wait times and processing times separately
|
||||
|
||||
## Example Files
|
||||
|
||||
See the `examples/` directory:
|
||||
- `prometheus-scrape.yaml` - Prometheus scrape configuration examples
|
||||
- `docker-compose-otel.yaml` - Full observability stack
|
||||
- `otel-collector-config.yaml` - OTEL collector configuration
|
||||
- `prometheus.yaml` - Prometheus configuration
|
||||
- `tempo.yaml` - Tempo trace storage configuration
|
||||
- `grafana-datasources.yaml` - Grafana data source provisioning
|
||||
|
||||
## Production Considerations
|
||||
|
||||
1. **Security**: Add authentication to the `/metrics` endpoint if needed
|
||||
2. **Performance**: Metrics collection has minimal overhead (<1ms per scrape)
|
||||
3. **Storage**: Configure retention policies in Prometheus/Tempo
|
||||
4. **Sampling**: Configure trace sampling for high-volume services
|
||||
5. **Labels**: Keep cardinality low to avoid metric explosion
|
||||
|
||||
## Disabling OTEL
|
||||
|
||||
To disable all OTEL features:
|
||||
|
||||
```bash
|
||||
DOCLING_SERVE_OTEL_ENABLE_METRICS=false
|
||||
DOCLING_SERVE_OTEL_ENABLE_TRACES=false
|
||||
```
|
||||
152
examples/docker-compose-otel.yaml
Normal file
152
examples/docker-compose-otel.yaml
Normal file
@@ -0,0 +1,152 @@
|
||||
# Docker Compose configuration for Docling Serve with OpenTelemetry
|
||||
# Includes OTEL collector, Prometheus, Grafana, and Tempo for full observability
|
||||
|
||||
services:
|
||||
# Docling Serve application
|
||||
docling-serve:
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: Containerfile
|
||||
platform: "linux/arm64"
|
||||
ports:
|
||||
- "5001:5001"
|
||||
environment:
|
||||
# Basic settings
|
||||
- DOCLING_SERVE_ENABLE_UI=1
|
||||
|
||||
# OpenTelemetry settings
|
||||
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
||||
- OTEL_SERVICE_NAME=docling-serve
|
||||
- DOCLING_SERVE_OTEL_ENABLE_METRICS=true
|
||||
- DOCLING_SERVE_OTEL_ENABLE_TRACES=true
|
||||
- DOCLING_SERVE_OTEL_ENABLE_PROMETHEUS=true
|
||||
- DOCLING_SERVE_OTEL_ENABLE_OTLP_METRICS=true
|
||||
|
||||
# RQ settings for distributed processing
|
||||
- DOCLING_SERVE_ENG_KIND=rq
|
||||
- DOCLING_SERVE_ENG_RQ_REDIS_URL=redis://redis:6379/0
|
||||
|
||||
depends_on:
|
||||
- otel-collector
|
||||
- redis
|
||||
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
# Docling Serve RQ Worker
|
||||
docling-worker:
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: Containerfile
|
||||
platform: "linux/arm64"
|
||||
command: ["docling-serve", "rq-worker"]
|
||||
environment:
|
||||
# OpenTelemetry settings
|
||||
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
||||
- OTEL_SERVICE_NAME=docling-serve-worker
|
||||
- DOCLING_SERVE_OTEL_ENABLE_METRICS=false
|
||||
- DOCLING_SERVE_OTEL_ENABLE_TRACES=true
|
||||
|
||||
# RQ settings
|
||||
- DOCLING_SERVE_ENG_KIND=rq
|
||||
- DOCLING_SERVE_ENG_RQ_REDIS_URL=redis://redis:6379/0
|
||||
|
||||
depends_on:
|
||||
- otel-collector
|
||||
- redis
|
||||
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
# Scale workers as needed
|
||||
deploy:
|
||||
replicas: 1
|
||||
|
||||
# OpenTelemetry Collector
|
||||
otel-collector:
|
||||
image: otel/opentelemetry-collector-contrib:latest
|
||||
command: ["--config=/etc/otel-collector-config.yaml"]
|
||||
volumes:
|
||||
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
|
||||
ports:
|
||||
- "4317:4317" # OTLP gRPC
|
||||
- "4318:4318" # OTLP HTTP
|
||||
- "8888:8888" # Prometheus metrics (collector's own metrics)
|
||||
- "8889:8889" # Prometheus exporter
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
# Prometheus for metrics storage
|
||||
prometheus:
|
||||
image: prom/prometheus:latest
|
||||
command:
|
||||
- '--config.file=/etc/prometheus/prometheus.yml'
|
||||
- '--storage.tsdb.path=/prometheus'
|
||||
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
|
||||
- '--web.console.templates=/usr/share/prometheus/consoles'
|
||||
volumes:
|
||||
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
|
||||
- prometheus-data:/prometheus
|
||||
ports:
|
||||
- "9090:9090"
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
# Tempo for trace storage
|
||||
tempo-init:
|
||||
image: busybox:latest
|
||||
user: root
|
||||
entrypoint:
|
||||
- "chown"
|
||||
- "10001:10001"
|
||||
- "/var/tempo"
|
||||
volumes:
|
||||
- ./tempo-data:/var/tempo
|
||||
|
||||
tempo:
|
||||
image: grafana/tempo:latest
|
||||
command: ["-config.file=/etc/tempo.yaml"]
|
||||
volumes:
|
||||
- ./tempo.yaml:/etc/tempo.yaml
|
||||
- ./tempo-data:/tmp/tempo
|
||||
ports:
|
||||
- "3200:3200" # Tempo
|
||||
- "4319:4317" # OTLP gRPC (alternate port to avoid conflict)
|
||||
networks:
|
||||
- docling-otel
|
||||
depends_on:
|
||||
- tempo-init
|
||||
|
||||
# Grafana for visualization
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
environment:
|
||||
- GF_AUTH_ANONYMOUS_ENABLED=true
|
||||
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
|
||||
- GF_AUTH_DISABLE_LOGIN_FORM=true
|
||||
volumes:
|
||||
- grafana-data:/var/lib/grafana
|
||||
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
|
||||
ports:
|
||||
- "3000:3000"
|
||||
depends_on:
|
||||
- prometheus
|
||||
- tempo
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
# Redis for RQ distributed processing
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
ports:
|
||||
- "6379:6379"
|
||||
networks:
|
||||
- docling-otel
|
||||
|
||||
volumes:
|
||||
prometheus-data:
|
||||
grafana-data:
|
||||
|
||||
networks:
|
||||
docling-otel:
|
||||
driver: bridge
|
||||
33
examples/grafana-datasources.yaml
Normal file
33
examples/grafana-datasources.yaml
Normal file
@@ -0,0 +1,33 @@
|
||||
# Grafana datasources configuration
|
||||
# Automatically provisions Prometheus and Tempo as data sources
|
||||
|
||||
apiVersion: 1
|
||||
|
||||
datasources:
|
||||
- name: Prometheus
|
||||
type: prometheus
|
||||
access: proxy
|
||||
url: http://prometheus:9090
|
||||
isDefault: true
|
||||
editable: true
|
||||
jsonData:
|
||||
timeInterval: 15s
|
||||
httpMethod: POST
|
||||
version: 1
|
||||
|
||||
- name: Tempo
|
||||
type: tempo
|
||||
access: proxy
|
||||
url: http://tempo:3200
|
||||
editable: true
|
||||
jsonData:
|
||||
httpMethod: GET
|
||||
tracesToLogs:
|
||||
datasourceUid: Loki
|
||||
tags: ['job', 'instance', 'pod']
|
||||
tracesToMetrics:
|
||||
datasourceUid: Prometheus
|
||||
tags: [{ key: 'service.name', value: 'service' }]
|
||||
serviceMap:
|
||||
datasourceUid: Prometheus
|
||||
version: 1
|
||||
58
examples/otel-collector-config.yaml
Normal file
58
examples/otel-collector-config.yaml
Normal file
@@ -0,0 +1,58 @@
|
||||
# OpenTelemetry Collector Configuration
|
||||
# Receives traces and metrics from docling-serve and exports to Prometheus/Tempo
|
||||
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
endpoint: 0.0.0.0:4317
|
||||
http:
|
||||
endpoint: 0.0.0.0:4318
|
||||
|
||||
processors:
|
||||
batch:
|
||||
timeout: 10s
|
||||
send_batch_size: 1024
|
||||
|
||||
memory_limiter:
|
||||
check_interval: 1s
|
||||
limit_mib: 512
|
||||
|
||||
# Add resource attributes
|
||||
resource:
|
||||
attributes:
|
||||
- key: service.namespace
|
||||
value: docling
|
||||
action: insert
|
||||
|
||||
exporters:
|
||||
# Export traces to Tempo
|
||||
otlp/tempo:
|
||||
endpoint: tempo:4317
|
||||
tls:
|
||||
insecure: true
|
||||
|
||||
# Export metrics to Prometheus
|
||||
prometheus:
|
||||
endpoint: "0.0.0.0:8889"
|
||||
namespace: docling
|
||||
const_labels:
|
||||
environment: production
|
||||
|
||||
# Debug logging (optional, remove in production)
|
||||
debug:
|
||||
verbosity: detailed
|
||||
sampling_initial: 5
|
||||
sampling_thereafter: 200
|
||||
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [memory_limiter, batch, resource]
|
||||
exporters: [otlp/tempo, debug]
|
||||
|
||||
metrics:
|
||||
receivers: [otlp]
|
||||
processors: [memory_limiter, batch, resource]
|
||||
exporters: [prometheus, debug]
|
||||
106
examples/prometheus-scrape.yaml
Normal file
106
examples/prometheus-scrape.yaml
Normal file
@@ -0,0 +1,106 @@
|
||||
# Prometheus Scrape Configuration for Docling Serve
|
||||
# Add this to your Prometheus configuration file (prometheus.yml)
|
||||
|
||||
scrape_configs:
|
||||
- job_name: 'docling-serve'
|
||||
|
||||
# Scrape interval
|
||||
scrape_interval: 15s
|
||||
scrape_timeout: 10s
|
||||
|
||||
# Metrics path (default is /metrics)
|
||||
metrics_path: /metrics
|
||||
|
||||
# Static targets configuration
|
||||
static_configs:
|
||||
- targets:
|
||||
# Replace with your docling-serve instance(s)
|
||||
- 'localhost:5001'
|
||||
- 'docling-serve-1.example.com:5001'
|
||||
- 'docling-serve-2.example.com:5001'
|
||||
|
||||
# Optional labels to add to all metrics from this job
|
||||
labels:
|
||||
environment: 'production'
|
||||
service: 'docling-serve'
|
||||
|
||||
# Optional: Add authentication if API key is required
|
||||
# basic_auth:
|
||||
# username: ''
|
||||
# password: 'your-api-key'
|
||||
|
||||
# Optional: TLS configuration
|
||||
# tls_config:
|
||||
# ca_file: /path/to/ca.crt
|
||||
# cert_file: /path/to/client.crt
|
||||
# key_file: /path/to/client.key
|
||||
# insecure_skip_verify: false
|
||||
|
||||
---
|
||||
# VictoriaMetrics Scrape Configuration
|
||||
# For use with VictoriaMetrics, configuration is compatible with Prometheus
|
||||
|
||||
# vmagent configuration example:
|
||||
# vmagent \
|
||||
# -promscrape.config=prometheus-scrape.yaml \
|
||||
# -remoteWrite.url=http://victoriametrics:8428/api/v1/write
|
||||
|
||||
---
|
||||
# Kubernetes Service Discovery Example
|
||||
# For auto-discovery of docling-serve pods in Kubernetes
|
||||
|
||||
scrape_configs:
|
||||
- job_name: 'docling-serve-k8s'
|
||||
kubernetes_sd_configs:
|
||||
- role: pod
|
||||
namespaces:
|
||||
names:
|
||||
- default
|
||||
- docling-serve
|
||||
|
||||
relabel_configs:
|
||||
# Only scrape pods with label app=docling-serve
|
||||
- source_labels: [__meta_kubernetes_pod_label_app]
|
||||
action: keep
|
||||
regex: docling-serve
|
||||
|
||||
# Use pod name as instance label
|
||||
- source_labels: [__meta_kubernetes_pod_name]
|
||||
target_label: pod
|
||||
|
||||
# Use namespace as label
|
||||
- source_labels: [__meta_kubernetes_namespace]
|
||||
target_label: namespace
|
||||
|
||||
# Set metrics path
|
||||
- target_label: __metrics_path__
|
||||
replacement: /metrics
|
||||
|
||||
# Set port to scrape
|
||||
- source_labels: [__address__]
|
||||
action: replace
|
||||
regex: ([^:]+)(?::\d+)?
|
||||
replacement: $1:5001
|
||||
target_label: __address__
|
||||
|
||||
---
|
||||
# Available Metrics from Docling Serve:
|
||||
#
|
||||
# FastAPI/HTTP Metrics (from OpenTelemetry):
|
||||
# - http.server.request.duration - HTTP request duration histogram
|
||||
# - http.server.active_requests - Active HTTP requests gauge
|
||||
# - http.server.request.size - HTTP request size histogram
|
||||
# - http.server.response.size - HTTP response size histogram
|
||||
#
|
||||
# RQ Metrics (when using RQ engine):
|
||||
# - rq_workers - Number of RQ workers by state
|
||||
# - rq_workers_success - Successful job count per worker
|
||||
# - rq_workers_failed - Failed job count per worker
|
||||
# - rq_workers_working_time - Total working time per worker
|
||||
# - rq_jobs - Job counts by queue and status
|
||||
# - rq_request_processing_seconds - Time spent collecting RQ metrics
|
||||
#
|
||||
# System Metrics (via Python OpenTelemetry):
|
||||
# - process.runtime.cpython.cpu.utilization - CPU utilization
|
||||
# - process.runtime.cpython.memory - Memory usage
|
||||
# - process.runtime.cpython.gc.count - Garbage collection count
|
||||
30
examples/prometheus.yaml
Normal file
30
examples/prometheus.yaml
Normal file
@@ -0,0 +1,30 @@
|
||||
# Prometheus configuration for scraping metrics
|
||||
|
||||
global:
|
||||
scrape_interval: 15s
|
||||
evaluation_interval: 15s
|
||||
external_labels:
|
||||
cluster: 'docling-cluster'
|
||||
replica: '1'
|
||||
|
||||
scrape_configs:
|
||||
# Scrape docling-serve metrics endpoint directly
|
||||
- job_name: 'docling-serve'
|
||||
static_configs:
|
||||
- targets: ['docling-serve:5001']
|
||||
labels:
|
||||
service: 'docling-serve'
|
||||
|
||||
# Scrape OTEL collector's own metrics
|
||||
- job_name: 'otel-collector'
|
||||
static_configs:
|
||||
- targets: ['otel-collector:8888']
|
||||
labels:
|
||||
service: 'otel-collector'
|
||||
|
||||
# Scrape metrics exported by OTEL collector
|
||||
- job_name: 'otel-metrics'
|
||||
static_configs:
|
||||
- targets: ['otel-collector:8889']
|
||||
labels:
|
||||
service: 'docling-otel-metrics'
|
||||
46
examples/tempo.yaml
Normal file
46
examples/tempo.yaml
Normal file
@@ -0,0 +1,46 @@
|
||||
# Tempo configuration for trace storage
|
||||
|
||||
server:
|
||||
http_listen_port: 3200
|
||||
|
||||
distributor:
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
endpoint: 0.0.0.0:4317
|
||||
http:
|
||||
endpoint: 0.0.0.0:4318
|
||||
|
||||
ingester:
|
||||
trace_idle_period: 10s
|
||||
max_block_bytes: 1048576
|
||||
max_block_duration: 5m
|
||||
|
||||
compactor:
|
||||
compaction:
|
||||
compaction_window: 1h
|
||||
max_compaction_objects: 1000000
|
||||
block_retention: 1h
|
||||
compacted_block_retention: 10m
|
||||
|
||||
storage:
|
||||
trace:
|
||||
backend: local
|
||||
local:
|
||||
path: /tmp/tempo/blocks
|
||||
wal:
|
||||
path: /tmp/tempo/wal
|
||||
pool:
|
||||
max_workers: 100
|
||||
queue_depth: 10000
|
||||
|
||||
metrics_generator:
|
||||
registry:
|
||||
external_labels:
|
||||
source: tempo
|
||||
storage:
|
||||
path: /tmp/tempo/generator/wal
|
||||
remote_write:
|
||||
- url: http://prometheus:9090/api/v1/write
|
||||
send_exemplars: true
|
||||
@@ -46,6 +46,12 @@ dependencies = [
|
||||
"websockets~=14.0",
|
||||
"scalar-fastapi>=1.0.3",
|
||||
"docling-mcp>=1.0.0",
|
||||
"opentelemetry-api>=1.36.0,<2.0.0",
|
||||
"opentelemetry-sdk>=1.36.0,<2.0.0",
|
||||
"opentelemetry-exporter-otlp>=1.15.0",
|
||||
"opentelemetry-instrumentation-fastapi>=0.57b0,<0.58",
|
||||
"opentelemetry-exporter-prometheus>=0.57b0",
|
||||
"prometheus-client>=0.21.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -287,6 +293,8 @@ module = [
|
||||
"mlx_vlm.*",
|
||||
"mlx.*",
|
||||
"scalar_fastapi.*",
|
||||
"gradio.*",
|
||||
"msgpack.*",
|
||||
]
|
||||
ignore_missing_imports = true
|
||||
|
||||
|
||||
Reference in New Issue
Block a user