feat!: use orchestrators from jobkit (#248)

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi
2025-07-10 15:47:22 +02:00
committed by GitHub
parent e63197e89e
commit daa924a77e
30 changed files with 813 additions and 1997 deletions

View File

@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
def version_callback(value: bool) -> None:
if value:
docling_serve_version = importlib.metadata.version("docling_serve")
docling_jobkit_version = importlib.metadata.version("docling-jobkit")
docling_version = importlib.metadata.version("docling")
docling_core_version = importlib.metadata.version("docling-core")
docling_ibm_models_version = importlib.metadata.version("docling-ibm-models")
@@ -38,6 +39,7 @@ def version_callback(value: bool) -> None:
py_impl_version = sys.implementation.cache_tag
py_lang_version = platform.python_version()
console.print(f"Docling Serve version: {docling_serve_version}")
console.print(f"Docling Jobkit version: {docling_jobkit_version}")
console.print(f"Docling version: {docling_version}")
console.print(f"Docling Core version: {docling_core_version}")
console.print(f"Docling IBM Models version: {docling_ibm_models_version}")

View File

@@ -28,12 +28,18 @@ from fastapi.staticfiles import StaticFiles
from scalar_fastapi import get_scalar_api_reference
from docling.datamodel.base_models import DocumentStream
from docling_serve.datamodel.callback import (
from docling_jobkit.datamodel.callback import (
ProgressCallbackRequest,
ProgressCallbackResponse,
)
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_jobkit.datamodel.task import Task, TaskSource
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
ProgressInvalid,
TaskNotFoundError,
)
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
from docling_serve.datamodel.requests import (
ConvertDocumentFileSourcesRequest,
ConvertDocumentHttpSourcesRequest,
@@ -47,17 +53,12 @@ from docling_serve.datamodel.responses import (
TaskStatusResponse,
WebsocketMessage,
)
from docling_serve.datamodel.task import Task, TaskSource
from docling_serve.docling_conversion import _get_converter_from_hash
from docling_serve.engines.async_orchestrator import (
BaseAsyncOrchestrator,
ProgressInvalid,
)
from docling_serve.engines.async_orchestrator_factory import get_async_orchestrator
from docling_serve.engines.base_orchestrator import TaskNotFoundError
from docling_serve.helper_functions import FormDepends
from docling_serve.orchestrator_factory import get_async_orchestrator
from docling_serve.response_preparation import prepare_response
from docling_serve.settings import docling_serve_settings
from docling_serve.storage import get_scratch
from docling_serve.websocker_notifier import WebsocketNotifier
# Set up custom logging as we'll be intermixes with FastAPI/Uvicorn's logging
@@ -95,9 +96,12 @@ _log = logging.getLogger(__name__)
# Context manager to initialize and clean up the lifespan of the FastAPI app
@asynccontextmanager
async def lifespan(app: FastAPI):
orchestrator = get_async_orchestrator()
scratch_dir = get_scratch()
orchestrator = get_async_orchestrator()
notifier = WebsocketNotifier(orchestrator)
orchestrator.bind_notifier(notifier)
# Warm up processing cache
if docling_serve_settings.load_models_at_boot:
await orchestrator.warm_up_caches()
@@ -230,7 +234,7 @@ def create_app(): # noqa: C901
########################
async def _enque_source(
orchestrator: BaseAsyncOrchestrator, conversion_request: ConvertDocumentsRequest
orchestrator: BaseOrchestrator, conversion_request: ConvertDocumentsRequest
) -> Task:
sources: list[TaskSource] = []
if isinstance(conversion_request, ConvertDocumentFileSourcesRequest):
@@ -244,9 +248,9 @@ def create_app(): # noqa: C901
return task
async def _enque_file(
orchestrator: BaseAsyncOrchestrator,
orchestrator: BaseOrchestrator,
files: list[UploadFile],
options: ConvertDocumentsOptions,
options: ConvertDocumentsRequestOptions,
) -> Task:
_log.info(f"Received {len(files)} files for processing.")
@@ -261,9 +265,7 @@ def create_app(): # noqa: C901
task = await orchestrator.enqueue(sources=file_sources, options=options)
return task
async def _wait_task_complete(
orchestrator: BaseAsyncOrchestrator, task_id: str
) -> bool:
async def _wait_task_complete(orchestrator: BaseOrchestrator, task_id: str) -> bool:
start_time = time.monotonic()
while True:
task = await orchestrator.task_status(task_id=task_id)
@@ -309,32 +311,28 @@ def create_app(): # noqa: C901
)
async def process_url(
background_tasks: BackgroundTasks,
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
conversion_request: ConvertDocumentsRequest,
):
task = await _enque_source(
orchestrator=orchestrator, conversion_request=conversion_request
)
success = await _wait_task_complete(
completed = await _wait_task_complete(
orchestrator=orchestrator, task_id=task.task_id
)
if not success:
if not completed:
# TODO: abort task!
return HTTPException(
status_code=504,
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
)
result = await orchestrator.task_result(
task_id=task.task_id, background_tasks=background_tasks
task = await orchestrator.get_raw_task(task_id=task.task_id)
response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks
)
if result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
return result
return response
# Convert a document from file(s)
@app.post(
@@ -348,35 +346,31 @@ def create_app(): # noqa: C901
)
async def process_file(
background_tasks: BackgroundTasks,
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
files: list[UploadFile],
options: Annotated[
ConvertDocumentsOptions, FormDepends(ConvertDocumentsOptions)
ConvertDocumentsRequestOptions, FormDepends(ConvertDocumentsRequestOptions)
],
):
task = await _enque_file(
orchestrator=orchestrator, files=files, options=options
)
success = await _wait_task_complete(
completed = await _wait_task_complete(
orchestrator=orchestrator, task_id=task.task_id
)
if not success:
if not completed:
# TODO: abort task!
return HTTPException(
status_code=504,
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
)
result = await orchestrator.task_result(
task_id=task.task_id, background_tasks=background_tasks
task = await orchestrator.get_raw_task(task_id=task.task_id)
response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks
)
if result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
return result
return response
# Convert a document from URL(s) using the async api
@app.post(
@@ -384,7 +378,7 @@ def create_app(): # noqa: C901
response_model=TaskStatusResponse,
)
async def process_url_async(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
conversion_request: ConvertDocumentsRequest,
):
task = await _enque_source(
@@ -406,11 +400,11 @@ def create_app(): # noqa: C901
response_model=TaskStatusResponse,
)
async def process_file_async(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
background_tasks: BackgroundTasks,
files: list[UploadFile],
options: Annotated[
ConvertDocumentsOptions, FormDepends(ConvertDocumentsOptions)
ConvertDocumentsRequestOptions, FormDepends(ConvertDocumentsRequestOptions)
],
):
task = await _enque_file(
@@ -432,7 +426,7 @@ def create_app(): # noqa: C901
response_model=TaskStatusResponse,
)
async def task_status_poll(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
task_id: str,
wait: Annotated[
float, Query(help="Number of seconds to wait for a completed status.")
@@ -456,9 +450,10 @@ def create_app(): # noqa: C901
)
async def task_status_ws(
websocket: WebSocket,
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
task_id: str,
):
assert isinstance(orchestrator.notifier, WebsocketNotifier)
await websocket.accept()
if task_id not in orchestrator.tasks:
@@ -473,7 +468,7 @@ def create_app(): # noqa: C901
task = orchestrator.tasks[task_id]
# Track active WebSocket connections for this job
orchestrator.task_subscribers[task_id].add(websocket)
orchestrator.notifier.task_subscribers[task_id].add(websocket)
try:
task_queue_position = await orchestrator.get_queue_position(task_id=task_id)
@@ -511,7 +506,7 @@ def create_app(): # noqa: C901
_log.info(f"WebSocket disconnected for job {task_id}")
finally:
orchestrator.task_subscribers[task_id].remove(websocket)
orchestrator.notifier.task_subscribers[task_id].remove(websocket)
# Task result
@app.get(
@@ -524,19 +519,18 @@ def create_app(): # noqa: C901
},
)
async def task_result(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
background_tasks: BackgroundTasks,
task_id: str,
):
result = await orchestrator.task_result(
task_id=task_id, background_tasks=background_tasks
)
if result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
try:
task = await orchestrator.get_raw_task(task_id=task_id)
response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks
)
return result
return response
except TaskNotFoundError:
raise HTTPException(status_code=404, detail="Task not found.")
# Update task progress
@app.post(
@@ -544,7 +538,7 @@ def create_app(): # noqa: C901
response_model=ProgressCallbackResponse,
)
async def callback_task_progress(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
request: ProgressCallbackRequest,
):
try:
@@ -564,8 +558,10 @@ def create_app(): # noqa: C901
"/v1alpha/clear/converters",
response_model=ClearResponse,
)
async def clear_converters():
_get_converter_from_hash.cache_clear()
async def clear_converters(
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
):
await orchestrator.clear_converters()
return ClearResponse()
# Clean results
@@ -574,7 +570,7 @@ def create_app(): # noqa: C901
response_model=ClearResponse,
)
async def clear_results(
orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
orchestrator: Annotated[BaseOrchestrator, Depends(get_async_orchestrator)],
older_then: float = 3600,
):
await orchestrator.clear_results(older_than=older_then)

View File

@@ -1,50 +0,0 @@
import enum
from typing import Annotated, Literal
from pydantic import BaseModel, Field
class ProgressKind(str, enum.Enum):
SET_NUM_DOCS = "set_num_docs"
UPDATE_PROCESSED = "update_processed"
class BaseProgress(BaseModel):
kind: ProgressKind
class ProgressSetNumDocs(BaseProgress):
kind: Literal[ProgressKind.SET_NUM_DOCS] = ProgressKind.SET_NUM_DOCS
num_docs: int
class SucceededDocsItem(BaseModel):
source: str
class FailedDocsItem(BaseModel):
source: str
error: str
class ProgressUpdateProcessed(BaseProgress):
kind: Literal[ProgressKind.UPDATE_PROCESSED] = ProgressKind.UPDATE_PROCESSED
num_processed: int
num_succeeded: int
num_failed: int
docs_succeeded: list[SucceededDocsItem]
docs_failed: list[FailedDocsItem]
class ProgressCallbackRequest(BaseModel):
task_id: str
progress: Annotated[
ProgressSetNumDocs | ProgressUpdateProcessed, Field(discriminator="kind")
]
class ProgressCallbackResponse(BaseModel):
status: Literal["ack"] = "ack"

View File

@@ -1,24 +1,13 @@
# Define the input options for the API
from typing import Annotated, Any, Optional
from typing import Annotated
from pydantic import AnyUrl, BaseModel, Field, model_validator
from typing_extensions import Self
from pydantic import Field
from docling.datamodel.base_models import InputFormat, OutputFormat
from docling.datamodel.pipeline_options import (
EasyOcrOptions,
PdfBackend,
PictureDescriptionBaseOptions,
ProcessingPipeline,
TableFormerMode,
TableStructureOptions,
)
from docling.datamodel.settings import (
DEFAULT_PAGE_RANGE,
PageRange,
)
from docling.models.factories import get_ocr_factory
from docling_core.types.doc import ImageRefMode
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
from docling_serve.settings import docling_serve_settings
@@ -28,154 +17,7 @@ ocr_factory = get_ocr_factory(
ocr_engines_enum = ocr_factory.get_enum()
class PictureDescriptionLocal(BaseModel):
repo_id: Annotated[
str,
Field(
description="Repository id from the Hugging Face Hub.",
examples=[
"HuggingFaceTB/SmolVLM-256M-Instruct",
"ibm-granite/granite-vision-3.2-2b",
],
),
]
prompt: Annotated[
str,
Field(
description="Prompt used when calling the vision-language model.",
examples=[
"Describe this image in a few sentences.",
"This is a figure from a document. Provide a detailed description of it.",
],
),
] = "Describe this image in a few sentences."
generation_config: Annotated[
dict[str, Any],
Field(
description="Config from https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationConfig",
examples=[{"max_new_tokens": 200, "do_sample": False}],
),
] = {"max_new_tokens": 200, "do_sample": False}
class PictureDescriptionApi(BaseModel):
url: Annotated[
AnyUrl,
Field(
description="Endpoint which accepts openai-api compatible requests.",
examples=[
AnyUrl(
"http://localhost:8000/v1/chat/completions"
), # example of a local vllm api
AnyUrl(
"http://localhost:11434/v1/chat/completions"
), # example of ollama
],
),
]
headers: Annotated[
dict[str, str],
Field(
description="Headers used for calling the API endpoint. For example, it could include authentication headers."
),
] = {}
params: Annotated[
dict[str, Any],
Field(
description="Model parameters.",
examples=[
{ # on vllm
"model": "HuggingFaceTB/SmolVLM-256M-Instruct",
"max_completion_tokens": 200,
},
{ # on vllm
"model": "ibm-granite/granite-vision-3.2-2b",
"max_completion_tokens": 200,
},
{ # on ollama
"model": "granite3.2-vision:2b"
},
],
),
] = {}
timeout: Annotated[float, Field(description="Timeout for the API request.")] = 20
prompt: Annotated[
str,
Field(
description="Prompt used when calling the vision-language model.",
examples=[
"Describe this image in a few sentences.",
"This is a figures from a document. Provide a detailed description of it.",
],
),
] = "Describe this image in a few sentences."
class ConvertDocumentsOptions(BaseModel):
from_formats: Annotated[
list[InputFormat],
Field(
description=(
"Input format(s) to convert from. String or list of strings. "
f"Allowed values: {', '.join([v.value for v in InputFormat])}. "
"Optional, defaults to all formats."
),
examples=[[v.value for v in InputFormat]],
),
] = list(InputFormat)
to_formats: Annotated[
list[OutputFormat],
Field(
description=(
"Output format(s) to convert to. String or list of strings. "
f"Allowed values: {', '.join([v.value for v in OutputFormat])}. "
"Optional, defaults to Markdown."
),
examples=[
[OutputFormat.MARKDOWN],
[OutputFormat.MARKDOWN, OutputFormat.JSON],
[v.value for v in OutputFormat],
],
),
] = [OutputFormat.MARKDOWN]
image_export_mode: Annotated[
ImageRefMode,
Field(
description=(
"Image export mode for the document (in case of JSON,"
" Markdown or HTML). "
f"Allowed values: {', '.join([v.value for v in ImageRefMode])}. "
"Optional, defaults to Embedded."
),
examples=[ImageRefMode.EMBEDDED.value],
# pattern="embedded|placeholder|referenced",
),
] = ImageRefMode.EMBEDDED
do_ocr: Annotated[
bool,
Field(
description=(
"If enabled, the bitmap content will be processed using OCR. "
"Boolean. Optional, defaults to true"
),
# examples=[True],
),
] = True
force_ocr: Annotated[
bool,
Field(
description=(
"If enabled, replace existing text with OCR-generated "
"text over content. Boolean. Optional, defaults to false."
),
# examples=[False],
),
] = False
class ConvertDocumentsRequestOptions(ConvertDocumentsOptions):
ocr_engine: Annotated[ # type: ignore
ocr_engines_enum,
Field(
@@ -188,57 +30,6 @@ class ConvertDocumentsOptions(BaseModel):
),
] = ocr_engines_enum(EasyOcrOptions.kind) # type: ignore
ocr_lang: Annotated[
Optional[list[str]],
Field(
description=(
"List of languages used by the OCR engine. "
"Note that each OCR engine has "
"different values for the language names. String or list of strings. "
"Optional, defaults to empty."
),
examples=[["fr", "de", "es", "en"]],
),
] = None
pdf_backend: Annotated[
PdfBackend,
Field(
description=(
"The PDF backend to use. String. "
f"Allowed values: {', '.join([v.value for v in PdfBackend])}. "
f"Optional, defaults to {PdfBackend.DLPARSE_V4.value}."
),
examples=[PdfBackend.DLPARSE_V4],
),
] = PdfBackend.DLPARSE_V4
table_mode: Annotated[
TableFormerMode,
Field(
description=(
"Mode to use for table structure, String. "
f"Allowed values: {', '.join([v.value for v in TableFormerMode])}. "
"Optional, defaults to fast."
),
examples=[TableStructureOptions().mode],
# pattern="fast|accurate",
),
] = TableStructureOptions().mode
pipeline: Annotated[
ProcessingPipeline,
Field(description="Choose the pipeline to process PDF or image files."),
] = ProcessingPipeline.STANDARD
page_range: Annotated[
PageRange,
Field(
description="Only convert a range of pages. The page number starts at 1.",
examples=[DEFAULT_PAGE_RANGE, (1, 4)],
),
] = DEFAULT_PAGE_RANGE
document_timeout: Annotated[
float,
Field(
@@ -247,152 +38,3 @@ class ConvertDocumentsOptions(BaseModel):
le=docling_serve_settings.max_document_timeout,
),
] = docling_serve_settings.max_document_timeout
abort_on_error: Annotated[
bool,
Field(
description=(
"Abort on error if enabled. Boolean. Optional, defaults to false."
),
# examples=[False],
),
] = False
return_as_file: Annotated[
bool,
Field(
description=(
"Return the output as a zip file "
"(will happen anyway if multiple files are generated). "
"Boolean. Optional, defaults to false."
),
examples=[False],
),
] = False
do_table_structure: Annotated[
bool,
Field(
description=(
"If enabled, the table structure will be extracted. "
"Boolean. Optional, defaults to true."
),
examples=[True],
),
] = True
include_images: Annotated[
bool,
Field(
description=(
"If enabled, images will be extracted from the document. "
"Boolean. Optional, defaults to true."
),
examples=[True],
),
] = True
images_scale: Annotated[
float,
Field(
description="Scale factor for images. Float. Optional, defaults to 2.0.",
examples=[2.0],
),
] = 2.0
md_page_break_placeholder: Annotated[
str,
Field(
description="Add this placeholder betweek pages in the markdown output.",
examples=["<!-- page-break -->", ""],
),
] = ""
do_code_enrichment: Annotated[
bool,
Field(
description=(
"If enabled, perform OCR code enrichment. "
"Boolean. Optional, defaults to false."
),
examples=[False],
),
] = False
do_formula_enrichment: Annotated[
bool,
Field(
description=(
"If enabled, perform formula OCR, return LaTeX code. "
"Boolean. Optional, defaults to false."
),
examples=[False],
),
] = False
do_picture_classification: Annotated[
bool,
Field(
description=(
"If enabled, classify pictures in documents. "
"Boolean. Optional, defaults to false."
),
examples=[False],
),
] = False
do_picture_description: Annotated[
bool,
Field(
description=(
"If enabled, describe pictures in documents. "
"Boolean. Optional, defaults to false."
),
examples=[False],
),
] = False
picture_description_area_threshold: Annotated[
float,
Field(
description="Minimum percentage of the area for a picture to be processed with the models.",
examples=[PictureDescriptionBaseOptions().picture_area_threshold],
),
] = PictureDescriptionBaseOptions().picture_area_threshold
picture_description_local: Annotated[
Optional[PictureDescriptionLocal],
Field(
description="Options for running a local vision-language model in the picture description. The parameters refer to a model hosted on Hugging Face. This parameter is mutually exclusive with picture_description_api.",
examples=[
PictureDescriptionLocal(repo_id="ibm-granite/granite-vision-3.2-2b"),
PictureDescriptionLocal(repo_id="HuggingFaceTB/SmolVLM-256M-Instruct"),
],
),
] = None
picture_description_api: Annotated[
Optional[PictureDescriptionApi],
Field(
description="API details for using a vision-language model in the picture description. This parameter is mutually exclusive with picture_description_local.",
examples=[
PictureDescriptionApi(
url="http://localhost:11434/v1/chat/completions",
params={"model": "granite3.2-vision:2b"},
)
],
),
] = None
@model_validator(mode="after")
def picture_description_exclusivity(self) -> Self:
# Validate picture description options
if (
self.picture_description_local is not None
and self.picture_description_api is not None
):
raise ValueError(
"The parameters picture_description_local and picture_description_api are mutually exclusive, only one of them can be set."
)
return self

View File

@@ -1,13 +0,0 @@
import enum
class TaskStatus(str, enum.Enum):
SUCCESS = "success"
PENDING = "pending"
STARTED = "started"
FAILURE = "failure"
class AsyncEngine(str, enum.Enum):
LOCAL = "local"
KFP = "kfp"

View File

@@ -1,7 +0,0 @@
from pydantic import AnyUrl, BaseModel
class CallbackSpec(BaseModel):
url: AnyUrl
headers: dict[str, str] = {}
ca_cert: str = ""

View File

@@ -1,52 +1,14 @@
import base64
from io import BytesIO
from typing import Annotated, Any, Union
from typing import Union
from pydantic import AnyHttpUrl, BaseModel, Field
from pydantic import BaseModel
from docling.datamodel.base_models import DocumentStream
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
class DocumentsConvertBase(BaseModel):
options: ConvertDocumentsOptions = ConvertDocumentsOptions()
class HttpSource(BaseModel):
url: Annotated[
AnyHttpUrl,
Field(
description="HTTP url to process",
examples=["https://arxiv.org/pdf/2206.01062"],
),
]
headers: Annotated[
dict[str, Any],
Field(
description="Additional headers used to fetch the urls, "
"e.g. authorization, agent, etc"
),
] = {}
class FileSource(BaseModel):
base64_string: Annotated[
str,
Field(
description="Content of the file serialized in base64. "
"For example it can be obtained via "
"`base64 -w 0 /path/to/file/pdf-to-convert.pdf`."
),
]
filename: Annotated[
str,
Field(description="Filename of the uploaded document", examples=["file.pdf"]),
]
def to_document_stream(self) -> DocumentStream:
buf = BytesIO(base64.b64decode(self.base64_string))
return DocumentStream(stream=buf, name=self.filename)
options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions()
class ConvertDocumentHttpSourcesRequest(DocumentsConvertBase):

View File

@@ -6,8 +6,7 @@ from pydantic import BaseModel
from docling.datamodel.document import ConversionStatus, ErrorItem
from docling.utils.profiling import ProfilingItem
from docling_core.types.doc import DoclingDocument
from docling_serve.datamodel.task_meta import TaskProcessingMeta
from docling_jobkit.datamodel.task_meta import TaskProcessingMeta
# Status

View File

@@ -1,55 +0,0 @@
import datetime
from functools import partial
from pathlib import Path
from typing import Optional, Union
from fastapi.responses import FileResponse
from pydantic import BaseModel, ConfigDict, Field
from docling.datamodel.base_models import DocumentStream
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.engines import TaskStatus
from docling_serve.datamodel.requests import FileSource, HttpSource
from docling_serve.datamodel.responses import ConvertDocumentResponse
from docling_serve.datamodel.task_meta import TaskProcessingMeta
TaskSource = Union[HttpSource, FileSource, DocumentStream]
class Task(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
task_id: str
task_status: TaskStatus = TaskStatus.PENDING
sources: list[TaskSource] = []
options: Optional[ConvertDocumentsOptions]
result: Optional[Union[ConvertDocumentResponse, FileResponse]] = None
scratch_dir: Optional[Path] = None
processing_meta: Optional[TaskProcessingMeta] = None
created_at: datetime.datetime = Field(
default_factory=partial(datetime.datetime.now, datetime.timezone.utc)
)
started_at: Optional[datetime.datetime] = None
finished_at: Optional[datetime.datetime] = None
last_update_at: datetime.datetime = Field(
default_factory=partial(datetime.datetime.now, datetime.timezone.utc)
)
def set_status(self, status: TaskStatus):
now = datetime.datetime.now(datetime.timezone.utc)
if status == TaskStatus.STARTED and self.started_at is None:
self.started_at = now
if (
status in [TaskStatus.SUCCESS, TaskStatus.FAILURE]
and self.finished_at is None
):
self.finished_at = now
self.last_update_at = now
self.task_status = status
def is_completed(self) -> bool:
if self.task_status in [TaskStatus.SUCCESS, TaskStatus.FAILURE]:
return True
return False

View File

@@ -1,8 +0,0 @@
from pydantic import BaseModel
class TaskProcessingMeta(BaseModel):
num_docs: int
num_processed: int = 0
num_succeeded: int = 0
num_failed: int = 0

View File

@@ -1,256 +0,0 @@
import hashlib
import json
import logging
import sys
from collections.abc import Iterable, Iterator
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional, Union
from fastapi import HTTPException
from docling.backend.docling_parse_backend import DoclingParseDocumentBackend
from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend
from docling.backend.docling_parse_v4_backend import DoclingParseV4DocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.base_models import DocumentStream, InputFormat
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import (
OcrOptions,
PdfBackend,
PdfPipelineOptions,
PictureDescriptionApiOptions,
PictureDescriptionVlmOptions,
ProcessingPipeline,
TableFormerMode,
VlmPipelineOptions,
smoldocling_vlm_conversion_options,
smoldocling_vlm_mlx_conversion_options,
)
from docling.document_converter import DocumentConverter, FormatOption, PdfFormatOption
from docling.pipeline.vlm_pipeline import VlmPipeline
from docling_core.types.doc import ImageRefMode
from docling_serve.datamodel.convert import ConvertDocumentsOptions, ocr_factory
from docling_serve.helper_functions import _to_list_of_strings
from docling_serve.settings import docling_serve_settings
_log = logging.getLogger(__name__)
# Custom serializer for PdfFormatOption
# (model_dump_json does not work with some classes)
def _hash_pdf_format_option(pdf_format_option: PdfFormatOption) -> bytes:
data = pdf_format_option.model_dump(serialize_as_any=True)
# pipeline_options are not fully serialized by model_dump, dedicated pass
if pdf_format_option.pipeline_options:
data["pipeline_options"] = pdf_format_option.pipeline_options.model_dump(
serialize_as_any=True, mode="json"
)
# Replace `pipeline_cls` with a string representation
data["pipeline_cls"] = repr(data["pipeline_cls"])
# Replace `backend` with a string representation
data["backend"] = repr(data["backend"])
# Serialize the dictionary to JSON with sorted keys to have consistent hashes
serialized_data = json.dumps(data, sort_keys=True)
options_hash = hashlib.sha1(
serialized_data.encode(), usedforsecurity=False
).digest()
return options_hash
# Cache of DocumentConverter objects
_options_map: dict[bytes, PdfFormatOption] = {}
@lru_cache(maxsize=docling_serve_settings.options_cache_size)
def _get_converter_from_hash(options_hash: bytes) -> DocumentConverter:
pdf_format_option = _options_map[options_hash]
format_options: dict[InputFormat, FormatOption] = {
InputFormat.PDF: pdf_format_option,
InputFormat.IMAGE: pdf_format_option,
}
return DocumentConverter(format_options=format_options)
def get_converter(pdf_format_option: PdfFormatOption) -> DocumentConverter:
options_hash = _hash_pdf_format_option(pdf_format_option)
_options_map[options_hash] = pdf_format_option
return _get_converter_from_hash(options_hash)
def _parse_standard_pdf_opts(
request: ConvertDocumentsOptions, artifacts_path: Optional[Path]
) -> PdfPipelineOptions:
try:
ocr_options: OcrOptions = ocr_factory.create_options(
kind=request.ocr_engine.value, # type: ignore
force_full_page_ocr=request.force_ocr,
)
except ImportError as err:
raise HTTPException(
status_code=400,
detail="The requested OCR engine"
f" (ocr_engine={request.ocr_engine.value})" # type: ignore
" is not available on this system. Please choose another OCR engine "
"or contact your system administrator.\n"
f"{err}",
)
if request.ocr_lang is not None:
if isinstance(request.ocr_lang, str):
ocr_options.lang = _to_list_of_strings(request.ocr_lang)
else:
ocr_options.lang = request.ocr_lang
pipeline_options = PdfPipelineOptions(
artifacts_path=artifacts_path,
enable_remote_services=docling_serve_settings.enable_remote_services,
document_timeout=request.document_timeout,
do_ocr=request.do_ocr,
ocr_options=ocr_options,
do_table_structure=request.do_table_structure,
do_code_enrichment=request.do_code_enrichment,
do_formula_enrichment=request.do_formula_enrichment,
do_picture_classification=request.do_picture_classification,
do_picture_description=request.do_picture_description,
)
pipeline_options.table_structure_options.mode = TableFormerMode(request.table_mode)
if request.image_export_mode != ImageRefMode.PLACEHOLDER:
pipeline_options.generate_page_images = True
if request.image_export_mode == ImageRefMode.REFERENCED:
pipeline_options.generate_picture_images = True
if request.images_scale:
pipeline_options.images_scale = request.images_scale
if request.picture_description_local is not None:
pipeline_options.picture_description_options = (
PictureDescriptionVlmOptions.model_validate(
request.picture_description_local.model_dump()
)
)
if request.picture_description_api is not None:
pipeline_options.picture_description_options = (
PictureDescriptionApiOptions.model_validate(
request.picture_description_api.model_dump()
)
)
pipeline_options.picture_description_options.picture_area_threshold = (
request.picture_description_area_threshold
)
return pipeline_options
def _parse_backend(request: ConvertDocumentsOptions) -> type[PdfDocumentBackend]:
if request.pdf_backend == PdfBackend.DLPARSE_V1:
backend: type[PdfDocumentBackend] = DoclingParseDocumentBackend
elif request.pdf_backend == PdfBackend.DLPARSE_V2:
backend = DoclingParseV2DocumentBackend
elif request.pdf_backend == PdfBackend.DLPARSE_V4:
backend = DoclingParseV4DocumentBackend
elif request.pdf_backend == PdfBackend.PYPDFIUM2:
backend = PyPdfiumDocumentBackend
else:
raise RuntimeError(f"Unexpected PDF backend type {request.pdf_backend}")
return backend
def _parse_vlm_pdf_opts(
request: ConvertDocumentsOptions, artifacts_path: Optional[Path]
) -> VlmPipelineOptions:
pipeline_options = VlmPipelineOptions(
artifacts_path=artifacts_path,
document_timeout=request.document_timeout,
)
pipeline_options.vlm_options = smoldocling_vlm_conversion_options
if sys.platform == "darwin":
try:
import mlx_vlm # noqa: F401
pipeline_options.vlm_options = smoldocling_vlm_mlx_conversion_options
except ImportError:
_log.warning(
"To run SmolDocling faster, please install mlx-vlm:\n"
"pip install mlx-vlm"
)
return pipeline_options
# Computes the PDF pipeline options and returns the PdfFormatOption and its hash
def get_pdf_pipeline_opts(
request: ConvertDocumentsOptions,
) -> PdfFormatOption:
artifacts_path: Optional[Path] = None
if docling_serve_settings.artifacts_path is not None:
if str(docling_serve_settings.artifacts_path.absolute()) == "":
_log.info(
"artifacts_path is an empty path, model weights will be downloaded "
"at runtime."
)
artifacts_path = None
elif docling_serve_settings.artifacts_path.is_dir():
_log.info(
"artifacts_path is set to a valid directory. "
"No model weights will be downloaded at runtime."
)
artifacts_path = docling_serve_settings.artifacts_path
else:
_log.warning(
"artifacts_path is set to an invalid directory. "
"The system will download the model weights at runtime."
)
artifacts_path = None
else:
_log.info(
"artifacts_path is unset. "
"The system will download the model weights at runtime."
)
pipeline_options: Union[PdfPipelineOptions, VlmPipelineOptions]
if request.pipeline == ProcessingPipeline.STANDARD:
pipeline_options = _parse_standard_pdf_opts(request, artifacts_path)
backend = _parse_backend(request)
pdf_format_option = PdfFormatOption(
pipeline_options=pipeline_options,
backend=backend,
)
elif request.pipeline == ProcessingPipeline.VLM:
pipeline_options = _parse_vlm_pdf_opts(request, artifacts_path)
pdf_format_option = PdfFormatOption(
pipeline_cls=VlmPipeline, pipeline_options=pipeline_options
)
else:
raise NotImplementedError(
f"The pipeline {request.pipeline} is not implemented."
)
return pdf_format_option
def convert_documents(
sources: Iterable[Union[Path, str, DocumentStream]],
options: ConvertDocumentsOptions,
headers: Optional[dict[str, Any]] = None,
):
pdf_format_option = get_pdf_pipeline_opts(options)
converter = get_converter(pdf_format_option)
results: Iterator[ConversionResult] = converter.convert_all(
sources,
headers=headers,
page_range=options.page_range,
max_file_size=docling_serve_settings.max_file_size,
max_num_pages=docling_serve_settings.max_num_pages,
)
return results

View File

@@ -1,137 +0,0 @@
# ruff: noqa: E402, UP006, UP035
from typing import Any, Dict, List
from kfp import dsl
PYTHON_BASE_IMAGE = "python:3.12"
@dsl.component(
base_image=PYTHON_BASE_IMAGE,
packages_to_install=[
"pydantic",
"docling-serve @ git+https://github.com/docling-project/docling-serve@feat-kfp-engine",
],
pip_index_urls=["https://download.pytorch.org/whl/cpu", "https://pypi.org/simple"],
)
def generate_chunks(
run_name: str,
request: Dict[str, Any],
batch_size: int,
callbacks: List[Dict[str, Any]],
) -> List[List[Dict[str, Any]]]:
from pydantic import TypeAdapter
from docling_serve.datamodel.callback import (
ProgressCallbackRequest,
ProgressSetNumDocs,
)
from docling_serve.datamodel.kfp import CallbackSpec
from docling_serve.engines.async_kfp.notify import notify_callbacks
CallbacksListType = TypeAdapter(list[CallbackSpec])
sources = request["http_sources"]
splits = [sources[i : i + batch_size] for i in range(0, len(sources), batch_size)]
total = sum(len(chunk) for chunk in splits)
payload = ProgressCallbackRequest(
task_id=run_name, progress=ProgressSetNumDocs(num_docs=total)
)
notify_callbacks(
payload=payload,
callbacks=CallbacksListType.validate_python(callbacks),
)
return splits
@dsl.component(
base_image=PYTHON_BASE_IMAGE,
packages_to_install=[
"pydantic",
"docling-serve @ git+https://github.com/docling-project/docling-serve@feat-kfp-engine",
],
pip_index_urls=["https://download.pytorch.org/whl/cpu", "https://pypi.org/simple"],
)
def convert_batch(
run_name: str,
data_splits: List[Dict[str, Any]],
request: Dict[str, Any],
callbacks: List[Dict[str, Any]],
output_path: dsl.OutputPath("Directory"), # type: ignore
):
from pathlib import Path
from pydantic import AnyUrl, TypeAdapter
from docling_serve.datamodel.callback import (
FailedDocsItem,
ProgressCallbackRequest,
ProgressUpdateProcessed,
SucceededDocsItem,
)
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.kfp import CallbackSpec
from docling_serve.datamodel.requests import HttpSource
from docling_serve.engines.async_kfp.notify import notify_callbacks
CallbacksListType = TypeAdapter(list[CallbackSpec])
convert_options = ConvertDocumentsOptions.model_validate(request["options"])
print(convert_options)
output_dir = Path(output_path)
output_dir.mkdir(exist_ok=True, parents=True)
docs_succeeded: list[SucceededDocsItem] = []
docs_failed: list[FailedDocsItem] = []
for source_dict in data_splits:
source = HttpSource.model_validate(source_dict)
filename = Path(str(AnyUrl(source.url).path)).name
output_filename = output_dir / filename
print(f"Writing {output_filename}")
with output_filename.open("w") as f:
f.write(source.model_dump_json())
docs_succeeded.append(SucceededDocsItem(source=source.url))
payload = ProgressCallbackRequest(
task_id=run_name,
progress=ProgressUpdateProcessed(
num_failed=len(docs_failed),
num_processed=len(docs_succeeded) + len(docs_failed),
num_succeeded=len(docs_succeeded),
docs_succeeded=docs_succeeded,
docs_failed=docs_failed,
),
)
print(payload)
notify_callbacks(
payload=payload,
callbacks=CallbacksListType.validate_python(callbacks),
)
@dsl.pipeline()
def process(
batch_size: int,
request: Dict[str, Any],
callbacks: List[Dict[str, Any]] = [],
run_name: str = "",
):
chunks_task = generate_chunks(
run_name=run_name,
request=request,
batch_size=batch_size,
callbacks=callbacks,
)
chunks_task.set_caching_options(False)
with dsl.ParallelFor(chunks_task.output, parallelism=4) as data_splits:
convert_batch(
run_name=run_name,
data_splits=data_splits,
request=request,
callbacks=callbacks,
)

View File

@@ -1,32 +0,0 @@
import ssl
import certifi
import httpx
from docling_serve.datamodel.callback import ProgressCallbackRequest
from docling_serve.datamodel.kfp import CallbackSpec
def notify_callbacks(
payload: ProgressCallbackRequest,
callbacks: list[CallbackSpec],
):
if len(callbacks) == 0:
return
for callback in callbacks:
# https://www.python-httpx.org/advanced/ssl/#configuring-client-instances
if callback.ca_cert:
ctx = ssl.create_default_context(cadata=callback.ca_cert)
else:
ctx = ssl.create_default_context(cafile=certifi.where())
try:
httpx.post(
str(callback.url),
headers=callback.headers,
json=payload.model_dump(mode="json"),
verify=ctx,
)
except httpx.HTTPError as err:
print(f"Error notifying callback {callback.url}: {err}")

View File

@@ -1,235 +0,0 @@
import datetime
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
from kfp_server_api.models import V2beta1RuntimeState
from pydantic import BaseModel, TypeAdapter
from pydantic_settings import SettingsConfigDict
from docling_serve.datamodel.callback import (
ProgressCallbackRequest,
ProgressSetNumDocs,
ProgressUpdateProcessed,
)
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.engines import TaskStatus
from docling_serve.datamodel.kfp import CallbackSpec
from docling_serve.datamodel.requests import HttpSource
from docling_serve.datamodel.task import Task, TaskSource
from docling_serve.datamodel.task_meta import TaskProcessingMeta
from docling_serve.engines.async_kfp.kfp_pipeline import process
from docling_serve.engines.async_orchestrator import (
BaseAsyncOrchestrator,
ProgressInvalid,
)
from docling_serve.settings import docling_serve_settings
_log = logging.getLogger(__name__)
class _RunItem(BaseModel):
model_config = SettingsConfigDict(arbitrary_types_allowed=True)
run_id: str
state: str
created_at: datetime.datetime
scheduled_at: datetime.datetime
finished_at: datetime.datetime
class AsyncKfpOrchestrator(BaseAsyncOrchestrator):
def __init__(self):
super().__init__()
import kfp
kfp_endpoint = docling_serve_settings.eng_kfp_endpoint
if kfp_endpoint is None:
raise ValueError("KFP endpoint is required when using the KFP engine.")
kube_sa_token_path = Path("/run/secrets/kubernetes.io/serviceaccount/token")
kube_sa_ca_cert_path = Path(
"/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"
)
ssl_ca_cert = docling_serve_settings.eng_kfp_ca_cert_path
token = docling_serve_settings.eng_kfp_token
if (
ssl_ca_cert is None
and ".svc" in kfp_endpoint.host
and kube_sa_ca_cert_path.exists()
):
ssl_ca_cert = str(kube_sa_ca_cert_path)
if token is None and kube_sa_token_path.exists():
token = kube_sa_token_path.read_text()
self._client = kfp.Client(
host=str(kfp_endpoint),
existing_token=token,
ssl_ca_cert=ssl_ca_cert,
# verify_ssl=False,
)
async def enqueue(
self, sources: list[TaskSource], options: ConvertDocumentsOptions
) -> Task:
callbacks = []
if docling_serve_settings.eng_kfp_self_callback_endpoint is not None:
headers = {}
if docling_serve_settings.eng_kfp_self_callback_token_path is not None:
token = (
docling_serve_settings.eng_kfp_self_callback_token_path.read_text()
)
headers["Authorization"] = f"Bearer {token}"
ca_cert = ""
if docling_serve_settings.eng_kfp_self_callback_ca_cert_path is not None:
ca_cert = docling_serve_settings.eng_kfp_self_callback_ca_cert_path.read_text()
callbacks.append(
CallbackSpec(
url=docling_serve_settings.eng_kfp_self_callback_endpoint,
headers=headers,
ca_cert=ca_cert,
)
)
CallbacksType = TypeAdapter(list[CallbackSpec])
SourcesListType = TypeAdapter(list[HttpSource])
http_sources = [s for s in sources if isinstance(s, HttpSource)]
# hack: since the current kfp backend is not resolving the job_id placeholder,
# we set the run_name and pass it as argument to the job itself.
run_name = f"docling-job-{uuid.uuid4()}"
kfp_run = self._client.create_run_from_pipeline_func(
process,
arguments={
"batch_size": 10,
"sources": SourcesListType.dump_python(http_sources, mode="json"),
"options": options.model_dump(mode="json"),
"callbacks": CallbacksType.dump_python(callbacks, mode="json"),
"run_name": run_name,
},
run_name=run_name,
)
task_id = kfp_run.run_id
task = Task(task_id=task_id, sources=sources, options=options)
await self.init_task_tracking(task)
return task
async def _update_task_from_run(self, task_id: str, wait: float = 0.0):
run_info = self._client.get_run(run_id=task_id)
task = await self.get_raw_task(task_id=task_id)
# RUNTIME_STATE_UNSPECIFIED = "RUNTIME_STATE_UNSPECIFIED"
# PENDING = "PENDING"
# RUNNING = "RUNNING"
# SUCCEEDED = "SUCCEEDED"
# SKIPPED = "SKIPPED"
# FAILED = "FAILED"
# CANCELING = "CANCELING"
# CANCELED = "CANCELED"
# PAUSED = "PAUSED"
if run_info.state == V2beta1RuntimeState.SUCCEEDED:
task.set_status(TaskStatus.SUCCESS)
elif run_info.state == V2beta1RuntimeState.PENDING:
task.set_status(TaskStatus.PENDING)
elif run_info.state == V2beta1RuntimeState.RUNNING:
task.set_status(TaskStatus.STARTED)
else:
task.set_status(TaskStatus.FAILURE)
async def task_status(self, task_id: str, wait: float = 0.0) -> Task:
await self._update_task_from_run(task_id=task_id, wait=wait)
return await self.get_raw_task(task_id=task_id)
async def _get_pending(self) -> list[_RunItem]:
runs: list[_RunItem] = []
next_page: Optional[str] = None
while True:
res = self._client.list_runs(
page_token=next_page,
page_size=20,
filter=json.dumps(
{
"predicates": [
{
"operation": "EQUALS",
"key": "state",
"stringValue": "PENDING",
}
]
}
),
)
if res.runs is not None:
for run in res.runs:
runs.append(
_RunItem(
run_id=run.run_id,
state=run.state,
created_at=run.created_at,
scheduled_at=run.scheduled_at,
finished_at=run.finished_at,
)
)
if res.next_page_token is None:
break
next_page = res.next_page_token
return runs
async def queue_size(self) -> int:
runs = await self._get_pending()
return len(runs)
async def get_queue_position(self, task_id: str) -> Optional[int]:
runs = await self._get_pending()
for pos, run in enumerate(runs, start=1):
if run.run_id == task_id:
return pos
return None
async def process_queue(self):
return
async def warm_up_caches(self):
return
async def _get_run_id(self, run_name: str) -> str:
res = self._client.list_runs(
filter=json.dumps(
{
"predicates": [
{
"operation": "EQUALS",
"key": "name",
"stringValue": run_name,
}
]
}
),
)
if res.runs is not None and len(res.runs) > 0:
return res.runs[0].run_id
raise RuntimeError(f"Run with {run_name=} not found.")
async def receive_task_progress(self, request: ProgressCallbackRequest):
task_id = await self._get_run_id(run_name=request.task_id)
progress = request.progress
task = await self.get_raw_task(task_id=task_id)
if isinstance(progress, ProgressSetNumDocs):
task.processing_meta = TaskProcessingMeta(num_docs=progress.num_docs)
task.task_status = TaskStatus.STARTED
elif isinstance(progress, ProgressUpdateProcessed):
if task.processing_meta is None:
raise ProgressInvalid(
"UpdateProcessed was called before setting the expected number of documents."
)
task.processing_meta.num_processed += progress.num_processed
task.processing_meta.num_succeeded += progress.num_succeeded
task.processing_meta.num_failed += progress.num_failed
task.task_status = TaskStatus.STARTED
# TODO: could be moved to BackgroundTask
await self.notify_task_subscribers(task_id=task_id)

View File

@@ -1,60 +0,0 @@
import asyncio
import logging
import uuid
from typing import Optional
from docling.datamodel.base_models import InputFormat
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.task import Task, TaskSource
from docling_serve.docling_conversion import get_converter, get_pdf_pipeline_opts
from docling_serve.engines.async_local.worker import AsyncLocalWorker
from docling_serve.engines.async_orchestrator import BaseAsyncOrchestrator
from docling_serve.settings import docling_serve_settings
_log = logging.getLogger(__name__)
class AsyncLocalOrchestrator(BaseAsyncOrchestrator):
def __init__(self):
super().__init__()
self.task_queue = asyncio.Queue()
self.queue_list: list[str] = []
async def enqueue(
self, sources: list[TaskSource], options: ConvertDocumentsOptions
) -> Task:
task_id = str(uuid.uuid4())
task = Task(task_id=task_id, sources=sources, options=options)
await self.init_task_tracking(task)
self.queue_list.append(task_id)
await self.task_queue.put(task_id)
return task
async def queue_size(self) -> int:
return self.task_queue.qsize()
async def get_queue_position(self, task_id: str) -> Optional[int]:
return (
self.queue_list.index(task_id) + 1 if task_id in self.queue_list else None
)
async def process_queue(self):
# Create a pool of workers
workers = []
for i in range(docling_serve_settings.eng_loc_num_workers):
_log.debug(f"Starting worker {i}")
w = AsyncLocalWorker(i, self)
worker_task = asyncio.create_task(w.loop())
workers.append(worker_task)
# Wait for all workers to complete (they won't, as they run indefinitely)
await asyncio.gather(*workers)
_log.debug("All workers completed.")
async def warm_up_caches(self):
# Converter with default options
pdf_format_option = get_pdf_pipeline_opts(ConvertDocumentsOptions())
converter = get_converter(pdf_format_option)
converter.initialize_pipeline(InputFormat.PDF)

View File

@@ -1,124 +0,0 @@
import asyncio
import logging
import shutil
import time
from typing import TYPE_CHECKING, Any, Optional, Union
from fastapi.responses import FileResponse
from docling.datamodel.base_models import DocumentStream
from docling_serve.datamodel.engines import TaskStatus
from docling_serve.datamodel.requests import FileSource, HttpSource
from docling_serve.docling_conversion import convert_documents
from docling_serve.response_preparation import process_results
from docling_serve.storage import get_scratch
if TYPE_CHECKING:
from docling_serve.engines.async_local.orchestrator import AsyncLocalOrchestrator
_log = logging.getLogger(__name__)
class AsyncLocalWorker:
def __init__(self, worker_id: int, orchestrator: "AsyncLocalOrchestrator"):
self.worker_id = worker_id
self.orchestrator = orchestrator
async def loop(self):
_log.debug(f"Starting loop for worker {self.worker_id}")
while True:
task_id: str = await self.orchestrator.task_queue.get()
self.orchestrator.queue_list.remove(task_id)
if task_id not in self.orchestrator.tasks:
raise RuntimeError(f"Task {task_id} not found.")
task = self.orchestrator.tasks[task_id]
try:
task.set_status(TaskStatus.STARTED)
_log.info(f"Worker {self.worker_id} processing task {task_id}")
# Notify clients about task updates
await self.orchestrator.notify_task_subscribers(task_id)
# Notify clients about queue updates
await self.orchestrator.notify_queue_positions()
# Define a callback function to send progress updates to the client.
# TODO: send partial updates, e.g. when a document in the batch is done
def run_conversion():
convert_sources: list[Union[str, DocumentStream]] = []
headers: Optional[dict[str, Any]] = None
for source in task.sources:
if isinstance(source, DocumentStream):
convert_sources.append(source)
elif isinstance(source, FileSource):
convert_sources.append(source.to_document_stream())
elif isinstance(source, HttpSource):
convert_sources.append(str(source.url))
if headers is None and source.headers:
headers = source.headers
# Note: results are only an iterator->lazy evaluation
results = convert_documents(
sources=convert_sources,
options=task.options,
headers=headers,
)
# The real processing will happen here
work_dir = get_scratch() / task_id
response = process_results(
conversion_options=task.options,
conv_results=results,
work_dir=work_dir,
)
if work_dir.exists():
task.scratch_dir = work_dir
if not isinstance(response, FileResponse):
_log.warning(
f"Task {task_id=} produced content in {work_dir=} but the response is not a file."
)
shutil.rmtree(work_dir, ignore_errors=True)
return response
start_time = time.monotonic()
# Run the prediction in a thread to avoid blocking the event loop.
# Get the current event loop
# loop = asyncio.get_event_loop()
# future = asyncio.run_coroutine_threadsafe(
# run_conversion(),
# loop=loop
# )
# response = future.result()
# Run in a thread
response = await asyncio.to_thread(
run_conversion,
)
processing_time = time.monotonic() - start_time
task.result = response
task.sources = []
task.options = None
task.set_status(TaskStatus.SUCCESS)
_log.info(
f"Worker {self.worker_id} completed job {task_id} "
f"in {processing_time:.2f} seconds"
)
except Exception as e:
_log.error(
f"Worker {self.worker_id} failed to process job {task_id}: {e}"
)
task.set_status(TaskStatus.FAILURE)
finally:
await self.orchestrator.notify_task_subscribers(task_id)
self.orchestrator.task_queue.task_done()
_log.debug(f"Worker {self.worker_id} completely done with {task_id}")

View File

@@ -1,127 +0,0 @@
import asyncio
import datetime
import logging
import shutil
from typing import Union
from fastapi import BackgroundTasks, WebSocket
from fastapi.responses import FileResponse
from docling_serve.datamodel.callback import ProgressCallbackRequest
from docling_serve.datamodel.engines import TaskStatus
from docling_serve.datamodel.responses import (
ConvertDocumentResponse,
MessageKind,
TaskStatusResponse,
WebsocketMessage,
)
from docling_serve.datamodel.task import Task
from docling_serve.engines.base_orchestrator import (
BaseOrchestrator,
OrchestratorError,
TaskNotFoundError,
)
from docling_serve.settings import docling_serve_settings
_log = logging.getLogger(__name__)
class ProgressInvalid(OrchestratorError):
pass
class BaseAsyncOrchestrator(BaseOrchestrator):
def __init__(self):
self.tasks: dict[str, Task] = {}
self.task_subscribers: dict[str, set[WebSocket]] = {}
async def init_task_tracking(self, task: Task):
task_id = task.task_id
self.tasks[task.task_id] = task
self.task_subscribers[task_id] = set()
async def get_raw_task(self, task_id: str) -> Task:
if task_id not in self.tasks:
raise TaskNotFoundError()
return self.tasks[task_id]
async def task_status(self, task_id: str, wait: float = 0.0) -> Task:
return await self.get_raw_task(task_id=task_id)
async def task_result(
self, task_id: str, background_tasks: BackgroundTasks
) -> Union[ConvertDocumentResponse, FileResponse, None]:
try:
task = await self.get_raw_task(task_id=task_id)
if task.is_completed() and docling_serve_settings.single_use_results:
if task.scratch_dir is not None:
background_tasks.add_task(
shutil.rmtree, task.scratch_dir, ignore_errors=True
)
async def _remove_task_impl():
await asyncio.sleep(docling_serve_settings.result_removal_delay)
await self.delete_task(task_id=task.task_id)
async def _remove_task():
asyncio.create_task(_remove_task_impl()) # noqa: RUF006
background_tasks.add_task(_remove_task)
return task.result
except TaskNotFoundError:
return None
async def delete_task(self, task_id: str):
_log.info(f"Deleting {task_id=}")
if task_id in self.task_subscribers:
for websocket in self.task_subscribers[task_id]:
await websocket.close()
del self.task_subscribers[task_id]
if task_id in self.tasks:
del self.tasks[task_id]
async def clear_results(self, older_than: float = 0.0):
cutoff_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
seconds=older_than
)
tasks_to_delete = [
task_id
for task_id, task in self.tasks.items()
if task.finished_at is not None and task.finished_at < cutoff_time
]
for task_id in tasks_to_delete:
await self.delete_task(task_id=task_id)
async def notify_task_subscribers(self, task_id: str):
if task_id not in self.task_subscribers:
raise RuntimeError(f"Task {task_id} does not have a subscribers list.")
task = await self.get_raw_task(task_id=task_id)
task_queue_position = await self.get_queue_position(task_id)
msg = TaskStatusResponse(
task_id=task.task_id,
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
)
for websocket in self.task_subscribers[task_id]:
await websocket.send_text(
WebsocketMessage(message=MessageKind.UPDATE, task=msg).model_dump_json()
)
if task.is_completed():
await websocket.close()
async def notify_queue_positions(self):
for task_id in self.task_subscribers.keys():
# notify only pending tasks
if self.tasks[task_id].task_status != TaskStatus.PENDING:
continue
await self.notify_task_subscribers(task_id)
async def receive_task_progress(self, request: ProgressCallbackRequest):
raise NotImplementedError()

View File

@@ -1,21 +0,0 @@
from functools import lru_cache
from docling_serve.datamodel.engines import AsyncEngine
from docling_serve.engines.async_orchestrator import BaseAsyncOrchestrator
from docling_serve.settings import docling_serve_settings
@lru_cache
def get_async_orchestrator() -> BaseAsyncOrchestrator:
if docling_serve_settings.eng_kind == AsyncEngine.LOCAL:
from docling_serve.engines.async_local.orchestrator import (
AsyncLocalOrchestrator,
)
return AsyncLocalOrchestrator()
elif docling_serve_settings.eng_kind == AsyncEngine.KFP:
from docling_serve.engines.async_kfp.orchestrator import AsyncKfpOrchestrator
return AsyncKfpOrchestrator()
raise RuntimeError(f"Engine {docling_serve_settings.eng_kind} not recognized.")

View File

@@ -1,55 +0,0 @@
from abc import ABC, abstractmethod
from typing import Optional, Union
from fastapi import BackgroundTasks
from fastapi.responses import FileResponse
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.responses import ConvertDocumentResponse
from docling_serve.datamodel.task import Task, TaskSource
class OrchestratorError(Exception):
pass
class TaskNotFoundError(OrchestratorError):
pass
class BaseOrchestrator(ABC):
@abstractmethod
async def enqueue(
self, sources: list[TaskSource], options: ConvertDocumentsOptions
) -> Task:
pass
@abstractmethod
async def queue_size(self) -> int:
pass
@abstractmethod
async def get_queue_position(self, task_id: str) -> Optional[int]:
pass
@abstractmethod
async def task_status(self, task_id: str, wait: float = 0.0) -> Task:
pass
@abstractmethod
async def task_result(
self, task_id: str, background_tasks: BackgroundTasks
) -> Union[ConvertDocumentResponse, FileResponse, None]:
pass
@abstractmethod
async def clear_results(self, older_than: float = 0.0):
pass
@abstractmethod
async def process_queue(self):
pass
@abstractmethod
async def warm_up_caches(self):
pass

View File

@@ -0,0 +1,52 @@
from functools import lru_cache
from docling_jobkit.orchestrators.base_orchestrator import BaseOrchestrator
from docling_serve.settings import AsyncEngine, docling_serve_settings
@lru_cache
def get_async_orchestrator() -> BaseOrchestrator:
if docling_serve_settings.eng_kind == AsyncEngine.LOCAL:
from docling_jobkit.convert.manager import (
DoclingConverterManager,
DoclingConverterManagerConfig,
)
from docling_jobkit.orchestrators.local.orchestrator import (
LocalOrchestrator,
LocalOrchestratorConfig,
)
local_config = LocalOrchestratorConfig(
num_workers=docling_serve_settings.eng_loc_num_workers,
)
cm_config = DoclingConverterManagerConfig(
artifacts_path=docling_serve_settings.artifacts_path,
options_cache_size=docling_serve_settings.options_cache_size,
enable_remote_services=docling_serve_settings.enable_remote_services,
allow_external_plugins=docling_serve_settings.allow_external_plugins,
max_num_pages=docling_serve_settings.max_num_pages,
max_file_size=docling_serve_settings.max_file_size,
)
cm = DoclingConverterManager(config=cm_config)
return LocalOrchestrator(config=local_config, converter_manager=cm)
elif docling_serve_settings.eng_kind == AsyncEngine.KFP:
from docling_jobkit.orchestrators.kfp.orchestrator import (
KfpOrchestrator,
KfpOrchestratorConfig,
)
kfp_config = KfpOrchestratorConfig(
endpoint=docling_serve_settings.eng_kfp_endpoint,
token=docling_serve_settings.eng_kfp_token,
ca_cert_path=docling_serve_settings.eng_kfp_ca_cert_path,
self_callback_endpoint=docling_serve_settings.eng_kfp_self_callback_endpoint,
self_callback_token_path=docling_serve_settings.eng_kfp_self_callback_token_path,
self_callback_ca_cert_path=docling_serve_settings.eng_kfp_self_callback_ca_cert_path,
)
return KfpOrchestrator(config=kfp_config)
raise RuntimeError(f"Engine {docling_serve_settings.eng_kind} not recognized.")

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
import os
import shutil
@@ -6,15 +7,21 @@ from collections.abc import Iterable
from pathlib import Path
from typing import Union
from fastapi import HTTPException
from fastapi import BackgroundTasks, HTTPException
from fastapi.responses import FileResponse
from docling.datamodel.base_models import OutputFormat
from docling.datamodel.document import ConversionResult, ConversionStatus
from docling_core.types.doc import ImageRefMode
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
from docling_jobkit.datamodel.task import Task
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
)
from docling_serve.datamodel.convert import ConvertDocumentsOptions
from docling_serve.datamodel.responses import ConvertDocumentResponse, DocumentResponse
from docling_serve.settings import docling_serve_settings
from docling_serve.storage import get_scratch
_log = logging.getLogger(__name__)
@@ -118,7 +125,7 @@ def _export_documents_as_files(
if export_doctags:
fname = output_dir / f"{doc_filename}.doctags"
_log.info(f"writing Doc Tags output to {fname}")
conv_res.document.save_as_document_tokens(filename=fname)
conv_res.document.save_as_doctags(filename=fname)
else:
_log.warning(f"Document {conv_res.input.file} failed to convert.")
@@ -230,3 +237,46 @@ def process_results(
)
return response
async def prepare_response(
task: Task, orchestrator: BaseOrchestrator, background_tasks: BackgroundTasks
):
if task.results is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
assert task.options is not None
work_dir = get_scratch() / task.task_id
response = process_results(
conversion_options=task.options,
conv_results=task.results,
work_dir=work_dir,
)
if work_dir.exists():
task.scratch_dir = work_dir
if not isinstance(response, FileResponse):
_log.warning(
f"Task {task.task_id=} produced content in {work_dir=} but the response is not a file."
)
shutil.rmtree(work_dir, ignore_errors=True)
if docling_serve_settings.single_use_results:
if task.scratch_dir is not None:
background_tasks.add_task(
shutil.rmtree, task.scratch_dir, ignore_errors=True
)
async def _remove_task_impl():
await asyncio.sleep(docling_serve_settings.result_removal_delay)
await orchestrator.delete_task(task_id=task.task_id)
async def _remove_task():
asyncio.create_task(_remove_task_impl()) # noqa: RUF006
background_tasks.add_task(_remove_task)
return response

View File

@@ -1,3 +1,4 @@
import enum
import sys
from pathlib import Path
from typing import Optional, Union
@@ -6,8 +7,6 @@ from pydantic import AnyUrl, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing_extensions import Self
from docling_serve.datamodel.engines import AsyncEngine
class UvicornSettings(BaseSettings):
model_config = SettingsConfigDict(
@@ -26,6 +25,11 @@ class UvicornSettings(BaseSettings):
workers: Union[int, None] = None
class AsyncEngine(str, enum.Enum):
LOCAL = "local"
KFP = "kfp"
class DoclingServeSettings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="DOCLING_SERVE_",

View File

@@ -0,0 +1,54 @@
from fastapi import WebSocket
from docling_jobkit.datamodel.task_meta import TaskStatus
from docling_jobkit.orchestrators.base_notifier import BaseNotifier
from docling_jobkit.orchestrators.base_orchestrator import BaseOrchestrator
from docling_serve.datamodel.responses import (
MessageKind,
TaskStatusResponse,
WebsocketMessage,
)
class WebsocketNotifier(BaseNotifier):
def __init__(self, orchestrator: BaseOrchestrator):
super().__init__(orchestrator)
self.task_subscribers: dict[str, set[WebSocket]] = {}
async def add_task(self, task_id: str):
self.task_subscribers[task_id] = set()
async def remove_task(self, task_id: str):
if task_id in self.task_subscribers:
for websocket in self.task_subscribers[task_id]:
await websocket.close()
del self.task_subscribers[task_id]
async def notify_task_subscribers(self, task_id: str):
if task_id not in self.task_subscribers:
raise RuntimeError(f"Task {task_id} does not have a subscribers list.")
task = await self.orchestrator.get_raw_task(task_id=task_id)
task_queue_position = await self.orchestrator.get_queue_position(task_id)
msg = TaskStatusResponse(
task_id=task.task_id,
task_status=task.task_status,
task_position=task_queue_position,
task_meta=task.processing_meta,
)
for websocket in self.task_subscribers[task_id]:
await websocket.send_text(
WebsocketMessage(message=MessageKind.UPDATE, task=msg).model_dump_json()
)
if task.is_completed():
await websocket.close()
async def notify_queue_positions(self):
for task_id in self.task_subscribers.keys():
# notify only pending tasks
if self.orchestrator.tasks[task_id].task_status != TaskStatus.PENDING:
continue
await self.notify_task_subscribers(task_id)

View File

@@ -23,7 +23,7 @@ readme = "README.md"
classifiers = [
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
# "Development Status :: 5 - Production/Stable",
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Typing :: Typed",
"Programming Language :: Python :: 3",
@@ -34,12 +34,11 @@ classifiers = [
]
requires-python = ">=3.10"
dependencies = [
"docling[vlm]~=2.38",
"docling~=2.38",
"docling-core>=2.32.0",
"mlx-vlm~=0.1.12; sys_platform == 'darwin' and platform_machine == 'arm64'",
"docling-jobkit[kfp,vlm]~=1.0",
"fastapi[standard]~=0.115",
"httpx~=0.28",
"kfp[kubernetes]>=2.10.0",
"pydantic~=2.10",
"pydantic-settings~=2.4",
"python-multipart>=0.0.14,<0.1.0",
@@ -223,7 +222,7 @@ ignore = [
max-complexity = 15
[tool.ruff.lint.isort.sections]
"docling" = ["docling", "docling_core"]
"docling" = ["docling", "docling_core", "docling_jobkit"]
[tool.ruff.lint.isort]
combine-as-imports = true

View File

@@ -1,54 +0,0 @@
from docling_serve.datamodel.convert import (
ConvertDocumentsOptions,
PictureDescriptionApi,
)
from docling_serve.docling_conversion import (
_hash_pdf_format_option,
get_pdf_pipeline_opts,
)
def test_options_cache_key():
hashes = set()
opts = ConvertDocumentsOptions()
pipeline_opts = get_pdf_pipeline_opts(opts)
hash = _hash_pdf_format_option(pipeline_opts)
assert hash not in hashes
hashes.add(hash)
opts.do_picture_description = True
pipeline_opts = get_pdf_pipeline_opts(opts)
hash = _hash_pdf_format_option(pipeline_opts)
# pprint(pipeline_opts.pipeline_options.model_dump(serialize_as_any=True))
assert hash not in hashes
hashes.add(hash)
opts.picture_description_api = PictureDescriptionApi(
url="http://localhost",
params={"model": "mymodel"},
prompt="Hello 1",
)
pipeline_opts = get_pdf_pipeline_opts(opts)
hash = _hash_pdf_format_option(pipeline_opts)
# pprint(pipeline_opts.pipeline_options.model_dump(serialize_as_any=True))
assert hash not in hashes
hashes.add(hash)
opts.picture_description_api = PictureDescriptionApi(
url="http://localhost",
params={"model": "your-model"},
prompt="Hello 1",
)
pipeline_opts = get_pdf_pipeline_opts(opts)
hash = _hash_pdf_format_option(pipeline_opts)
# pprint(pipeline_opts.pipeline_options.model_dump(serialize_as_any=True))
assert hash not in hashes
hashes.add(hash)
opts.picture_description_api.prompt = "World"
pipeline_opts = get_pdf_pipeline_opts(opts)
hash = _hash_pdf_format_option(pipeline_opts)
# pprint(pipeline_opts.pipeline_options.model_dump(serialize_as_any=True))
assert hash not in hashes
hashes.add(hash)

860
uv.lock generated

File diff suppressed because one or more lines are too long