feat: Add RQ engine (#315)

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi
2025-08-14 08:48:31 +02:00
committed by GitHub
parent d584895e11
commit 885f319d3a
10 changed files with 935 additions and 989 deletions

View File

@@ -5,6 +5,8 @@ async
(?i)urls (?i)urls
uvicorn uvicorn
[Ww]ebserver [Ww]ebserver
RQ
(?i)url
keyfile keyfile
[Ww]ebsocket(s?) [Ww]ebsocket(s?)
[Kk]ubernetes [Kk]ubernetes

View File

@@ -11,6 +11,7 @@ import uvicorn
from rich.console import Console from rich.console import Console
from docling_serve.settings import docling_serve_settings, uvicorn_settings from docling_serve.settings import docling_serve_settings, uvicorn_settings
from docling_serve.storage import get_scratch
warnings.filterwarnings(action="ignore", category=UserWarning, module="pydantic|torch") warnings.filterwarnings(action="ignore", category=UserWarning, module="pydantic|torch")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="easyocr") warnings.filterwarnings(action="ignore", category=FutureWarning, module="easyocr")
@@ -361,6 +362,37 @@ def run(
) )
@app.command()
def rq_worker() -> Any:
"""
Run the [bold]Docling JobKit[/bold] RQ worker.
"""
from docling_jobkit.convert.manager import DoclingConverterManagerConfig
from docling_jobkit.orchestrators.rq.orchestrator import RQOrchestratorConfig
from docling_jobkit.orchestrators.rq.worker import run_worker
rq_config = RQOrchestratorConfig(
redis_url=docling_serve_settings.eng_rq_redis_url,
results_prefix=docling_serve_settings.eng_rq_results_prefix,
sub_channel=docling_serve_settings.eng_rq_sub_channel,
scratch_dir=get_scratch(),
)
cm_config = DoclingConverterManagerConfig(
artifacts_path=docling_serve_settings.artifacts_path,
options_cache_size=docling_serve_settings.options_cache_size,
enable_remote_services=docling_serve_settings.enable_remote_services,
allow_external_plugins=docling_serve_settings.allow_external_plugins,
max_num_pages=docling_serve_settings.max_num_pages,
max_file_size=docling_serve_settings.max_file_size,
)
run_worker(
rq_config=rq_config,
cm_config=cm_config,
)
def main() -> None: def main() -> None:
app() app()

View File

@@ -417,9 +417,17 @@ def create_app(): # noqa: C901
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.", detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
) )
task = await orchestrator.get_raw_task(task_id=task.task_id) task_result = await orchestrator.task_result(task_id=task.task_id)
if task_result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
response = await prepare_response( response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks task_id=task.task_id,
task_result=task_result,
orchestrator=orchestrator,
background_tasks=background_tasks,
) )
return response return response
@@ -457,9 +465,17 @@ def create_app(): # noqa: C901
detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.", detail=f"Conversion is taking too long. The maximum wait time is configure as DOCLING_SERVE_MAX_SYNC_WAIT={docling_serve_settings.max_sync_wait}.",
) )
task = await orchestrator.get_raw_task(task_id=task.task_id) task_result = await orchestrator.task_result(task_id=task.task_id)
if task_result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
response = await prepare_response( response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks task_id=task.task_id,
task_result=task_result,
orchestrator=orchestrator,
background_tasks=background_tasks,
) )
return response return response
@@ -618,9 +634,17 @@ def create_app(): # noqa: C901
task_id: str, task_id: str,
): ):
try: try:
task = await orchestrator.get_raw_task(task_id=task_id) task_result = await orchestrator.task_result(task_id=task_id)
if task_result is None:
raise HTTPException(
status_code=404,
detail="Task result not found. Please wait for a completion status.",
)
response = await prepare_response( response = await prepare_response(
task=task, orchestrator=orchestrator, background_tasks=background_tasks task_id=task_id,
task_result=task_result,
orchestrator=orchestrator,
background_tasks=background_tasks,
) )
return response return response
except TaskNotFoundError: except TaskNotFoundError:

View File

@@ -5,7 +5,7 @@ from pydantic import BaseModel
from docling.datamodel.document import ConversionStatus, ErrorItem from docling.datamodel.document import ConversionStatus, ErrorItem
from docling.utils.profiling import ProfilingItem from docling.utils.profiling import ProfilingItem
from docling_core.types.doc import DoclingDocument from docling_jobkit.datamodel.result import ExportDocumentResponse
from docling_jobkit.datamodel.task_meta import TaskProcessingMeta from docling_jobkit.datamodel.task_meta import TaskProcessingMeta
@@ -18,17 +18,8 @@ class ClearResponse(BaseModel):
status: str = "ok" status: str = "ok"
class DocumentResponse(BaseModel):
filename: str
md_content: Optional[str] = None
json_content: Optional[DoclingDocument] = None
html_content: Optional[str] = None
text_content: Optional[str] = None
doctags_content: Optional[str] = None
class ConvertDocumentResponse(BaseModel): class ConvertDocumentResponse(BaseModel):
document: DocumentResponse document: ExportDocumentResponse
status: ConversionStatus status: ConversionStatus
errors: list[ErrorItem] = [] errors: list[ErrorItem] = []
processing_time: float processing_time: float
@@ -36,8 +27,10 @@ class ConvertDocumentResponse(BaseModel):
class PresignedUrlConvertDocumentResponse(BaseModel): class PresignedUrlConvertDocumentResponse(BaseModel):
status: ConversionStatus
processing_time: float processing_time: float
num_converted: int
num_succeeded: int
num_failed: int
class ConvertDocumentErrorResponse(BaseModel): class ConvertDocumentErrorResponse(BaseModel):

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
from docling_jobkit.orchestrators.base_orchestrator import BaseOrchestrator from docling_jobkit.orchestrators.base_orchestrator import BaseOrchestrator
from docling_serve.settings import AsyncEngine, docling_serve_settings from docling_serve.settings import AsyncEngine, docling_serve_settings
from docling_serve.storage import get_scratch
@lru_cache @lru_cache
@@ -20,6 +21,7 @@ def get_async_orchestrator() -> BaseOrchestrator:
local_config = LocalOrchestratorConfig( local_config = LocalOrchestratorConfig(
num_workers=docling_serve_settings.eng_loc_num_workers, num_workers=docling_serve_settings.eng_loc_num_workers,
shared_models=docling_serve_settings.eng_loc_share_models, shared_models=docling_serve_settings.eng_loc_share_models,
scratch_dir=get_scratch(),
) )
cm_config = DoclingConverterManagerConfig( cm_config = DoclingConverterManagerConfig(
@@ -33,6 +35,20 @@ def get_async_orchestrator() -> BaseOrchestrator:
cm = DoclingConverterManager(config=cm_config) cm = DoclingConverterManager(config=cm_config)
return LocalOrchestrator(config=local_config, converter_manager=cm) return LocalOrchestrator(config=local_config, converter_manager=cm)
elif docling_serve_settings.eng_kind == AsyncEngine.RQ:
from docling_jobkit.orchestrators.rq.orchestrator import (
RQOrchestrator,
RQOrchestratorConfig,
)
rq_config = RQOrchestratorConfig(
redis_url=docling_serve_settings.eng_rq_redis_url,
results_prefix=docling_serve_settings.eng_rq_results_prefix,
sub_channel=docling_serve_settings.eng_rq_sub_channel,
scratch_dir=get_scratch(),
)
return RQOrchestrator(config=rq_config)
elif docling_serve_settings.eng_kind == AsyncEngine.KFP: elif docling_serve_settings.eng_kind == AsyncEngine.KFP:
from docling_jobkit.orchestrators.kfp.orchestrator import ( from docling_jobkit.orchestrators.kfp.orchestrator import (
KfpOrchestrator, KfpOrchestrator,

View File

@@ -1,317 +1,65 @@
import asyncio import asyncio
import logging import logging
import os
import shutil
import time
from collections.abc import Iterable
from pathlib import Path
from typing import Union
import httpx from fastapi import BackgroundTasks, Response
from fastapi import BackgroundTasks, HTTPException
from fastapi.responses import FileResponse
from docling.datamodel.base_models import OutputFormat from docling_jobkit.datamodel.result import (
from docling.datamodel.document import ConversionResult, ConversionStatus ConvertDocumentResult,
from docling_core.types.doc import ImageRefMode ExportResult,
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions RemoteTargetResult,
from docling_jobkit.datamodel.task import Task ZipArchiveResult,
from docling_jobkit.datamodel.task_targets import InBodyTarget, PutTarget, TaskTarget )
from docling_jobkit.orchestrators.base_orchestrator import ( from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator, BaseOrchestrator,
) )
from docling_serve.datamodel.responses import ( from docling_serve.datamodel.responses import (
ConvertDocumentResponse, ConvertDocumentResponse,
DocumentResponse,
PresignedUrlConvertDocumentResponse, PresignedUrlConvertDocumentResponse,
) )
from docling_serve.settings import docling_serve_settings from docling_serve.settings import docling_serve_settings
from docling_serve.storage import get_scratch
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
def _export_document_as_content(
conv_res: ConversionResult,
export_json: bool,
export_html: bool,
export_md: bool,
export_txt: bool,
export_doctags: bool,
image_mode: ImageRefMode,
md_page_break_placeholder: str,
):
document = DocumentResponse(filename=conv_res.input.file.name)
if conv_res.status == ConversionStatus.SUCCESS:
new_doc = conv_res.document._make_copy_with_refmode(
Path(), image_mode, page_no=None
)
# Create the different formats
if export_json:
document.json_content = new_doc
if export_html:
document.html_content = new_doc.export_to_html(image_mode=image_mode)
if export_txt:
document.text_content = new_doc.export_to_markdown(
strict_text=True,
image_mode=image_mode,
)
if export_md:
document.md_content = new_doc.export_to_markdown(
image_mode=image_mode,
page_break_placeholder=md_page_break_placeholder or None,
)
if export_doctags:
document.doctags_content = new_doc.export_to_doctags()
elif conv_res.status == ConversionStatus.SKIPPED:
raise HTTPException(status_code=400, detail=conv_res.errors)
else:
raise HTTPException(status_code=500, detail=conv_res.errors)
return document
def _export_documents_as_files(
conv_results: Iterable[ConversionResult],
output_dir: Path,
export_json: bool,
export_html: bool,
export_md: bool,
export_txt: bool,
export_doctags: bool,
image_export_mode: ImageRefMode,
md_page_break_placeholder: str,
) -> ConversionStatus:
success_count = 0
failure_count = 0
# Default failure in case results is empty
conv_result = ConversionStatus.FAILURE
artifacts_dir = Path("artifacts/") # will be relative to the fname
for conv_res in conv_results:
conv_result = conv_res.status
if conv_res.status == ConversionStatus.SUCCESS:
success_count += 1
doc_filename = conv_res.input.file.stem
# Export JSON format:
if export_json:
fname = output_dir / f"{doc_filename}.json"
_log.info(f"writing JSON output to {fname}")
conv_res.document.save_as_json(
filename=fname,
image_mode=image_export_mode,
artifacts_dir=artifacts_dir,
)
# Export HTML format:
if export_html:
fname = output_dir / f"{doc_filename}.html"
_log.info(f"writing HTML output to {fname}")
conv_res.document.save_as_html(
filename=fname,
image_mode=image_export_mode,
artifacts_dir=artifacts_dir,
)
# Export Text format:
if export_txt:
fname = output_dir / f"{doc_filename}.txt"
_log.info(f"writing TXT output to {fname}")
conv_res.document.save_as_markdown(
filename=fname,
strict_text=True,
image_mode=ImageRefMode.PLACEHOLDER,
)
# Export Markdown format:
if export_md:
fname = output_dir / f"{doc_filename}.md"
_log.info(f"writing Markdown output to {fname}")
conv_res.document.save_as_markdown(
filename=fname,
artifacts_dir=artifacts_dir,
image_mode=image_export_mode,
page_break_placeholder=md_page_break_placeholder or None,
)
# Export Document Tags format:
if export_doctags:
fname = output_dir / f"{doc_filename}.doctags"
_log.info(f"writing Doc Tags output to {fname}")
conv_res.document.save_as_doctags(filename=fname)
else:
_log.warning(f"Document {conv_res.input.file} failed to convert.")
failure_count += 1
_log.info(
f"Processed {success_count + failure_count} docs, "
f"of which {failure_count} failed"
)
return conv_result
def process_results(
conversion_options: ConvertDocumentsOptions,
target: TaskTarget,
conv_results: Iterable[ConversionResult],
work_dir: Path,
) -> Union[ConvertDocumentResponse, FileResponse, PresignedUrlConvertDocumentResponse]:
# Let's start by processing the documents
try:
start_time = time.monotonic()
# Convert the iterator to a list to count the number of results and get timings
# As it's an iterator (lazy evaluation), it will also start the conversion
conv_results = list(conv_results)
processing_time = time.monotonic() - start_time
_log.info(
f"Processed {len(conv_results)} docs in {processing_time:.2f} seconds."
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if len(conv_results) == 0:
raise HTTPException(
status_code=500, detail="No documents were generated by Docling."
)
# We have some results, let's prepare the response
response: Union[
FileResponse, ConvertDocumentResponse, PresignedUrlConvertDocumentResponse
]
# Booleans to know what to export
export_json = OutputFormat.JSON in conversion_options.to_formats
export_html = OutputFormat.HTML in conversion_options.to_formats
export_md = OutputFormat.MARKDOWN in conversion_options.to_formats
export_txt = OutputFormat.TEXT in conversion_options.to_formats
export_doctags = OutputFormat.DOCTAGS in conversion_options.to_formats
# Only 1 document was processed, and we are not returning it as a file
if len(conv_results) == 1 and isinstance(target, InBodyTarget):
conv_res = conv_results[0]
document = _export_document_as_content(
conv_res,
export_json=export_json,
export_html=export_html,
export_md=export_md,
export_txt=export_txt,
export_doctags=export_doctags,
image_mode=conversion_options.image_export_mode,
md_page_break_placeholder=conversion_options.md_page_break_placeholder,
)
response = ConvertDocumentResponse(
document=document,
status=conv_res.status,
processing_time=processing_time,
timings=conv_res.timings,
)
# Multiple documents were processed, or we are forced returning as a file
else:
# Temporary directory to store the outputs
output_dir = work_dir / "output"
output_dir.mkdir(parents=True, exist_ok=True)
# Worker pid to use in archive identification as we may have multiple workers
os.getpid()
# Export the documents
conv_result = _export_documents_as_files(
conv_results=conv_results,
output_dir=output_dir,
export_json=export_json,
export_html=export_html,
export_md=export_md,
export_txt=export_txt,
export_doctags=export_doctags,
image_export_mode=conversion_options.image_export_mode,
md_page_break_placeholder=conversion_options.md_page_break_placeholder,
)
files = os.listdir(output_dir)
if len(files) == 0:
raise HTTPException(status_code=500, detail="No documents were exported.")
file_path = work_dir / "converted_docs.zip"
shutil.make_archive(
base_name=str(file_path.with_suffix("")),
format="zip",
root_dir=output_dir,
)
# Other cleanups after the response is sent
# Output directory
# background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True)
if isinstance(target, PutTarget):
try:
with open(file_path, "rb") as file_data:
r = httpx.put(str(target.url), files={"file": file_data})
r.raise_for_status()
response = PresignedUrlConvertDocumentResponse(
status=conv_result,
processing_time=processing_time,
)
except Exception as exc:
_log.error("An error occour while uploading zip to s3", exc_info=exc)
raise HTTPException(
status_code=500, detail="An error occour while uploading zip to s3."
)
else:
response = FileResponse(
file_path, filename=file_path.name, media_type="application/zip"
)
return response
async def prepare_response( async def prepare_response(
task: Task, orchestrator: BaseOrchestrator, background_tasks: BackgroundTasks task_id: str,
task_result: ConvertDocumentResult,
orchestrator: BaseOrchestrator,
background_tasks: BackgroundTasks,
): ):
if task.results is None: response: Response | ConvertDocumentResponse | PresignedUrlConvertDocumentResponse
raise HTTPException( if isinstance(task_result.result, ExportResult):
status_code=404, response = ConvertDocumentResponse(
detail="Task result not found. Please wait for a completion status.", document=task_result.result.content,
status=task_result.result.status,
processing_time=task_result.processing_time,
timings=task_result.result.timings,
errors=task_result.result.errors,
) )
assert task.options is not None elif isinstance(task_result.result, ZipArchiveResult):
response = Response(
work_dir = get_scratch() / task.task_id content=task_result.result.content,
response = process_results( media_type="application/zip",
conversion_options=task.options, headers={
target=task.target, "Content-Disposition": 'attachment; filename="converted_docs.zip"'
conv_results=task.results, },
work_dir=work_dir, )
) elif isinstance(task_result.result, RemoteTargetResult):
response = PresignedUrlConvertDocumentResponse(
if work_dir.exists(): processing_time=task_result.processing_time,
task.scratch_dir = work_dir num_converted=task_result.num_converted,
if not isinstance(response, FileResponse): num_succeeded=task_result.num_succeeded,
_log.warning( num_failed=task_result.num_failed,
f"Task {task.task_id=} produced content in {work_dir=} but the response is not a file." )
) else:
shutil.rmtree(work_dir, ignore_errors=True) raise ValueError("Unknown result type")
if docling_serve_settings.single_use_results: if docling_serve_settings.single_use_results:
if task.scratch_dir is not None:
background_tasks.add_task(
shutil.rmtree, task.scratch_dir, ignore_errors=True
)
async def _remove_task_impl(): async def _remove_task_impl():
await asyncio.sleep(docling_serve_settings.result_removal_delay) await asyncio.sleep(docling_serve_settings.result_removal_delay)
await orchestrator.delete_task(task_id=task.task_id) await orchestrator.delete_task(task_id=task_id)
async def _remove_task(): async def _remove_task():
asyncio.create_task(_remove_task_impl()) # noqa: RUF006 asyncio.create_task(_remove_task_impl()) # noqa: RUF006

View File

@@ -28,6 +28,7 @@ class UvicornSettings(BaseSettings):
class AsyncEngine(str, enum.Enum): class AsyncEngine(str, enum.Enum):
LOCAL = "local" LOCAL = "local"
KFP = "kfp" KFP = "kfp"
RQ = "rq"
class DoclingServeSettings(BaseSettings): class DoclingServeSettings(BaseSettings):
@@ -64,6 +65,10 @@ class DoclingServeSettings(BaseSettings):
# Local engine # Local engine
eng_loc_num_workers: int = 2 eng_loc_num_workers: int = 2
eng_loc_share_models: bool = False eng_loc_share_models: bool = False
# RQ engine
eng_rq_redis_url: str = ""
eng_rq_results_prefix: str = "docling:results"
eng_rq_sub_channel: str = "docling:updates"
# KFP engine # KFP engine
eng_kfp_endpoint: Optional[AnyUrl] = None eng_kfp_endpoint: Optional[AnyUrl] = None
eng_kfp_token: Optional[str] = None eng_kfp_token: Optional[str] = None
@@ -87,6 +92,10 @@ class DoclingServeSettings(BaseSettings):
"KFP is not yet working. To enable the development version, you must set DOCLING_SERVE_ENG_KFP_EXPERIMENTAL=true." "KFP is not yet working. To enable the development version, you must set DOCLING_SERVE_ENG_KFP_EXPERIMENTAL=true."
) )
if self.eng_kind == AsyncEngine.RQ:
if not self.eng_rq_redis_url:
raise ValueError("RQ Redis url is required when using the RQ engine.")
return self return self

View File

@@ -52,7 +52,7 @@ THe following table describes the options to configure the Docling Serve app.
| | `DOCLING_SERVE_CORS_ORIGINS` | `["*"]` | A list of origins that should be permitted to make cross-origin requests. | | | `DOCLING_SERVE_CORS_ORIGINS` | `["*"]` | A list of origins that should be permitted to make cross-origin requests. |
| | `DOCLING_SERVE_CORS_METHODS` | `["*"]` | A list of HTTP methods that should be allowed for cross-origin requests. | | | `DOCLING_SERVE_CORS_METHODS` | `["*"]` | A list of HTTP methods that should be allowed for cross-origin requests. |
| | `DOCLING_SERVE_CORS_HEADERS` | `["*"]` | A list of HTTP request headers that should be supported for cross-origin requests. | | | `DOCLING_SERVE_CORS_HEADERS` | `["*"]` | A list of HTTP request headers that should be supported for cross-origin requests. |
| | `DOCLING_SERVE_ENG_KIND` | `local` | The compute engine to use for the async tasks. Possible values are `local` and `kfp`. See below for more configurations of the engines. | | | `DOCLING_SERVE_ENG_KIND` | `local` | The compute engine to use for the async tasks. Possible values are `local`, `rq` and `kfp`. See below for more configurations of the engines. |
### Compute engine ### Compute engine
@@ -68,6 +68,16 @@ The following table describes the options to configure the Docling Serve local e
| `DOCLING_SERVE_ENG_LOC_NUM_WORKERS` | 2 | Number of workers/threads processing the incoming tasks. | | `DOCLING_SERVE_ENG_LOC_NUM_WORKERS` | 2 | Number of workers/threads processing the incoming tasks. |
| `DOCLING_SERVE_ENG_LOC_SHARE_MODELS` | False | If true, each process will share the same models among all thread workers. Otherwise, one instance of the models is allocated for each worker thread. | | `DOCLING_SERVE_ENG_LOC_SHARE_MODELS` | False | If true, each process will share the same models among all thread workers. Otherwise, one instance of the models is allocated for each worker thread. |
#### RQ engine
The following table describes the options to configure the Docling Serve RQ engine.
| ENV | Default | Description |
|-----|---------|-------------|
| `DOCLING_SERVE_ENG_RQ_REDIS_URL` | (required) | The connection Redis url, e.g. `redis://localhost:6373/` |
| `DOCLING_SERVE_ENG_RQ_RESULTS_PREFIX` | `docling:results` | The prefix used for storing the results in Redis. |
| `DOCLING_SERVE_ENG_RQ_SUB_CHANNEL` | `docling:updates` | The channel key name used for communicating updates between the workers and the orchestrator. |
#### KFP engine #### KFP engine
The following table describes the options to configure the Docling Serve KFP engine. The following table describes the options to configure the Docling Serve KFP engine.

View File

@@ -35,7 +35,7 @@ requires-python = ">=3.10"
dependencies = [ dependencies = [
"docling~=2.38", "docling~=2.38",
"docling-core>=2.44.1", "docling-core>=2.44.1",
"docling-jobkit[kfp,vlm]>=1.3.1,<2.0.0", "docling-jobkit[kfp,rq,vlm]>=1.4.0,<2.0.0",
"fastapi[standard]~=0.115", "fastapi[standard]~=0.115",
"httpx~=0.28", "httpx~=0.28",
"pydantic~=2.10", "pydantic~=2.10",

1472
uv.lock generated

File diff suppressed because one or more lines are too long