mirror of
https://github.com/docling-project/docling-serve.git
synced 2025-11-29 08:33:50 +00:00
322 lines
11 KiB
Python
322 lines
11 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import time
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
from typing import Union
|
|
|
|
import httpx
|
|
from fastapi import BackgroundTasks, HTTPException
|
|
from fastapi.responses import FileResponse
|
|
|
|
from docling.datamodel.base_models import OutputFormat
|
|
from docling.datamodel.document import ConversionResult, ConversionStatus
|
|
from docling_core.types.doc import ImageRefMode
|
|
from docling_jobkit.datamodel.convert import ConvertDocumentsOptions
|
|
from docling_jobkit.datamodel.task import Task
|
|
from docling_jobkit.datamodel.task_targets import InBodyTarget, PutTarget, TaskTarget
|
|
from docling_jobkit.orchestrators.base_orchestrator import (
|
|
BaseOrchestrator,
|
|
)
|
|
|
|
from docling_serve.datamodel.responses import (
|
|
ConvertDocumentResponse,
|
|
DocumentResponse,
|
|
PresignedUrlConvertDocumentResponse,
|
|
)
|
|
from docling_serve.settings import docling_serve_settings
|
|
from docling_serve.storage import get_scratch
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
def _export_document_as_content(
|
|
conv_res: ConversionResult,
|
|
export_json: bool,
|
|
export_html: bool,
|
|
export_md: bool,
|
|
export_txt: bool,
|
|
export_doctags: bool,
|
|
image_mode: ImageRefMode,
|
|
md_page_break_placeholder: str,
|
|
):
|
|
document = DocumentResponse(filename=conv_res.input.file.name)
|
|
|
|
if conv_res.status == ConversionStatus.SUCCESS:
|
|
new_doc = conv_res.document._make_copy_with_refmode(
|
|
Path(), image_mode, page_no=None
|
|
)
|
|
|
|
# Create the different formats
|
|
if export_json:
|
|
document.json_content = new_doc
|
|
if export_html:
|
|
document.html_content = new_doc.export_to_html(image_mode=image_mode)
|
|
if export_txt:
|
|
document.text_content = new_doc.export_to_markdown(
|
|
strict_text=True,
|
|
image_mode=image_mode,
|
|
)
|
|
if export_md:
|
|
document.md_content = new_doc.export_to_markdown(
|
|
image_mode=image_mode,
|
|
page_break_placeholder=md_page_break_placeholder or None,
|
|
)
|
|
if export_doctags:
|
|
document.doctags_content = new_doc.export_to_doctags()
|
|
elif conv_res.status == ConversionStatus.SKIPPED:
|
|
raise HTTPException(status_code=400, detail=conv_res.errors)
|
|
else:
|
|
raise HTTPException(status_code=500, detail=conv_res.errors)
|
|
|
|
return document
|
|
|
|
|
|
def _export_documents_as_files(
|
|
conv_results: Iterable[ConversionResult],
|
|
output_dir: Path,
|
|
export_json: bool,
|
|
export_html: bool,
|
|
export_md: bool,
|
|
export_txt: bool,
|
|
export_doctags: bool,
|
|
image_export_mode: ImageRefMode,
|
|
md_page_break_placeholder: str,
|
|
) -> ConversionStatus:
|
|
success_count = 0
|
|
failure_count = 0
|
|
|
|
# Default failure in case results is empty
|
|
conv_result = ConversionStatus.FAILURE
|
|
|
|
artifacts_dir = Path("artifacts/") # will be relative to the fname
|
|
|
|
for conv_res in conv_results:
|
|
conv_result = conv_res.status
|
|
if conv_res.status == ConversionStatus.SUCCESS:
|
|
success_count += 1
|
|
doc_filename = conv_res.input.file.stem
|
|
|
|
# Export JSON format:
|
|
if export_json:
|
|
fname = output_dir / f"{doc_filename}.json"
|
|
_log.info(f"writing JSON output to {fname}")
|
|
conv_res.document.save_as_json(
|
|
filename=fname,
|
|
image_mode=image_export_mode,
|
|
artifacts_dir=artifacts_dir,
|
|
)
|
|
|
|
# Export HTML format:
|
|
if export_html:
|
|
fname = output_dir / f"{doc_filename}.html"
|
|
_log.info(f"writing HTML output to {fname}")
|
|
conv_res.document.save_as_html(
|
|
filename=fname,
|
|
image_mode=image_export_mode,
|
|
artifacts_dir=artifacts_dir,
|
|
)
|
|
|
|
# Export Text format:
|
|
if export_txt:
|
|
fname = output_dir / f"{doc_filename}.txt"
|
|
_log.info(f"writing TXT output to {fname}")
|
|
conv_res.document.save_as_markdown(
|
|
filename=fname,
|
|
strict_text=True,
|
|
image_mode=ImageRefMode.PLACEHOLDER,
|
|
)
|
|
|
|
# Export Markdown format:
|
|
if export_md:
|
|
fname = output_dir / f"{doc_filename}.md"
|
|
_log.info(f"writing Markdown output to {fname}")
|
|
conv_res.document.save_as_markdown(
|
|
filename=fname,
|
|
artifacts_dir=artifacts_dir,
|
|
image_mode=image_export_mode,
|
|
page_break_placeholder=md_page_break_placeholder or None,
|
|
)
|
|
|
|
# Export Document Tags format:
|
|
if export_doctags:
|
|
fname = output_dir / f"{doc_filename}.doctags"
|
|
_log.info(f"writing Doc Tags output to {fname}")
|
|
conv_res.document.save_as_doctags(filename=fname)
|
|
|
|
else:
|
|
_log.warning(f"Document {conv_res.input.file} failed to convert.")
|
|
failure_count += 1
|
|
|
|
_log.info(
|
|
f"Processed {success_count + failure_count} docs, "
|
|
f"of which {failure_count} failed"
|
|
)
|
|
return conv_result
|
|
|
|
|
|
def process_results(
|
|
conversion_options: ConvertDocumentsOptions,
|
|
target: TaskTarget,
|
|
conv_results: Iterable[ConversionResult],
|
|
work_dir: Path,
|
|
) -> Union[ConvertDocumentResponse, FileResponse, PresignedUrlConvertDocumentResponse]:
|
|
# Let's start by processing the documents
|
|
try:
|
|
start_time = time.monotonic()
|
|
|
|
# Convert the iterator to a list to count the number of results and get timings
|
|
# As it's an iterator (lazy evaluation), it will also start the conversion
|
|
conv_results = list(conv_results)
|
|
|
|
processing_time = time.monotonic() - start_time
|
|
|
|
_log.info(
|
|
f"Processed {len(conv_results)} docs in {processing_time:.2f} seconds."
|
|
)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
if len(conv_results) == 0:
|
|
raise HTTPException(
|
|
status_code=500, detail="No documents were generated by Docling."
|
|
)
|
|
|
|
# We have some results, let's prepare the response
|
|
response: Union[
|
|
FileResponse, ConvertDocumentResponse, PresignedUrlConvertDocumentResponse
|
|
]
|
|
|
|
# Booleans to know what to export
|
|
export_json = OutputFormat.JSON in conversion_options.to_formats
|
|
export_html = OutputFormat.HTML in conversion_options.to_formats
|
|
export_md = OutputFormat.MARKDOWN in conversion_options.to_formats
|
|
export_txt = OutputFormat.TEXT in conversion_options.to_formats
|
|
export_doctags = OutputFormat.DOCTAGS in conversion_options.to_formats
|
|
|
|
# Only 1 document was processed, and we are not returning it as a file
|
|
if len(conv_results) == 1 and isinstance(target, InBodyTarget):
|
|
conv_res = conv_results[0]
|
|
document = _export_document_as_content(
|
|
conv_res,
|
|
export_json=export_json,
|
|
export_html=export_html,
|
|
export_md=export_md,
|
|
export_txt=export_txt,
|
|
export_doctags=export_doctags,
|
|
image_mode=conversion_options.image_export_mode,
|
|
md_page_break_placeholder=conversion_options.md_page_break_placeholder,
|
|
)
|
|
|
|
response = ConvertDocumentResponse(
|
|
document=document,
|
|
status=conv_res.status,
|
|
processing_time=processing_time,
|
|
timings=conv_res.timings,
|
|
)
|
|
|
|
# Multiple documents were processed, or we are forced returning as a file
|
|
else:
|
|
# Temporary directory to store the outputs
|
|
output_dir = work_dir / "output"
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Worker pid to use in archive identification as we may have multiple workers
|
|
os.getpid()
|
|
|
|
# Export the documents
|
|
conv_result = _export_documents_as_files(
|
|
conv_results=conv_results,
|
|
output_dir=output_dir,
|
|
export_json=export_json,
|
|
export_html=export_html,
|
|
export_md=export_md,
|
|
export_txt=export_txt,
|
|
export_doctags=export_doctags,
|
|
image_export_mode=conversion_options.image_export_mode,
|
|
md_page_break_placeholder=conversion_options.md_page_break_placeholder,
|
|
)
|
|
|
|
files = os.listdir(output_dir)
|
|
if len(files) == 0:
|
|
raise HTTPException(status_code=500, detail="No documents were exported.")
|
|
|
|
file_path = work_dir / "converted_docs.zip"
|
|
shutil.make_archive(
|
|
base_name=str(file_path.with_suffix("")),
|
|
format="zip",
|
|
root_dir=output_dir,
|
|
)
|
|
|
|
# Other cleanups after the response is sent
|
|
# Output directory
|
|
# background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True)
|
|
|
|
if isinstance(target, PutTarget):
|
|
try:
|
|
with open(file_path, "rb") as file_data:
|
|
r = httpx.put(str(target.url), files={"file": file_data})
|
|
r.raise_for_status()
|
|
response = PresignedUrlConvertDocumentResponse(
|
|
status=conv_result,
|
|
processing_time=processing_time,
|
|
)
|
|
except Exception as exc:
|
|
_log.error("An error occour while uploading zip to s3", exc_info=exc)
|
|
raise HTTPException(
|
|
status_code=500, detail="An error occour while uploading zip to s3."
|
|
)
|
|
else:
|
|
response = FileResponse(
|
|
file_path, filename=file_path.name, media_type="application/zip"
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
async def prepare_response(
|
|
task: Task, orchestrator: BaseOrchestrator, background_tasks: BackgroundTasks
|
|
):
|
|
if task.results is None:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail="Task result not found. Please wait for a completion status.",
|
|
)
|
|
assert task.options is not None
|
|
|
|
work_dir = get_scratch() / task.task_id
|
|
response = process_results(
|
|
conversion_options=task.options,
|
|
target=task.target,
|
|
conv_results=task.results,
|
|
work_dir=work_dir,
|
|
)
|
|
|
|
if work_dir.exists():
|
|
task.scratch_dir = work_dir
|
|
if not isinstance(response, FileResponse):
|
|
_log.warning(
|
|
f"Task {task.task_id=} produced content in {work_dir=} but the response is not a file."
|
|
)
|
|
shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
if docling_serve_settings.single_use_results:
|
|
if task.scratch_dir is not None:
|
|
background_tasks.add_task(
|
|
shutil.rmtree, task.scratch_dir, ignore_errors=True
|
|
)
|
|
|
|
async def _remove_task_impl():
|
|
await asyncio.sleep(docling_serve_settings.result_removal_delay)
|
|
await orchestrator.delete_task(task_id=task.task_id)
|
|
|
|
async def _remove_task():
|
|
asyncio.create_task(_remove_task_impl()) # noqa: RUF006
|
|
|
|
background_tasks.add_task(_remove_task)
|
|
|
|
return response
|