diff --git a/docling_serve/app.py b/docling_serve/app.py index 7e76fa2..4990095 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -34,8 +34,13 @@ from docling_jobkit.datamodel.callback import ( ProgressCallbackResponse, ) from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource +from docling_jobkit.datamodel.s3_coords import S3Coordinates from docling_jobkit.datamodel.task import Task, TaskSource -from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget +from docling_jobkit.datamodel.task_targets import ( + InBodyTarget, + TaskTarget, + ZipTarget, +) from docling_jobkit.orchestrators.base_orchestrator import ( BaseOrchestrator, ProgressInvalid, @@ -47,6 +52,7 @@ from docling_serve.datamodel.requests import ( ConvertDocumentsRequest, FileSourceRequest, HttpSourceRequest, + S3SourceRequest, TargetName, ) from docling_serve.datamodel.responses import ( @@ -54,6 +60,7 @@ from docling_serve.datamodel.responses import ( ConvertDocumentResponse, HealthCheckResponse, MessageKind, + PresignedUrlConvertDocumentResponse, TaskStatusResponse, WebsocketMessage, ) @@ -246,6 +253,8 @@ def create_app(): # noqa: C901 sources.append(FileSource.model_validate(s)) elif isinstance(s, HttpSourceRequest): sources.append(HttpSource.model_validate(s)) + elif isinstance(s, S3SourceRequest): + sources.append(S3Coordinates.model_validate(s)) task = await orchestrator.enqueue( sources=sources, @@ -525,7 +534,7 @@ def create_app(): # noqa: C901 # Task result @app.get( "/v1/result/{task_id}", - response_model=ConvertDocumentResponse, + response_model=ConvertDocumentResponse | PresignedUrlConvertDocumentResponse, responses={ 200: { "content": {"application/zip": {}}, diff --git a/docling_serve/datamodel/requests.py b/docling_serve/datamodel/requests.py index e356561..943592b 100644 --- a/docling_serve/datamodel/requests.py +++ b/docling_serve/datamodel/requests.py @@ -1,12 +1,21 @@ import enum from typing import Annotated, Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator +from pydantic_core import PydanticCustomError +from typing_extensions import Self from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource -from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget +from docling_jobkit.datamodel.s3_coords import S3Coordinates +from docling_jobkit.datamodel.task_targets import ( + InBodyTarget, + S3Target, + TaskTarget, + ZipTarget, +) from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions +from docling_serve.settings import AsyncEngine, docling_serve_settings ## Sources @@ -19,6 +28,10 @@ class HttpSourceRequest(HttpSource): kind: Literal["http"] = "http" +class S3SourceRequest(S3Coordinates): + kind: Literal["s3"] = "s3" + + ## Multipart targets class TargetName(str, enum.Enum): INBODY = InBodyTarget().kind @@ -27,7 +40,7 @@ class TargetName(str, enum.Enum): ## Aliases SourceRequestItem = Annotated[ - FileSourceRequest | HttpSourceRequest, Field(discriminator="kind") + FileSourceRequest | HttpSourceRequest | S3SourceRequest, Field(discriminator="kind") ] @@ -36,3 +49,24 @@ class ConvertDocumentsRequest(BaseModel): options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions() sources: list[SourceRequestItem] target: TaskTarget = InBodyTarget() + + @model_validator(mode="after") + def validate_s3_source_and_target(self) -> Self: + for source in self.sources: + if isinstance(source, S3SourceRequest): + if docling_serve_settings.eng_kind != AsyncEngine.KFP: + raise PydanticCustomError( + "error source", 'source kind "s3" requires engine kind "KFP"' + ) + if self.target.kind != "s3": + raise PydanticCustomError( + "error source", 'source kind "s3" requires target kind "s3"' + ) + if isinstance(self.target, S3Target): + for source in self.sources: + if isinstance(source, S3SourceRequest): + return self + raise PydanticCustomError( + "error target", 'target kind "s3" requires source kind "s3"' + ) + return self diff --git a/docling_serve/datamodel/responses.py b/docling_serve/datamodel/responses.py index 9b3d554..01286f8 100644 --- a/docling_serve/datamodel/responses.py +++ b/docling_serve/datamodel/responses.py @@ -35,6 +35,11 @@ class ConvertDocumentResponse(BaseModel): timings: dict[str, ProfilingItem] = {} +class PresignedUrlConvertDocumentResponse(BaseModel): + status: ConversionStatus + processing_time: float + + class ConvertDocumentErrorResponse(BaseModel): status: ConversionStatus diff --git a/docling_serve/response_preparation.py b/docling_serve/response_preparation.py index 5f403f2..23387c9 100644 --- a/docling_serve/response_preparation.py +++ b/docling_serve/response_preparation.py @@ -7,6 +7,7 @@ from collections.abc import Iterable from pathlib import Path from typing import Union +import httpx from fastapi import BackgroundTasks, HTTPException from fastapi.responses import FileResponse @@ -15,12 +16,16 @@ from docling.datamodel.document import ConversionResult, ConversionStatus from docling_core.types.doc import ImageRefMode from docling_jobkit.datamodel.convert import ConvertDocumentsOptions from docling_jobkit.datamodel.task import Task -from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget +from docling_jobkit.datamodel.task_targets import InBodyTarget, PutTarget, TaskTarget from docling_jobkit.orchestrators.base_orchestrator import ( BaseOrchestrator, ) -from docling_serve.datamodel.responses import ConvertDocumentResponse, DocumentResponse +from docling_serve.datamodel.responses import ( + ConvertDocumentResponse, + DocumentResponse, + PresignedUrlConvertDocumentResponse, +) from docling_serve.settings import docling_serve_settings from docling_serve.storage import get_scratch @@ -79,11 +84,15 @@ def _export_documents_as_files( export_doctags: bool, image_export_mode: ImageRefMode, md_page_break_placeholder: str, -): +) -> ConversionStatus: success_count = 0 failure_count = 0 + # Default failure in case results is empty + conv_result = ConversionStatus.FAILURE + for conv_res in conv_results: + conv_result = conv_res.status if conv_res.status == ConversionStatus.SUCCESS: success_count += 1 doc_filename = conv_res.input.file.stem @@ -138,6 +147,7 @@ def _export_documents_as_files( f"Processed {success_count + failure_count} docs, " f"of which {failure_count} failed" ) + return conv_result def process_results( @@ -145,7 +155,7 @@ def process_results( target: TaskTarget, conv_results: Iterable[ConversionResult], work_dir: Path, -) -> Union[ConvertDocumentResponse, FileResponse]: +) -> Union[ConvertDocumentResponse, FileResponse, PresignedUrlConvertDocumentResponse]: # Let's start by processing the documents try: start_time = time.monotonic() @@ -169,7 +179,9 @@ def process_results( ) # We have some results, let's prepare the response - response: Union[FileResponse, ConvertDocumentResponse] + response: Union[ + FileResponse, ConvertDocumentResponse, PresignedUrlConvertDocumentResponse + ] # Booleans to know what to export export_json = OutputFormat.JSON in conversion_options.to_formats @@ -209,7 +221,7 @@ def process_results( os.getpid() # Export the documents - _export_documents_as_files( + conv_result = _export_documents_as_files( conv_results=conv_results, output_dir=output_dir, export_json=export_json, @@ -236,9 +248,24 @@ def process_results( # Output directory # background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True) - response = FileResponse( - file_path, filename=file_path.name, media_type="application/zip" - ) + if isinstance(target, PutTarget): + try: + with open(file_path, "rb") as file_data: + r = httpx.put(str(target.url), files={"file": file_data}) + r.raise_for_status() + response = PresignedUrlConvertDocumentResponse( + status=conv_result, + processing_time=processing_time, + ) + except Exception as exc: + _log.error("An error occour while uploading zip to s3", exc_info=exc) + raise HTTPException( + status_code=500, detail="An error occour while uploading zip to s3." + ) + else: + response = FileResponse( + file_path, filename=file_path.name, media_type="application/zip" + ) return response diff --git a/pyproject.toml b/pyproject.toml index 0174a44..d17b238 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ requires-python = ">=3.10" dependencies = [ "docling~=2.38", "docling-core>=2.32.0", - "docling-jobkit[kfp,vlm]~=1.1", + "docling-jobkit[kfp,vlm]~=1.2", "fastapi[standard]~=0.115", "httpx~=0.28", "pydantic~=2.10", @@ -128,7 +128,7 @@ torchvision = [ { index = "pytorch-cu126", group = "cu126" }, { index = "pytorch-cu128", group = "cu128" }, ] -# docling-jobkit = { git = "https://github.com/docling-project/docling-jobkit/", rev = "refactor" } +# docling-jobkit = { git = "https://github.com/docling-project/docling-jobkit/", rev = "main" } # docling-jobkit = { path = "../docling-jobkit", editable = true } [[tool.uv.index]] diff --git a/uv.lock b/uv.lock index bf4edba..71e94e0 100644 --- a/uv.lock +++ b/uv.lock @@ -864,7 +864,7 @@ wheels = [ [[package]] name = "docling-jobkit" -version = "1.1.1" +version = "1.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "boto3", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" }, @@ -877,9 +877,9 @@ dependencies = [ { name = "pydantic-settings", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" }, { name = "typer", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi')" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d9/69/dcfc9deb8b734db1d5b805f1bb3d02f0c721d595d438e089f74b62d601a8/docling_jobkit-1.1.1.tar.gz", hash = "sha256:49c58e71bf8d384b2f961f357b0abaec9c2fa32691295e977578f5ff0b459f45", size = 34859, upload-time = "2025-07-18T15:24:00.885Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4d/64/8a8f369b9fa6b46050acbca7eb44b78e130d298303413ef9778ecb60a7d5/docling_jobkit-1.2.0.tar.gz", hash = "sha256:712336436a09bb84266ce474c9ace038bf57a6700d6abc931f3f03d0ad795fab", size = 34534, upload-time = "2025-07-24T08:42:37.924Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/dc/1d/a557da19a1270b19a5c1a164012d2d89c361be1fd71037fb1e057075e4fd/docling_jobkit-1.1.1-py3-none-any.whl", hash = "sha256:6bd60509ace839ae0b8e3191f40a556d5c4b269801a04108baf8ffb3b74f289b", size = 42301, upload-time = "2025-07-18T15:23:59.311Z" }, + { url = "https://files.pythonhosted.org/packages/c6/eb/343960310b9b41e9bfbcbc317ec2d847bd51bffda58b5bae3400a24d9fac/docling_jobkit-1.2.0-py3-none-any.whl", hash = "sha256:8135708f51307dfe38037b522a45201309887d7538d8117c033f08487499ff98", size = 41839, upload-time = "2025-07-24T08:42:36.736Z" }, ] [package.optional-dependencies] @@ -1001,7 +1001,7 @@ pypi = [ requires-dist = [ { name = "docling", specifier = "~=2.38" }, { name = "docling-core", specifier = ">=2.32.0" }, - { name = "docling-jobkit", extras = ["kfp", "vlm"], specifier = "~=1.1" }, + { name = "docling-jobkit", extras = ["kfp", "vlm"], specifier = "~=1.2" }, { name = "fastapi", extras = ["standard"], specifier = "~=0.115" }, { name = "flash-attn", marker = "platform_machine == 'x86_64' and sys_platform == 'linux' and extra == 'flash-attn'", specifier = "~=2.7.0" }, { name = "gradio", marker = "extra == 'ui'", specifier = "~=5.9" },