diff --git a/docling_serve/app.py b/docling_serve/app.py index 6f1fad3..f35014b 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -1,11 +1,11 @@ import asyncio import importlib.metadata import logging -import tempfile +import shutil +import time from contextlib import asynccontextmanager from io import BytesIO -from pathlib import Path -from typing import Annotated, Any, Optional, Union +from typing import Annotated from fastapi import ( BackgroundTasks, @@ -35,6 +35,7 @@ from docling_serve.datamodel.callback import ( from docling_serve.datamodel.convert import ConvertDocumentsOptions from docling_serve.datamodel.requests import ( ConvertDocumentFileSourcesRequest, + ConvertDocumentHttpSourcesRequest, ConvertDocumentsRequest, ) from docling_serve.datamodel.responses import ( @@ -44,11 +45,7 @@ from docling_serve.datamodel.responses import ( TaskStatusResponse, WebsocketMessage, ) -from docling_serve.docling_conversion import ( - convert_documents, - get_converter, - get_pdf_pipeline_opts, -) +from docling_serve.datamodel.task import Task, TaskSource from docling_serve.engines.async_orchestrator import ( BaseAsyncOrchestrator, ProgressInvalid, @@ -56,8 +53,8 @@ from docling_serve.engines.async_orchestrator import ( from docling_serve.engines.async_orchestrator_factory import get_async_orchestrator from docling_serve.engines.base_orchestrator import TaskNotFoundError from docling_serve.helper_functions import FormDepends -from docling_serve.response_preparation import process_results from docling_serve.settings import docling_serve_settings +from docling_serve.storage import get_scratch # Set up custom logging as we'll be intermixes with FastAPI/Uvicorn's logging @@ -95,11 +92,11 @@ _log = logging.getLogger(__name__) # Context manager to initialize and clean up the lifespan of the FastAPI app @asynccontextmanager async def lifespan(app: FastAPI): - # Converter with default options - pdf_format_option = get_pdf_pipeline_opts(ConvertDocumentsOptions()) - get_converter(pdf_format_option) - orchestrator = get_async_orchestrator() + scratch_dir = get_scratch() + + # Warm up processing cache + await orchestrator.warm_up_caches() # Start the background queue processor queue_task = asyncio.create_task(orchestrator.process_queue()) @@ -113,6 +110,10 @@ async def lifespan(app: FastAPI): except asyncio.CancelledError: _log.info("Queue processor cancelled.") + # Remove scratch directory in case it was a tempfile + if docling_serve_settings.scratch_path is not None: + shutil.rmtree(scratch_dir, ignore_errors=True) + ################################## # App creation and configuration # @@ -162,7 +163,8 @@ def create_app(): # noqa: C901 from docling_serve.gradio_ui import ui as gradio_ui - tmp_output_dir = Path(tempfile.mkdtemp()) + tmp_output_dir = get_scratch() / "gradio" + tmp_output_dir.mkdir(exist_ok=True, parents=True) gradio_ui.gradio_output_dir = tmp_output_dir app = gr.mount_gradio_app( app, @@ -210,6 +212,56 @@ def create_app(): # noqa: C901 redoc_js_url="/static/redoc.standalone.js", ) + ######################## + # Async / Sync helpers # + ######################## + + async def _enque_source( + orchestrator: BaseAsyncOrchestrator, conversion_request: ConvertDocumentsRequest + ) -> Task: + sources: list[TaskSource] = [] + if isinstance(conversion_request, ConvertDocumentFileSourcesRequest): + sources.extend(conversion_request.file_sources) + if isinstance(conversion_request, ConvertDocumentHttpSourcesRequest): + sources.extend(conversion_request.http_sources) + + task = await orchestrator.enqueue( + sources=sources, options=conversion_request.options + ) + return task + + async def _enque_file( + orchestrator: BaseAsyncOrchestrator, + files: list[UploadFile], + options: ConvertDocumentsOptions, + ) -> Task: + _log.info(f"Received {len(files)} files for processing.") + + # Load the uploaded files to Docling DocumentStream + file_sources: list[TaskSource] = [] + for i, file in enumerate(files): + buf = BytesIO(file.file.read()) + suffix = "" if len(file_sources) == 1 else f"_{i}" + name = file.filename if file.filename else f"file{suffix}.pdf" + file_sources.append(DocumentStream(name=name, stream=buf)) + + task = await orchestrator.enqueue(sources=file_sources, options=options) + return task + + async def _wait_task_complete( + orchestrator: BaseAsyncOrchestrator, task_id: str + ) -> bool: + MAX_WAIT = 120 + start_time = time.monotonic() + while True: + task = await orchestrator.task_status(task_id=task_id) + if task.is_completed(): + return True + await asyncio.sleep(5) + elapsed_time = time.monotonic() - start_time + if elapsed_time > MAX_WAIT: + return False + ############################# # API Endpoints definitions # ############################# @@ -243,33 +295,33 @@ def create_app(): # noqa: C901 } }, ) - def process_url( - background_tasks: BackgroundTasks, conversion_request: ConvertDocumentsRequest + async def process_url( + background_tasks: BackgroundTasks, + orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], + conversion_request: ConvertDocumentsRequest, ): - sources: list[Union[str, DocumentStream]] = [] - headers: Optional[dict[str, Any]] = None - if isinstance(conversion_request, ConvertDocumentFileSourcesRequest): - for file_source in conversion_request.file_sources: - sources.append(file_source.to_document_stream()) - else: - for http_source in conversion_request.http_sources: - sources.append(http_source.url) - if headers is None and http_source.headers: - headers = http_source.headers - - # Note: results are only an iterator->lazy evaluation - results = convert_documents( - sources=sources, options=conversion_request.options, headers=headers + task = await _enque_source( + orchestrator=orchestrator, conversion_request=conversion_request + ) + success = await _wait_task_complete( + orchestrator=orchestrator, task_id=task.task_id ) - # The real processing will happen here - response = process_results( - background_tasks=background_tasks, - conversion_options=conversion_request.options, - conv_results=results, - ) + if not success: + # TODO: abort task! + return HTTPException( + status_code=504, detail="Conversion is taking too long." + ) - return response + result = await orchestrator.task_result( + task_id=task.task_id, background_tasks=background_tasks + ) + if result is None: + raise HTTPException( + status_code=404, + detail="Task result not found. Please wait for a completion status.", + ) + return result # Convert a document from file(s) @app.post( @@ -283,29 +335,34 @@ def create_app(): # noqa: C901 ) async def process_file( background_tasks: BackgroundTasks, + orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], files: list[UploadFile], options: Annotated[ ConvertDocumentsOptions, FormDepends(ConvertDocumentsOptions) ], ): - _log.info(f"Received {len(files)} files for processing.") - - # Load the uploaded files to Docling DocumentStream - file_sources = [] - for file in files: - buf = BytesIO(file.file.read()) - name = file.filename if file.filename else "file.pdf" - file_sources.append(DocumentStream(name=name, stream=buf)) - - results = convert_documents(sources=file_sources, options=options) - - response = process_results( - background_tasks=background_tasks, - conversion_options=options, - conv_results=results, + task = await _enque_file( + orchestrator=orchestrator, files=files, options=options + ) + success = await _wait_task_complete( + orchestrator=orchestrator, task_id=task.task_id ) - return response + if not success: + # TODO: abort task! + return HTTPException( + status_code=504, detail="Conversion is taking too long." + ) + + result = await orchestrator.task_result( + task_id=task.task_id, background_tasks=background_tasks + ) + if result is None: + raise HTTPException( + status_code=404, + detail="Task result not found. Please wait for a completion status.", + ) + return result # Convert a document from URL(s) using the async api @app.post( @@ -316,7 +373,35 @@ def create_app(): # noqa: C901 orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], conversion_request: ConvertDocumentsRequest, ): - task = await orchestrator.enqueue(request=conversion_request) + task = await _enque_source( + orchestrator=orchestrator, conversion_request=conversion_request + ) + task_queue_position = await orchestrator.get_queue_position( + task_id=task.task_id + ) + return TaskStatusResponse( + task_id=task.task_id, + task_status=task.task_status, + task_position=task_queue_position, + task_meta=task.processing_meta, + ) + + # Convert a document from file(s) using the async api + @app.post( + "/v1alpha/convert/file/async", + response_model=TaskStatusResponse, + ) + async def process_file_async( + orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], + background_tasks: BackgroundTasks, + files: list[UploadFile], + options: Annotated[ + ConvertDocumentsOptions, FormDepends(ConvertDocumentsOptions) + ], + ): + task = await _enque_file( + orchestrator=orchestrator, files=files, options=options + ) task_queue_position = await orchestrator.get_queue_position( task_id=task.task_id ) @@ -426,9 +511,12 @@ def create_app(): # noqa: C901 ) async def task_result( orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], + background_tasks: BackgroundTasks, task_id: str, ): - result = await orchestrator.task_result(task_id=task_id) + result = await orchestrator.task_result( + task_id=task_id, background_tasks=background_tasks + ) if result is None: raise HTTPException( status_code=404, diff --git a/docling_serve/datamodel/requests.py b/docling_serve/datamodel/requests.py index 864254d..984438c 100644 --- a/docling_serve/datamodel/requests.py +++ b/docling_serve/datamodel/requests.py @@ -2,7 +2,7 @@ import base64 from io import BytesIO from typing import Annotated, Any, Union -from pydantic import BaseModel, Field +from pydantic import AnyHttpUrl, BaseModel, Field from docling.datamodel.base_models import DocumentStream @@ -15,7 +15,7 @@ class DocumentsConvertBase(BaseModel): class HttpSource(BaseModel): url: Annotated[ - str, + AnyHttpUrl, Field( description="HTTP url to process", examples=["https://arxiv.org/pdf/2206.01062"], diff --git a/docling_serve/datamodel/task.py b/docling_serve/datamodel/task.py index 9a180c2..e76014d 100644 --- a/docling_serve/datamodel/task.py +++ b/docling_serve/datamodel/task.py @@ -1,18 +1,29 @@ -from typing import Optional +from pathlib import Path +from typing import Optional, Union -from pydantic import BaseModel +from fastapi.responses import FileResponse +from pydantic import BaseModel, ConfigDict +from docling.datamodel.base_models import DocumentStream + +from docling_serve.datamodel.convert import ConvertDocumentsOptions from docling_serve.datamodel.engines import TaskStatus -from docling_serve.datamodel.requests import ConvertDocumentsRequest +from docling_serve.datamodel.requests import FileSource, HttpSource from docling_serve.datamodel.responses import ConvertDocumentResponse from docling_serve.datamodel.task_meta import TaskProcessingMeta +TaskSource = Union[HttpSource, FileSource, DocumentStream] + class Task(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + task_id: str task_status: TaskStatus = TaskStatus.PENDING - request: Optional[ConvertDocumentsRequest] - result: Optional[ConvertDocumentResponse] = None + sources: list[TaskSource] = [] + options: Optional[ConvertDocumentsOptions] + result: Optional[Union[ConvertDocumentResponse, FileResponse]] = None + scratch_dir: Optional[Path] = None processing_meta: Optional[TaskProcessingMeta] = None def is_completed(self) -> bool: diff --git a/docling_serve/engines/async_kfp/orchestrator.py b/docling_serve/engines/async_kfp/orchestrator.py index 66014d3..a6e15a1 100644 --- a/docling_serve/engines/async_kfp/orchestrator.py +++ b/docling_serve/engines/async_kfp/orchestrator.py @@ -14,10 +14,11 @@ from docling_serve.datamodel.callback import ( ProgressSetNumDocs, ProgressUpdateProcessed, ) +from docling_serve.datamodel.convert import ConvertDocumentsOptions from docling_serve.datamodel.engines import TaskStatus from docling_serve.datamodel.kfp import CallbackSpec -from docling_serve.datamodel.requests import ConvertDocumentsRequest -from docling_serve.datamodel.task import Task +from docling_serve.datamodel.requests import HttpSource +from docling_serve.datamodel.task import Task, TaskSource from docling_serve.datamodel.task_meta import TaskProcessingMeta from docling_serve.engines.async_kfp.kfp_pipeline import process from docling_serve.engines.async_orchestrator import ( @@ -71,7 +72,9 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): # verify_ssl=False, ) - async def enqueue(self, request: ConvertDocumentsRequest) -> Task: + async def enqueue( + self, sources: list[TaskSource], options: ConvertDocumentsOptions + ) -> Task: callbacks = [] if docling_serve_settings.eng_kfp_self_callback_endpoint is not None: headers = {} @@ -92,6 +95,8 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): ) CallbacksType = TypeAdapter(list[CallbackSpec]) + SourcesListType = TypeAdapter(list[HttpSource]) + http_sources = [s for s in sources if isinstance(s, HttpSource)] # hack: since the current kfp backend is not resolving the job_id placeholder, # we set the run_name and pass it as argument to the job itself. run_name = f"docling-job-{uuid.uuid4()}" @@ -99,7 +104,8 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): process, arguments={ "batch_size": 10, - "request": request.model_dump(mode="json"), + "sources": SourcesListType.dump_python(http_sources, mode="json"), + "options": options.model_dump(mode="json"), "callbacks": CallbacksType.dump_python(callbacks, mode="json"), "run_name": run_name, }, @@ -107,7 +113,7 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): ) task_id = kfp_run.run_id - task = Task(task_id=task_id, request=request) + task = Task(task_id=task_id, sources=sources, options=options) await self.init_task_tracking(task) return task @@ -185,6 +191,9 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): async def process_queue(self): return + async def warm_up_caches(self): + return + async def _get_run_id(self, run_name: str) -> str: res = self._client.list_runs( filter=json.dumps( diff --git a/docling_serve/engines/async_local/orchestrator.py b/docling_serve/engines/async_local/orchestrator.py index 63c58f4..51a8892 100644 --- a/docling_serve/engines/async_local/orchestrator.py +++ b/docling_serve/engines/async_local/orchestrator.py @@ -3,8 +3,9 @@ import logging import uuid from typing import Optional -from docling_serve.datamodel.requests import ConvertDocumentsRequest -from docling_serve.datamodel.task import Task +from docling_serve.datamodel.convert import ConvertDocumentsOptions +from docling_serve.datamodel.task import Task, TaskSource +from docling_serve.docling_conversion import get_converter, get_pdf_pipeline_opts from docling_serve.engines.async_local.worker import AsyncLocalWorker from docling_serve.engines.async_orchestrator import BaseAsyncOrchestrator from docling_serve.settings import docling_serve_settings @@ -18,9 +19,11 @@ class AsyncLocalOrchestrator(BaseAsyncOrchestrator): self.task_queue = asyncio.Queue() self.queue_list: list[str] = [] - async def enqueue(self, request: ConvertDocumentsRequest) -> Task: + async def enqueue( + self, sources: list[TaskSource], options: ConvertDocumentsOptions + ) -> Task: task_id = str(uuid.uuid4()) - task = Task(task_id=task_id, request=request) + task = Task(task_id=task_id, sources=sources, options=options) await self.init_task_tracking(task) self.queue_list.append(task_id) @@ -47,3 +50,8 @@ class AsyncLocalOrchestrator(BaseAsyncOrchestrator): # Wait for all workers to complete (they won't, as they run indefinitely) await asyncio.gather(*workers) _log.debug("All workers completed.") + + async def warm_up_caches(self): + # Converter with default options + pdf_format_option = get_pdf_pipeline_opts(ConvertDocumentsOptions()) + get_converter(pdf_format_option) diff --git a/docling_serve/engines/async_local/worker.py b/docling_serve/engines/async_local/worker.py index f19f33c..bd89671 100644 --- a/docling_serve/engines/async_local/worker.py +++ b/docling_serve/engines/async_local/worker.py @@ -1,17 +1,18 @@ import asyncio import logging +import shutil import time from typing import TYPE_CHECKING, Any, Optional, Union -from fastapi import BackgroundTasks +from fastapi.responses import FileResponse from docling.datamodel.base_models import DocumentStream from docling_serve.datamodel.engines import TaskStatus -from docling_serve.datamodel.requests import ConvertDocumentFileSourcesRequest -from docling_serve.datamodel.responses import ConvertDocumentResponse +from docling_serve.datamodel.requests import FileSource, HttpSource from docling_serve.docling_conversion import convert_documents from docling_serve.response_preparation import process_results +from docling_serve.storage import get_scratch if TYPE_CHECKING: from docling_serve.engines.async_local.orchestrator import AsyncLocalOrchestrator @@ -44,59 +45,66 @@ class AsyncLocalWorker: # Notify clients about queue updates await self.orchestrator.notify_queue_positions() - # Get the current event loop - asyncio.get_event_loop() - # Define a callback function to send progress updates to the client. # TODO: send partial updates, e.g. when a document in the batch is done def run_conversion(): - sources: list[Union[str, DocumentStream]] = [] + convert_sources: list[Union[str, DocumentStream]] = [] headers: Optional[dict[str, Any]] = None - if isinstance(task.request, ConvertDocumentFileSourcesRequest): - for file_source in task.request.file_sources: - sources.append(file_source.to_document_stream()) - else: - for http_source in task.request.http_sources: - sources.append(http_source.url) - if headers is None and http_source.headers: - headers = http_source.headers + for source in task.sources: + if isinstance(source, DocumentStream): + convert_sources.append(source) + elif isinstance(source, FileSource): + convert_sources.append(source.to_document_stream()) + elif isinstance(source, HttpSource): + convert_sources.append(str(source.url)) + if headers is None and source.headers: + headers = source.headers # Note: results are only an iterator->lazy evaluation results = convert_documents( - sources=sources, - options=task.request.options, + sources=convert_sources, + options=task.options, headers=headers, ) # The real processing will happen here + work_dir = get_scratch() / task_id response = process_results( - background_tasks=BackgroundTasks(), - conversion_options=task.request.options, + conversion_options=task.options, conv_results=results, + work_dir=work_dir, ) + if work_dir.exists(): + task.scratch_dir = work_dir + if not isinstance(response, FileResponse): + _log.warning( + f"Task {task_id=} produced content in {work_dir=} but the response is not a file." + ) + shutil.rmtree(work_dir, ignore_errors=True) + return response - # Run the prediction in a thread to avoid blocking the event loop. start_time = time.monotonic() + + # Run the prediction in a thread to avoid blocking the event loop. + # Get the current event loop + # loop = asyncio.get_event_loop() # future = asyncio.run_coroutine_threadsafe( # run_conversion(), # loop=loop # ) # response = future.result() + # Run in a thread response = await asyncio.to_thread( run_conversion, ) processing_time = time.monotonic() - start_time - if not isinstance(response, ConvertDocumentResponse): - _log.error( - f"Worker {self.worker_id} got un-processable " - "result for {task_id}: {type(response)}" - ) task.result = response - task.request = None + task.sources = [] + task.options = None task.task_status = TaskStatus.SUCCESS _log.info( diff --git a/docling_serve/engines/async_orchestrator.py b/docling_serve/engines/async_orchestrator.py index 398849a..5e26911 100644 --- a/docling_serve/engines/async_orchestrator.py +++ b/docling_serve/engines/async_orchestrator.py @@ -1,8 +1,13 @@ -from fastapi import WebSocket +import shutil +from typing import Union + +from fastapi import BackgroundTasks, WebSocket +from fastapi.responses import FileResponse from docling_serve.datamodel.callback import ProgressCallbackRequest from docling_serve.datamodel.engines import TaskStatus from docling_serve.datamodel.responses import ( + ConvertDocumentResponse, MessageKind, TaskStatusResponse, WebsocketMessage, @@ -13,6 +18,7 @@ from docling_serve.engines.base_orchestrator import ( OrchestratorError, TaskNotFoundError, ) +from docling_serve.settings import docling_serve_settings class ProgressInvalid(OrchestratorError): @@ -37,8 +43,15 @@ class BaseAsyncOrchestrator(BaseOrchestrator): async def task_status(self, task_id: str, wait: float = 0.0) -> Task: return await self.get_raw_task(task_id=task_id) - async def task_result(self, task_id: str): + async def task_result( + self, task_id: str, background_tasks: BackgroundTasks + ) -> Union[ConvertDocumentResponse, FileResponse, None]: task = await self.get_raw_task(task_id=task_id) + if task.is_completed() and task.scratch_dir is not None: + if docling_serve_settings.single_use_results: + background_tasks.add_task( + shutil.rmtree, task.scratch_dir, ignore_errors=True + ) return task.result async def notify_task_subscribers(self, task_id: str): diff --git a/docling_serve/engines/base_orchestrator.py b/docling_serve/engines/base_orchestrator.py index e0a6422..72421a4 100644 --- a/docling_serve/engines/base_orchestrator.py +++ b/docling_serve/engines/base_orchestrator.py @@ -1,8 +1,12 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Union -from docling_serve.datamodel.requests import ConvertDocumentsRequest -from docling_serve.datamodel.task import Task +from fastapi import BackgroundTasks +from fastapi.responses import FileResponse + +from docling_serve.datamodel.convert import ConvertDocumentsOptions +from docling_serve.datamodel.responses import ConvertDocumentResponse +from docling_serve.datamodel.task import Task, TaskSource class OrchestratorError(Exception): @@ -15,7 +19,9 @@ class TaskNotFoundError(OrchestratorError): class BaseOrchestrator(ABC): @abstractmethod - async def enqueue(self, request: ConvertDocumentsRequest) -> Task: + async def enqueue( + self, sources: list[TaskSource], options: ConvertDocumentsOptions + ) -> Task: pass @abstractmethod @@ -31,9 +37,15 @@ class BaseOrchestrator(ABC): pass @abstractmethod - async def task_result(self, task_id: str): + async def task_result( + self, task_id: str, background_tasks: BackgroundTasks + ) -> Union[ConvertDocumentResponse, FileResponse, None]: pass @abstractmethod async def process_queue(self): pass + + @abstractmethod + async def warm_up_caches(self): + pass diff --git a/docling_serve/gradio_ui.py b/docling_serve/gradio_ui.py index 6aa0ec8..6fcf2cc 100644 --- a/docling_serve/gradio_ui.py +++ b/docling_serve/gradio_ui.py @@ -6,6 +6,7 @@ import ssl import tempfile import time from pathlib import Path +from typing import Optional import certifi import gradio as gr @@ -203,12 +204,16 @@ def clear_file_input(): return None -def auto_set_return_as_file(url_input, file_input, image_export_mode): +def auto_set_return_as_file( + url_input_value: str, + file_input_value: Optional[list[str]], + image_export_mode_value: str, +): # If more than one input source is provided, return as file if ( - (len(url_input.split(",")) > 1) - or (file_input and len(file_input) > 1) - or (image_export_mode == "referenced") + (len(url_input_value.split(",")) > 1) + or (file_input_value and len(file_input_value) > 1) + or (image_export_mode_value == "referenced") ): return True else: @@ -344,7 +349,7 @@ def file_to_base64(file): def process_file( - file, + files, to_formats, image_export_mode, pipeline, @@ -361,10 +366,12 @@ def process_file( do_picture_classification, do_picture_description, ): - if not file or file == "": + if not files or len(files) == 0: logger.error("No files provided.") raise gr.Error("No files provided.", print_exception=False) - files_data = [{"base64_string": file_to_base64(file), "filename": file.name}] + files_data = [ + {"base64_string": file_to_base64(file), "filename": file.name} for file in files + ] parameters = { "file_sources": files_data, @@ -552,7 +559,7 @@ with gr.Blocks( ".png", ".gif", ], - file_count="single", + file_count="multiple", scale=4, ) with gr.Column(scale=1): @@ -625,9 +632,7 @@ with gr.Blocks( ) with gr.Column(scale=1): abort_on_error = gr.Checkbox(label="Abort on Error", value=False) - return_as_file = gr.Checkbox( - label="Return as File", visible=False, value=False - ) # Disable until async handle output as file + return_as_file = gr.Checkbox(label="Return as File", value=False) with gr.Row(): with gr.Column(): do_code_enrichment = gr.Checkbox( @@ -677,23 +682,22 @@ with gr.Blocks( # UI Actions # ############## - # Disable until async handle output as file # Handle Return as File - # url_input.change( - # auto_set_return_as_file, - # inputs=[url_input, file_input, image_export_mode], - # outputs=[return_as_file], - # ) - # file_input.change( - # auto_set_return_as_file, - # inputs=[url_input, file_input, image_export_mode], - # outputs=[return_as_file], - # ) - # image_export_mode.change( - # auto_set_return_as_file, - # inputs=[url_input, file_input, image_export_mode], - # outputs=[return_as_file], - # ) + url_input.change( + auto_set_return_as_file, + inputs=[url_input, file_input, image_export_mode], + outputs=[return_as_file], + ) + file_input.change( + auto_set_return_as_file, + inputs=[url_input, file_input, image_export_mode], + outputs=[return_as_file], + ) + image_export_mode.change( + auto_set_return_as_file, + inputs=[url_input, file_input, image_export_mode], + outputs=[return_as_file], + ) # URL processing url_process_btn.click( diff --git a/docling_serve/response_preparation.py b/docling_serve/response_preparation.py index 6cbadc8..64b8704 100644 --- a/docling_serve/response_preparation.py +++ b/docling_serve/response_preparation.py @@ -1,13 +1,12 @@ import logging import os import shutil -import tempfile import time from collections.abc import Iterable from pathlib import Path from typing import Union -from fastapi import BackgroundTasks, HTTPException +from fastapi import HTTPException from fastapi.responses import FileResponse from docling.datamodel.base_models import OutputFormat @@ -124,9 +123,9 @@ def _export_documents_as_files( def process_results( - background_tasks: BackgroundTasks, conversion_options: ConvertDocumentsOptions, conv_results: Iterable[ConversionResult], + work_dir: Path, ) -> Union[ConvertDocumentResponse, FileResponse]: # Let's start by processing the documents try: @@ -183,7 +182,6 @@ def process_results( # Multiple documents were processed, or we are forced returning as a file else: # Temporary directory to store the outputs - work_dir = Path(tempfile.mkdtemp(prefix="docling_")) output_dir = work_dir / "output" output_dir.mkdir(parents=True, exist_ok=True) @@ -203,7 +201,6 @@ def process_results( ) files = os.listdir(output_dir) - if len(files) == 0: raise HTTPException(status_code=500, detail="No documents were exported.") @@ -216,7 +213,7 @@ def process_results( # Other cleanups after the response is sent # Output directory - background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True) + # background_tasks.add_task(shutil.rmtree, work_dir, ignore_errors=True) response = FileResponse( file_path, filename=file_path.name, media_type="application/zip" diff --git a/docling_serve/settings.py b/docling_serve/settings.py index a073940..ebc120c 100644 --- a/docling_serve/settings.py +++ b/docling_serve/settings.py @@ -38,6 +38,8 @@ class DoclingServeSettings(BaseSettings): api_host: str = "localhost" artifacts_path: Optional[Path] = None static_path: Optional[Path] = None + scratch_path: Optional[Path] = None + single_use_results: bool = True options_cache_size: int = 2 enable_remote_services: bool = False allow_external_plugins: bool = False diff --git a/docling_serve/storage.py b/docling_serve/storage.py new file mode 100644 index 0000000..2b72c30 --- /dev/null +++ b/docling_serve/storage.py @@ -0,0 +1,16 @@ +import tempfile +from functools import lru_cache +from pathlib import Path + +from docling_serve.settings import docling_serve_settings + + +@lru_cache +def get_scratch() -> Path: + scratch_dir = ( + docling_serve_settings.scratch_path + if docling_serve_settings.scratch_path is not None + else Path(tempfile.mkdtemp(prefix="docling_")) + ) + scratch_dir.mkdir(exist_ok=True, parents=True) + return scratch_dir diff --git a/docs/configuration.md b/docs/configuration.md index 22eeb8b..ed3c049 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -37,9 +37,11 @@ THe following table describes the options to configure the Docling Serve app. | -----------|-----|---------|-------------| | `--artifacts-path` | `DOCLING_SERVE_ARTIFACTS_PATH` | unset | If set to a valid directory, the model weights will be loaded from this path | | | `DOCLING_SERVE_STATIC_PATH` | unset | If set to a valid directory, the static assets for the docs and ui will be loaded from this path | +| | `DOCLING_SERVE_SCRATCH_PATH` | | If set, this directory will be used as scratch workspace, e.g. storing the results before they get requested. If unset, a temporary created is created for this purpose. | | `--enable-ui` | `DOCLING_SERVE_ENABLE_UI` | `false` | Enable the demonstrator UI. | | | `DOCLING_SERVE_ENABLE_REMOTE_SERVICES` | `false` | Allow pipeline components making remote connections. For example, this is needed when using a vision-language model via APIs. | | | `DOCLING_SERVE_ALLOW_EXTERNAL_PLUGINS` | `false` | Allow the selection of third-party plugins. | +| | `DOCLING_SERVE_SINGLE_USE_RESULTS` | `true` | If true, results can be accessed only once. If false, the results accumulate in the scratch directory. | | | `DOCLING_SERVE_MAX_DOCUMENT_TIMEOUT` | `604800` (7 days) | The maximum time for processing a document. | | | `DOCLING_SERVE_MAX_NUM_PAGES` | | The maximum number of pages for a document to be processed. | | | `DOCLING_SERVE_MAX_FILE_SIZE` | | The maximum file size for a document to be processed. | diff --git a/pyproject.toml b/pyproject.toml index 0cdef27..4a59249 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,7 @@ cu124 = [ [dependency-groups] dev = [ + "asgi-lifespan~=2.0", "mypy~=1.11", "pre-commit-uv~=4.1", "pytest~=8.3", diff --git a/tests/test_1-file-all-outputs.py b/tests/test_1-file-all-outputs.py index 3214d82..9a2ae30 100644 --- a/tests/test_1-file-all-outputs.py +++ b/tests/test_1-file-all-outputs.py @@ -47,9 +47,7 @@ async def test_convert_file(async_client): "files": ("2206.01062v1.pdf", open(file_path, "rb"), "application/pdf"), } - response = await async_client.post( - url, files=files, data={"options": json.dumps(options)} - ) + response = await async_client.post(url, files=files, data=options) assert response.status_code == 200, "Response should be 200 OK" data = response.json() diff --git a/tests/test_1-file-async.py b/tests/test_1-file-async.py new file mode 100644 index 0000000..3b9d2fb --- /dev/null +++ b/tests/test_1-file-async.py @@ -0,0 +1,69 @@ +import json +import time +from pathlib import Path + +import httpx +import pytest +import pytest_asyncio + + +@pytest_asyncio.fixture +async def async_client(): + async with httpx.AsyncClient(timeout=60.0) as client: + yield client + + +@pytest.mark.asyncio +async def test_convert_url(async_client): + """Test convert URL to all outputs""" + + base_url = "http://localhost:5001/v1alpha" + payload = { + "to_formats": ["md", "json", "html"], + "image_export_mode": "placeholder", + "ocr": False, + "abort_on_error": False, + "return_as_file": False, + } + + file_path = Path(__file__).parent / "2206.01062v1.pdf" + files = { + "files": (file_path.name, file_path.open("rb"), "application/pdf"), + } + + for n in range(1): + response = await async_client.post( + f"{base_url}/convert/file/async", files=files, data=payload + ) + assert response.status_code == 200, "Response should be 200 OK" + + task = response.json() + + print(json.dumps(task, indent=2)) + + while task["task_status"] not in ("success", "failure"): + response = await async_client.get(f"{base_url}/status/poll/{task['task_id']}") + assert response.status_code == 200, "Response should be 200 OK" + task = response.json() + print(f"{task['task_status']=}") + print(f"{task['task_position']=}") + + time.sleep(2) + + assert task["task_status"] == "success" + + result_resp = await async_client.get(f"{base_url}/result/{task['task_id']}") + assert result_resp.status_code == 200, "Response should be 200 OK" + result = result_resp.json() + + assert "md_content" in result["document"] + assert result["document"]["md_content"] is not None + assert len(result["document"]["md_content"]) > 10 + + assert "html_content" in result["document"] + assert result["document"]["html_content"] is not None + assert len(result["document"]["html_content"]) > 10 + + assert "json_content" in result["document"] + assert result["document"]["json_content"] is not None + assert result["document"]["json_content"]["schema_name"] == "DoclingDocument" diff --git a/tests/test_1-url-async.py b/tests/test_1-url-async.py index d52dc40..05c5cd9 100644 --- a/tests/test_1-url-async.py +++ b/tests/test_1-url-async.py @@ -38,7 +38,7 @@ async def test_convert_url(async_client): } print(json.dumps(payload, indent=2)) - for n in range(1): + for n in range(3): response = await async_client.post( f"{base_url}/convert/source/async", json=payload ) diff --git a/tests/test_2-files-all-outputs.py b/tests/test_2-files-all-outputs.py index db2a5e9..ba4f496 100644 --- a/tests/test_2-files-all-outputs.py +++ b/tests/test_2-files-all-outputs.py @@ -1,4 +1,3 @@ -import json import os import httpx @@ -48,9 +47,7 @@ async def test_convert_file(async_client): ("files", ("2408.09869v5.pdf", open(file_path, "rb"), "application/pdf")), ] - response = await async_client.post( - url, files=files, data={"options": json.dumps(options)} - ) + response = await async_client.post(url, files=files, data=options) assert response.status_code == 200, "Response should be 200 OK" # Check for zip file attachment diff --git a/tests/test_2-urls-async-all-outputs.py b/tests/test_2-urls-async-all-outputs.py new file mode 100644 index 0000000..7a3004e --- /dev/null +++ b/tests/test_2-urls-async-all-outputs.py @@ -0,0 +1,88 @@ +import json +import time + +import httpx +import pytest +import pytest_asyncio +from pytest_check import check + + +@pytest_asyncio.fixture +async def async_client(): + async with httpx.AsyncClient(timeout=60.0) as client: + yield client + + +@pytest.mark.asyncio +async def test_convert_url(async_client): + """Test convert URL to all outputs""" + base_url = "http://localhost:5001/v1alpha" + payload = { + "options": { + "from_formats": [ + "docx", + "pptx", + "html", + "image", + "pdf", + "asciidoc", + "md", + "xlsx", + ], + "to_formats": ["md", "json", "html", "text", "doctags"], + "image_export_mode": "placeholder", + "ocr": True, + "force_ocr": False, + "ocr_engine": "easyocr", + "ocr_lang": ["en"], + "pdf_backend": "dlparse_v2", + "table_mode": "fast", + "abort_on_error": False, + "return_as_file": False, + }, + "http_sources": [ + {"url": "https://arxiv.org/pdf/2206.01062"}, + {"url": "https://arxiv.org/pdf/2408.09869"}, + ], + } + + response = await async_client.post(f"{base_url}/convert/source/async", json=payload) + assert response.status_code == 200, "Response should be 200 OK" + + task = response.json() + + print(json.dumps(task, indent=2)) + + while task["task_status"] not in ("success", "failure"): + response = await async_client.get(f"{base_url}/status/poll/{task['task_id']}") + assert response.status_code == 200, "Response should be 200 OK" + task = response.json() + print(f"{task['task_status']=}") + print(f"{task['task_position']=}") + + time.sleep(2) + + assert task["task_status"] == "success" + + result_resp = await async_client.get(f"{base_url}/result/{task['task_id']}") + assert result_resp.status_code == 200, "Response should be 200 OK" + + # Check for zip file attachment + content_disposition = result_resp.headers.get("content-disposition") + + with check: + assert content_disposition is not None, ( + "Content-Disposition header should be present" + ) + with check: + assert "attachment" in content_disposition, "Response should be an attachment" + with check: + assert 'filename="converted_docs.zip"' in content_disposition, ( + "Attachment filename should be 'converted_docs.zip'" + ) + + content_type = result_resp.headers.get("content-type") + with check: + assert content_type == "application/zip", ( + "Content-Type should be 'application/zip'" + ) diff --git a/tests/test_fastapi_endpoints.py b/tests/test_fastapi_endpoints.py index 5a32c44..2651725 100644 --- a/tests/test_fastapi_endpoints.py +++ b/tests/test_fastapi_endpoints.py @@ -1,22 +1,50 @@ +import asyncio import json import os -from fastapi.testclient import TestClient +import pytest +import pytest_asyncio +from asgi_lifespan import LifespanManager +from httpx import ASGITransport, AsyncClient from pytest_check import check from docling_serve.app import create_app -client = TestClient(create_app()) + +@pytest.fixture(scope="session") +def event_loop(): + return asyncio.get_event_loop() -def test_health(): - response = client.get("/health") +@pytest_asyncio.fixture(scope="session") +async def app(): + app = create_app() + + async with LifespanManager(app) as manager: + print("Launching lifespan of app.") + yield manager.app + + +@pytest_asyncio.fixture(scope="session") +async def client(app): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://app.io" + ) as client: + print("Client is ready") + yield client + + +@pytest.mark.asyncio +async def test_health(client: AsyncClient): + response = await client.get("/health") assert response.status_code == 200 assert response.json() == {"status": "ok"} -def test_convert_file(): +@pytest.mark.asyncio +async def test_convert_file(client: AsyncClient): """Test convert single file to all outputs""" + endpoint = "/v1alpha/convert/file" options = { "from_formats": [ @@ -48,7 +76,7 @@ def test_convert_file(): "files": ("2206.01062v1.pdf", open(file_path, "rb"), "application/pdf"), } - response = client.post(endpoint, files=files, data=options) + response = await client.post(endpoint, files=files, data=options) assert response.status_code == 200, "Response should be 200 OK" data = response.json() diff --git a/uv.lock b/uv.lock index 1447af5..72f09f5 100644 --- a/uv.lock +++ b/uv.lock @@ -183,6 +183,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916, upload_time = "2025-03-17T00:02:52.713Z" }, ] +[[package]] +name = "asgi-lifespan" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "sniffio", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'extra-13-docling-serve-cpu' and extra == 'extra-13-docling-serve-cu124')" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6a/da/e7908b54e0f8043725a990bf625f2041ecf6bfe8eb7b19407f1c00b630f7/asgi-lifespan-2.1.0.tar.gz", hash = "sha256:5e2effaf0bfe39829cf2d64e7ecc47c7d86d676a6599f7afba378c31f5e3a308", size = 15627, upload_time = "2023-03-28T17:35:49.126Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2f/f5/c36551e93acba41a59939ae6a0fb77ddb3f2e8e8caa716410c65f7341f72/asgi_lifespan-2.1.0-py3-none-any.whl", hash = "sha256:ed840706680e28428c01e14afb3875d7d76d3206f3d5b2f2294e059b5c23804f", size = 10895, upload_time = "2023-03-28T17:35:47.772Z" }, +] + [[package]] name = "async-timeout" version = "5.0.1" @@ -727,6 +739,7 @@ ui = [ [package.dev-dependencies] dev = [ + { name = "asgi-lifespan", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'extra-13-docling-serve-cpu' and extra == 'extra-13-docling-serve-cu124')" }, { name = "mypy", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'extra-13-docling-serve-cpu' and extra == 'extra-13-docling-serve-cu124')" }, { name = "pre-commit-uv", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'extra-13-docling-serve-cpu' and extra == 'extra-13-docling-serve-cu124')" }, { name = "pytest", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'extra-13-docling-serve-cpu' and extra == 'extra-13-docling-serve-cu124')" }, @@ -763,6 +776,7 @@ provides-extras = ["ui", "tesserocr", "rapidocr", "cpu", "cu124"] [package.metadata.requires-dev] dev = [ + { name = "asgi-lifespan", specifier = "~=2.0" }, { name = "mypy", specifier = "~=1.11" }, { name = "pre-commit-uv", specifier = "~=4.1" }, { name = "pytest", specifier = "~=8.3" },