feat: add new source and target (#270)

Signed-off-by: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com>
This commit is contained in:
Tiago Santana
2025-07-29 13:44:49 +01:00
committed by GitHub
parent 24db461b14
commit 3771c1b554
6 changed files with 95 additions and 20 deletions

View File

@@ -34,8 +34,13 @@ from docling_jobkit.datamodel.callback import (
ProgressCallbackResponse,
)
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.s3_coords import S3Coordinates
from docling_jobkit.datamodel.task import Task, TaskSource
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget
from docling_jobkit.datamodel.task_targets import (
InBodyTarget,
TaskTarget,
ZipTarget,
)
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
ProgressInvalid,
@@ -47,6 +52,7 @@ from docling_serve.datamodel.requests import (
ConvertDocumentsRequest,
FileSourceRequest,
HttpSourceRequest,
S3SourceRequest,
TargetName,
)
from docling_serve.datamodel.responses import (
@@ -54,6 +60,7 @@ from docling_serve.datamodel.responses import (
ConvertDocumentResponse,
HealthCheckResponse,
MessageKind,
PresignedUrlConvertDocumentResponse,
TaskStatusResponse,
WebsocketMessage,
)
@@ -246,6 +253,8 @@ def create_app(): # noqa: C901
sources.append(FileSource.model_validate(s))
elif isinstance(s, HttpSourceRequest):
sources.append(HttpSource.model_validate(s))
elif isinstance(s, S3SourceRequest):
sources.append(S3Coordinates.model_validate(s))
task = await orchestrator.enqueue(
sources=sources,
@@ -525,7 +534,7 @@ def create_app(): # noqa: C901
# Task result
@app.get(
"/v1/result/{task_id}",
response_model=ConvertDocumentResponse,
response_model=ConvertDocumentResponse | PresignedUrlConvertDocumentResponse,
responses={
200: {
"content": {"application/zip": {}},

View File

@@ -1,12 +1,21 @@
import enum
from typing import Annotated, Literal
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticCustomError
from typing_extensions import Self
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget
from docling_jobkit.datamodel.s3_coords import S3Coordinates
from docling_jobkit.datamodel.task_targets import (
InBodyTarget,
S3Target,
TaskTarget,
ZipTarget,
)
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
from docling_serve.settings import AsyncEngine, docling_serve_settings
## Sources
@@ -19,6 +28,10 @@ class HttpSourceRequest(HttpSource):
kind: Literal["http"] = "http"
class S3SourceRequest(S3Coordinates):
    """Conversion source stored in S3, identified by the coordinates inherited
    from ``S3Coordinates``.

    The ``kind`` discriminator selects this variant in ``SourceRequestItem``.
    NOTE(review): per the request-level validator, an "s3" source is only
    accepted together with an "s3" target and the KFP engine — confirm.
    """

    kind: Literal["s3"] = "s3"
## Multipart targets
class TargetName(str, enum.Enum):
INBODY = InBodyTarget().kind
@@ -27,7 +40,7 @@ class TargetName(str, enum.Enum):
## Aliases
SourceRequestItem = Annotated[
FileSourceRequest | HttpSourceRequest, Field(discriminator="kind")
FileSourceRequest | HttpSourceRequest | S3SourceRequest, Field(discriminator="kind")
]
@@ -36,3 +49,24 @@ class ConvertDocumentsRequest(BaseModel):
options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions()
sources: list[SourceRequestItem]
target: TaskTarget = InBodyTarget()
@model_validator(mode="after")
def validate_s3_source_and_target(self) -> Self:
    """Enforce the pairing rules between "s3" sources and the "s3" target.

    Rules:
      * an "s3" source is only allowed when the engine kind is KFP;
      * an "s3" source requires the target kind to be "s3";
      * conversely, an "s3" target requires at least one "s3" source.

    Raises:
        PydanticCustomError: when any of the rules above is violated.
    """
    # One pass over the sources is enough: the per-source checks do not
    # depend on the individual source object, only on its presence.
    has_s3_source = any(isinstance(src, S3SourceRequest) for src in self.sources)

    if has_s3_source:
        if docling_serve_settings.eng_kind != AsyncEngine.KFP:
            raise PydanticCustomError(
                "error source", 'source kind "s3" requires engine kind "KFP"'
            )
        if self.target.kind != "s3":
            raise PydanticCustomError(
                "error source", 'source kind "s3" requires target kind "s3"'
            )

    # An s3 target without any s3 source is invalid.
    if isinstance(self.target, S3Target) and not has_s3_source:
        raise PydanticCustomError(
            "error target", 'target kind "s3" requires source kind "s3"'
        )

    return self

View File

@@ -35,6 +35,11 @@ class ConvertDocumentResponse(BaseModel):
timings: dict[str, ProfilingItem] = {}
class PresignedUrlConvertDocumentResponse(BaseModel):
    """Response returned when the converted result archive was uploaded to a
    presigned URL (``PutTarget``) instead of being streamed back in the body.
    """

    # Overall conversion outcome reported by the conversion pipeline.
    status: ConversionStatus
    # Wall-clock duration of the conversion, in seconds — presumably measured
    # by the caller that constructs this response; TODO confirm units.
    processing_time: float
class ConvertDocumentErrorResponse(BaseModel):
status: ConversionStatus

View File

@@ -7,6 +7,7 @@ from collections.abc import Iterable
from pathlib import Path
from typing import Union
import httpx
from fastapi import BackgroundTasks, HTTPException
from fastapi.responses import FileResponse
@@ -15,12 +16,16 @@ from docling.datamodel.document import ConversionResult, ConversionStatus
from docling_core.types.doc import ImageRefMode
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
from docling_jobkit.datamodel.task import Task
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget
from docling_jobkit.datamodel.task_targets import InBodyTarget, PutTarget, TaskTarget
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
)
from docling_serve.datamodel.responses import ConvertDocumentResponse, DocumentResponse
from docling_serve.datamodel.responses import (
ConvertDocumentResponse,
DocumentResponse,
PresignedUrlConvertDocumentResponse,
)
from docling_serve.settings import docling_serve_settings
from docling_serve.storage import get_scratch
@@ -79,11 +84,15 @@ def _export_documents_as_files(
export_doctags: bool,
image_export_mode: ImageRefMode,
md_page_break_placeholder: str,
):
) -> ConversionStatus:
success_count = 0
failure_count = 0
# Default failure in case results is empty
conv_result = ConversionStatus.FAILURE
for conv_res in conv_results:
conv_result = conv_res.status
if conv_res.status == ConversionStatus.SUCCESS:
success_count += 1
doc_filename = conv_res.input.file.stem
@@ -138,6 +147,7 @@ def _export_documents_as_files(
f"Processed {success_count + failure_count} docs, "
f"of which {failure_count} failed"
)
return conv_result
def process_results(
@@ -145,7 +155,7 @@ def process_results(
target: TaskTarget,
conv_results: Iterable[ConversionResult],
work_dir: Path,
) -> Union[ConvertDocumentResponse, FileResponse]:
) -> Union[ConvertDocumentResponse, FileResponse, PresignedUrlConvertDocumentResponse]:
# Let's start by processing the documents
try:
start_time = time.monotonic()
@@ -169,7 +179,9 @@ def process_results(
)
# We have some results, let's prepare the response
response: Union[FileResponse, ConvertDocumentResponse]
response: Union[
FileResponse, ConvertDocumentResponse, PresignedUrlConvertDocumentResponse
]
# Booleans to know what to export
export_json = OutputFormat.JSON in conversion_options.to_formats
@@ -209,7 +221,7 @@ def process_results(
os.getpid()
# Export the documents
_export_documents_as_files(
conv_result = _export_documents_as_files(
conv_results=conv_results,
output_dir=output_dir,
export_json=export_json,
@@ -236,6 +248,21 @@ def process_results(
# Output directory
# background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True)
if isinstance(target, PutTarget):
try:
with open(file_path, "rb") as file_data:
r = httpx.put(str(target.url), files={"file": file_data})
r.raise_for_status()
response = PresignedUrlConvertDocumentResponse(
status=conv_result,
processing_time=processing_time,
)
except Exception as exc:
_log.error("An error occour while uploading zip to s3", exc_info=exc)
raise HTTPException(
status_code=500, detail="An error occour while uploading zip to s3."
)
else:
response = FileResponse(
file_path, filename=file_path.name, media_type="application/zip"
)

View File

@@ -36,7 +36,7 @@ requires-python = ">=3.10"
dependencies = [
"docling~=2.38",
"docling-core>=2.32.0",
"docling-jobkit[kfp,vlm]~=1.1",
"docling-jobkit[kfp,vlm]~=1.2",
"fastapi[standard]~=0.115",
"httpx~=0.28",
"pydantic~=2.10",
@@ -128,7 +128,7 @@ torchvision = [
{ index = "pytorch-cu126", group = "cu126" },
{ index = "pytorch-cu128", group = "cu128" },
]
# docling-jobkit = { git = "https://github.com/docling-project/docling-jobkit/", rev = "refactor" }
# docling-jobkit = { git = "https://github.com/docling-project/docling-jobkit/", rev = "main" }
# docling-jobkit = { path = "../docling-jobkit", editable = true }
[[tool.uv.index]]

8
uv.lock generated
View File

@@ -864,7 +864,7 @@ wheels = [
[[package]]
name = "docling-jobkit"
version = "1.1.1"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "boto3", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" },
@@ -877,9 +877,9 @@ dependencies = [
{ name = "pydantic-settings", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" },
{ name = "typer", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d9/69/dcfc9deb8b734db1d5b805f1bb3d02f0c721d595d438e089f74b62d601a8/docling_jobkit-1.1.1.tar.gz", hash = "sha256:49c58e71bf8d384b2f961f357b0abaec9c2fa32691295e977578f5ff0b459f45", size = 34859, upload-time = "2025-07-18T15:24:00.885Z" }
sdist = { url = "https://files.pythonhosted.org/packages/4d/64/8a8f369b9fa6b46050acbca7eb44b78e130d298303413ef9778ecb60a7d5/docling_jobkit-1.2.0.tar.gz", hash = "sha256:712336436a09bb84266ce474c9ace038bf57a6700d6abc931f3f03d0ad795fab", size = 34534, upload-time = "2025-07-24T08:42:37.924Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/dc/1d/a557da19a1270b19a5c1a164012d2d89c361be1fd71037fb1e057075e4fd/docling_jobkit-1.1.1-py3-none-any.whl", hash = "sha256:6bd60509ace839ae0b8e3191f40a556d5c4b269801a04108baf8ffb3b74f289b", size = 42301, upload-time = "2025-07-18T15:23:59.311Z" },
{ url = "https://files.pythonhosted.org/packages/c6/eb/343960310b9b41e9bfbcbc317ec2d847bd51bffda58b5bae3400a24d9fac/docling_jobkit-1.2.0-py3-none-any.whl", hash = "sha256:8135708f51307dfe38037b522a45201309887d7538d8117c033f08487499ff98", size = 41839, upload-time = "2025-07-24T08:42:36.736Z" },
]
[package.optional-dependencies]
@@ -1001,7 +1001,7 @@ pypi = [
requires-dist = [
{ name = "docling", specifier = "~=2.38" },
{ name = "docling-core", specifier = ">=2.32.0" },
{ name = "docling-jobkit", extras = ["kfp", "vlm"], specifier = "~=1.1" },
{ name = "docling-jobkit", extras = ["kfp", "vlm"], specifier = "~=1.2" },
{ name = "fastapi", extras = ["standard"], specifier = "~=0.115" },
{ name = "flash-attn", marker = "platform_machine == 'x86_64' and sys_platform == 'linux' and extra == 'flash-attn'", specifier = "~=2.7.0" },
{ name = "gradio", marker = "extra == 'ui'", specifier = "~=5.9" },