feat!: v1 api with list of sources and target (#249)

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi
2025-07-14 13:19:49 +02:00
committed by GitHub
parent daa924a77e
commit 56e328baf7
23 changed files with 556 additions and 367 deletions

View File

@@ -11,6 +11,7 @@ from fastapi import (
BackgroundTasks,
Depends,
FastAPI,
Form,
HTTPException,
Query,
UploadFile,
@@ -32,7 +33,9 @@ from docling_jobkit.datamodel.callback import (
ProgressCallbackRequest,
ProgressCallbackResponse,
)
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.task import Task, TaskSource
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
ProgressInvalid,
@@ -41,9 +44,10 @@ from docling_jobkit.orchestrators.base_orchestrator import (
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
from docling_serve.datamodel.requests import (
ConvertDocumentFileSourcesRequest,
ConvertDocumentHttpSourcesRequest,
ConvertDocumentsRequest,
FileSourceRequest,
HttpSourceRequest,
TargetName,
)
from docling_serve.datamodel.responses import (
ClearResponse,
@@ -237,13 +241,16 @@ def create_app(): # noqa: C901
orchestrator: BaseOrchestrator, conversion_request: ConvertDocumentsRequest
) -> Task:
sources: list[TaskSource] = []
if isinstance(conversion_request, ConvertDocumentFileSourcesRequest):
sources.extend(conversion_request.file_sources)
if isinstance(conversion_request, ConvertDocumentHttpSourcesRequest):
sources.extend(conversion_request.http_sources)
for s in conversion_request.sources:
if isinstance(s, FileSourceRequest):
sources.append(FileSource.model_validate(s))
elif isinstance(s, HttpSourceRequest):
sources.append(HttpSource.model_validate(s))
task = await orchestrator.enqueue(
sources=sources, options=conversion_request.options
sources=sources,
options=conversion_request.options,
target=conversion_request.target,
)
return task
@@ -251,6 +258,7 @@ def create_app(): # noqa: C901
orchestrator: BaseOrchestrator,
files: list[UploadFile],
options: ConvertDocumentsRequestOptions,
target: TaskTarget,
) -> Task:
_log.info(f"Received {len(files)} files for processing.")
@@ -262,7 +270,9 @@ def create_app(): # noqa: C901
name = file.filename if file.filename else f"file{suffix}.pdf"
file_sources.append(DocumentStream(name=name, stream=buf))
task = await orchestrator.enqueue(sources=file_sources, options=options)
task = await orchestrator.enqueue(
sources=file_sources, options=options, target=target
)
return task
async def _wait_task_complete(orchestrator: BaseOrchestrator, task_id: str) -> bool:
@@ -300,7 +310,7 @@ def create_app(): # noqa: C901
# Convert a document from URL(s)
@app.post(
"/v1alpha/convert/source",
"/v1/convert/source",
response_model=ConvertDocumentResponse,
responses={
200: {
@@ -336,7 +346,7 @@ def create_app(): # noqa: C901
# Convert a document from file(s)
@app.post(
"/v1alpha/convert/file",
"/v1/convert/file",
response_model=ConvertDocumentResponse,
responses={
200: {
@@ -351,9 +361,11 @@ def create_app(): # noqa: C901
options: Annotated[
ConvertDocumentsRequestOptions, FormDepends(ConvertDocumentsRequestOptions)
],
target_type: Annotated[TargetName, Form()] = TargetName.INBODY,
):
target = InBodyTarget() if target_type == TargetName.INBODY else ZipTarget()
task = await _enque_file(
orchestrator=orchestrator, files=files, options=options
orchestrator=orchestrator, files=files, options=options, target=target
)
completed = await _wait_task_complete(
orchestrator=orchestrator, task_id=task.task_id
@@ -374,7 +386,7 @@ def create_app(): # noqa: C901
# Convert a document from URL(s) using the async api
@app.post(
"/v1alpha/convert/source/async",
"/v1/convert/source/async",
response_model=TaskStatusResponse,
)
async def process_url_async(
@@ -396,7 +408,7 @@ def create_app(): # noqa: C901
# Convert a document from file(s) using the async api
@app.post(
"/v1alpha/convert/file/async",
"/v1/convert/file/async",
response_model=TaskStatusResponse,
)
async def process_file_async(
@@ -406,9 +418,11 @@ def create_app(): # noqa: C901
options: Annotated[
ConvertDocumentsRequestOptions, FormDepends(ConvertDocumentsRequestOptions)
],
target_type: Annotated[TargetName, Form()] = TargetName.INBODY,
):
target = InBodyTarget() if target_type == TargetName.INBODY else ZipTarget()
task = await _enque_file(
orchestrator=orchestrator, files=files, options=options
orchestrator=orchestrator, files=files, options=options, target=target
)
task_queue_position = await orchestrator.get_queue_position(
task_id=task.task_id
@@ -422,7 +436,7 @@ def create_app(): # noqa: C901
# Task status poll
@app.get(
"/v1alpha/status/poll/{task_id}",
"/v1/status/poll/{task_id}",
response_model=TaskStatusResponse,
)
async def task_status_poll(
@@ -446,7 +460,7 @@ def create_app(): # noqa: C901
# Task status websocket
@app.websocket(
"/v1alpha/status/ws/{task_id}",
"/v1/status/ws/{task_id}",
)
async def task_status_ws(
websocket: WebSocket,
@@ -510,7 +524,7 @@ def create_app(): # noqa: C901
# Task result
@app.get(
"/v1alpha/result/{task_id}",
"/v1/result/{task_id}",
response_model=ConvertDocumentResponse,
responses={
200: {
@@ -534,7 +548,7 @@ def create_app(): # noqa: C901
# Update task progress
@app.post(
"/v1alpha/callback/task/progress",
"/v1/callback/task/progress",
response_model=ProgressCallbackResponse,
)
async def callback_task_progress(
@@ -555,7 +569,7 @@ def create_app(): # noqa: C901
# Offload models
@app.get(
"/v1alpha/clear/converters",
"/v1/clear/converters",
response_model=ClearResponse,
)
async def clear_converters(
@@ -566,7 +580,7 @@ def create_app(): # noqa: C901
# Clean results
@app.get(
"/v1alpha/clear/results",
"/v1/clear/results",
response_model=ClearResponse,
)
async def clear_results(

View File

@@ -1,24 +1,38 @@
from typing import Union
import enum
from typing import Annotated, Literal
from pydantic import BaseModel
from pydantic import BaseModel, Field
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget, ZipTarget
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
# Base request model holding the conversion options shared by the
# per-source-type request classes that subclass it.
# NOTE(review): appears to be the pre-change side of this diff - confirm.
class DocumentsConvertBase(BaseModel):
# Conversion options; defaults to a default-constructed options object.
options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions()
## Sources
# Conversion request whose inputs are given as a list of HTTP sources.
# NOTE(review): appears to be the pre-change side of this diff - confirm.
class ConvertDocumentHttpSourcesRequest(DocumentsConvertBase):
# Required list of HttpSource entries (no default).
http_sources: list[HttpSource]
# Request-side variant of FileSource that adds a constant "kind" tag.
class FileSourceRequest(FileSource):
# Always "file"; SourceRequestItem uses the "kind" field as its discriminator.
kind: Literal["file"] = "file"
# Conversion request whose inputs are given as a list of file sources.
# NOTE(review): appears to be the pre-change side of this diff - confirm.
class ConvertDocumentFileSourcesRequest(DocumentsConvertBase):
# Required list of FileSource entries (no default).
file_sources: list[FileSource]
# Request-side variant of HttpSource that adds a constant "kind" tag.
class HttpSourceRequest(HttpSource):
# Always "http"; SourceRequestItem uses the "kind" field as its discriminator.
kind: Literal["http"] = "http"
ConvertDocumentsRequest = Union[
ConvertDocumentFileSourcesRequest, ConvertDocumentHttpSourcesRequest
## Multipart targets
# String-valued enum naming the available result targets. It is the type of the
# `target_type` form field on the file-upload endpoints.
class TargetName(str, enum.Enum):
# Member values are read from the `kind` attribute of default-constructed
# target models rather than being spelled out here.
# NOTE(review): presumably "inbody" and "zip" - confirm against docling_jobkit.
INBODY = InBodyTarget().kind
ZIP = ZipTarget().kind
## Aliases
# Type of one entry in a request's `sources` list: either a FileSourceRequest or
# an HttpSourceRequest. The "kind" field is declared as the discriminator, so
# each incoming item must carry kind="file" or kind="http".
SourceRequestItem = Annotated[
FileSourceRequest | HttpSourceRequest, Field(discriminator="kind")
]
## Complete Source request
# Body of a conversion request: conversion options, the list of sources to
# convert, and the target that is handed to the orchestrator with the task.
class ConvertDocumentsRequest(BaseModel):
# Conversion options; defaults to a default-constructed options object.
options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions()
# Required list of sources; each item is a file or HTTP source tagged by "kind".
sources: list[SourceRequestItem]
# Result target; an InBodyTarget is used when the request omits it.
target: TaskTarget = InBodyTarget()

View File

@@ -241,7 +241,7 @@ def wait_task_finish(task_id: str, return_as_file: bool):
while not task_finished:
try:
response = httpx.get(
f"{get_api_endpoint()}/v1alpha/status/poll/{task_id}?wait=5",
f"{get_api_endpoint()}/v1/status/poll/{task_id}?wait=5",
verify=ssl_ctx,
timeout=15,
)
@@ -264,7 +264,7 @@ def wait_task_finish(task_id: str, return_as_file: bool):
if conversion_sucess:
try:
response = httpx.get(
f"{get_api_endpoint()}/v1alpha/result/{task_id}",
f"{get_api_endpoint()}/v1/result/{task_id}",
timeout=15,
verify=ssl_ctx,
)
@@ -296,8 +296,11 @@ def process_url(
do_picture_classification,
do_picture_description,
):
target = {"kind": "zip" if return_as_file else "inbody"}
parameters = {
"http_sources": [{"url": source} for source in input_sources.split(",")],
"sources": [
{"kind": "http", "url": source} for source in input_sources.split(",")
],
"options": {
"to_formats": to_formats,
"image_export_mode": image_export_mode,
@@ -309,24 +312,24 @@ def process_url(
"pdf_backend": pdf_backend,
"table_mode": table_mode,
"abort_on_error": abort_on_error,
"return_as_file": return_as_file,
"do_code_enrichment": do_code_enrichment,
"do_formula_enrichment": do_formula_enrichment,
"do_picture_classification": do_picture_classification,
"do_picture_description": do_picture_description,
},
"target": target,
}
if (
not parameters["http_sources"]
or len(parameters["http_sources"]) == 0
or parameters["http_sources"][0]["url"] == ""
not parameters["sources"]
or len(parameters["sources"]) == 0
or parameters["sources"][0]["url"] == ""
):
logger.error("No input sources provided.")
raise gr.Error("No input sources provided.", print_exception=False)
try:
ssl_ctx = get_ssl_context()
response = httpx.post(
f"{get_api_endpoint()}/v1alpha/convert/source/async",
f"{get_api_endpoint()}/v1/convert/source/async",
json=parameters,
verify=ssl_ctx,
timeout=60,
@@ -372,11 +375,13 @@ def process_file(
logger.error("No files provided.")
raise gr.Error("No files provided.", print_exception=False)
files_data = [
{"base64_string": file_to_base64(file), "filename": file.name} for file in files
{"kind": "file", "base64_string": file_to_base64(file), "filename": file.name}
for file in files
]
target = {"kind": "zip" if return_as_file else "inbody"}
parameters = {
"file_sources": files_data,
"sources": files_data,
"options": {
"to_formats": to_formats,
"image_export_mode": image_export_mode,
@@ -394,12 +399,13 @@ def process_file(
"do_picture_classification": do_picture_classification,
"do_picture_description": do_picture_description,
},
"target": target,
}
try:
ssl_ctx = get_ssl_context()
response = httpx.post(
f"{get_api_endpoint()}/v1alpha/convert/source/async",
f"{get_api_endpoint()}/v1/convert/source/async",
json=parameters,
verify=ssl_ctx,
timeout=60,

View File

@@ -15,6 +15,7 @@ from docling.datamodel.document import ConversionResult, ConversionStatus
from docling_core.types.doc import ImageRefMode
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
from docling_jobkit.datamodel.task import Task
from docling_jobkit.datamodel.task_targets import InBodyTarget, TaskTarget
from docling_jobkit.orchestrators.base_orchestrator import (
BaseOrchestrator,
)
@@ -139,6 +140,7 @@ def _export_documents_as_files(
def process_results(
conversion_options: ConvertDocumentsOptions,
target: TaskTarget,
conv_results: Iterable[ConversionResult],
work_dir: Path,
) -> Union[ConvertDocumentResponse, FileResponse]:
@@ -175,7 +177,7 @@ def process_results(
export_doctags = OutputFormat.DOCTAGS in conversion_options.to_formats
# Only 1 document was processed, and we are not returning it as a file
if len(conv_results) == 1 and not conversion_options.return_as_file:
if len(conv_results) == 1 and isinstance(target, InBodyTarget):
conv_res = conv_results[0]
document = _export_document_as_content(
conv_res,
@@ -252,6 +254,7 @@ async def prepare_response(
work_dir = get_scratch() / task.task_id
response = process_results(
conversion_options=task.options,
target=task.target,
conv_results=task.results,
work_dir=work_dir,
)