diff --git a/docling_serve/app.py b/docling_serve/app.py index 0417b0e..e18e353 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -546,6 +546,8 @@ def create_app(): # noqa: C901 status_code=400, detail=f"Invalid progress payload: {err}" ) + #### Clear requests + # Offload models @app.get( "/v1alpha/clear/converters", @@ -555,4 +557,16 @@ def create_app(): # noqa: C901 _get_converter_from_hash.cache_clear() return ClearResponse() + # Clean results + @app.get( + "/v1alpha/clear/results", + response_model=ClearResponse, + ) + async def clear_results( + orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)], + older_then: float = 3600, + ): + await orchestrator.clear_results(older_than=older_then) + return ClearResponse() + return app diff --git a/docling_serve/datamodel/task.py b/docling_serve/datamodel/task.py index e76014d..5ea7de1 100644 --- a/docling_serve/datamodel/task.py +++ b/docling_serve/datamodel/task.py @@ -1,8 +1,10 @@ +import datetime +from functools import partial from pathlib import Path from typing import Optional, Union from fastapi.responses import FileResponse -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from docling.datamodel.base_models import DocumentStream @@ -25,6 +27,27 @@ class Task(BaseModel): result: Optional[Union[ConvertDocumentResponse, FileResponse]] = None scratch_dir: Optional[Path] = None processing_meta: Optional[TaskProcessingMeta] = None + created_at: datetime.datetime = Field( + default_factory=partial(datetime.datetime.now, datetime.timezone.utc) + ) + started_at: Optional[datetime.datetime] = None + finished_at: Optional[datetime.datetime] = None + last_update_at: datetime.datetime = Field( + default_factory=partial(datetime.datetime.now, datetime.timezone.utc) + ) + + def set_status(self, status: TaskStatus): + now = datetime.datetime.now(datetime.timezone.utc) + if status == TaskStatus.STARTED and self.started_at is None: + 
self.started_at = now + if ( + status in [TaskStatus.SUCCESS, TaskStatus.FAILURE] + and self.finished_at is None + ): + self.finished_at = now + + self.last_update_at = now + self.task_status = status def is_completed(self) -> bool: if self.task_status in [TaskStatus.SUCCESS, TaskStatus.FAILURE]: diff --git a/docling_serve/engines/async_kfp/orchestrator.py b/docling_serve/engines/async_kfp/orchestrator.py index a6e15a1..98b84c3 100644 --- a/docling_serve/engines/async_kfp/orchestrator.py +++ b/docling_serve/engines/async_kfp/orchestrator.py @@ -130,13 +130,13 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator): # CANCELED = "CANCELED" # PAUSED = "PAUSED" if run_info.state == V2beta1RuntimeState.SUCCEEDED: - task.task_status = TaskStatus.SUCCESS + task.set_status(TaskStatus.SUCCESS) elif run_info.state == V2beta1RuntimeState.PENDING: - task.task_status = TaskStatus.PENDING + task.set_status(TaskStatus.PENDING) elif run_info.state == V2beta1RuntimeState.RUNNING: - task.task_status = TaskStatus.STARTED + task.set_status(TaskStatus.STARTED) else: - task.task_status = TaskStatus.FAILURE + task.set_status(TaskStatus.FAILURE) async def task_status(self, task_id: str, wait: float = 0.0) -> Task: await self._update_task_from_run(task_id=task_id, wait=wait) diff --git a/docling_serve/engines/async_local/worker.py b/docling_serve/engines/async_local/worker.py index bd89671..549506c 100644 --- a/docling_serve/engines/async_local/worker.py +++ b/docling_serve/engines/async_local/worker.py @@ -36,7 +36,7 @@ class AsyncLocalWorker: task = self.orchestrator.tasks[task_id] try: - task.task_status = TaskStatus.STARTED + task.set_status(TaskStatus.STARTED) _log.info(f"Worker {self.worker_id} processing task {task_id}") # Notify clients about task updates @@ -106,7 +106,7 @@ class AsyncLocalWorker: task.sources = [] task.options = None - task.task_status = TaskStatus.SUCCESS + task.set_status(TaskStatus.SUCCESS) _log.info( f"Worker {self.worker_id} completed job {task_id} " f"in 
{processing_time:.2f} seconds" @@ -116,7 +116,7 @@ class AsyncLocalWorker: _log.error( f"Worker {self.worker_id} failed to process job {task_id}: {e}" ) - task.task_status = TaskStatus.FAILURE + task.set_status(TaskStatus.FAILURE) finally: await self.orchestrator.notify_task_subscribers(task_id) diff --git a/docling_serve/engines/async_orchestrator.py b/docling_serve/engines/async_orchestrator.py index 5e26911..b523bbc 100644 --- a/docling_serve/engines/async_orchestrator.py +++ b/docling_serve/engines/async_orchestrator.py @@ -1,3 +1,6 @@ +import asyncio +import datetime +import logging import shutil from typing import Union @@ -20,6 +23,8 @@ from docling_serve.engines.base_orchestrator import ( ) from docling_serve.settings import docling_serve_settings +_log = logging.getLogger(__name__) + class ProgressInvalid(OrchestratorError): pass @@ -46,13 +51,50 @@ class BaseAsyncOrchestrator(BaseOrchestrator): async def task_result( self, task_id: str, background_tasks: BackgroundTasks ) -> Union[ConvertDocumentResponse, FileResponse, None]: - task = await self.get_raw_task(task_id=task_id) - if task.is_completed() and task.scratch_dir is not None: - if docling_serve_settings.single_use_results: - background_tasks.add_task( - shutil.rmtree, task.scratch_dir, ignore_errors=True - ) - return task.result + try: + task = await self.get_raw_task(task_id=task_id) + if task.is_completed() and docling_serve_settings.single_use_results: + if task.scratch_dir is not None: + background_tasks.add_task( + shutil.rmtree, task.scratch_dir, ignore_errors=True + ) + + async def _remove_task_impl(): + await asyncio.sleep(docling_serve_settings.result_removal_delay) + await self.delete_task(task_id=task.task_id) + + async def _remove_task(): + asyncio.create_task(_remove_task_impl()) # noqa: RUF006 + + background_tasks.add_task(_remove_task) + + return task.result + except TaskNotFoundError: + return None + + async def delete_task(self, task_id: str): + _log.info(f"Deleting {task_id=}") + 
if task_id in self.task_subscribers: + for websocket in self.task_subscribers[task_id]: + await websocket.close() + + del self.task_subscribers[task_id] + + if task_id in self.tasks: + del self.tasks[task_id] + + async def clear_results(self, older_than: float = 0.0): + cutoff_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + seconds=older_than + ) + + tasks_to_delete = [ + task_id + for task_id, task in self.tasks.items() + if task.finished_at is not None and task.finished_at < cutoff_time + ] + for task_id in tasks_to_delete: + await self.delete_task(task_id=task_id) async def notify_task_subscribers(self, task_id: str): if task_id not in self.task_subscribers: diff --git a/docling_serve/engines/base_orchestrator.py b/docling_serve/engines/base_orchestrator.py index 72421a4..dea59a3 100644 --- a/docling_serve/engines/base_orchestrator.py +++ b/docling_serve/engines/base_orchestrator.py @@ -42,6 +42,10 @@ class BaseOrchestrator(ABC): ) -> Union[ConvertDocumentResponse, FileResponse, None]: pass + @abstractmethod + async def clear_results(self, older_than: float = 0.0): + pass + @abstractmethod async def process_queue(self): pass diff --git a/docling_serve/settings.py b/docling_serve/settings.py index 6c06e4d..13ce96c 100644 --- a/docling_serve/settings.py +++ b/docling_serve/settings.py @@ -40,6 +40,7 @@ class DoclingServeSettings(BaseSettings): static_path: Optional[Path] = None scratch_path: Optional[Path] = None single_use_results: bool = True + result_removal_delay: float = 300 # 5 minutes options_cache_size: int = 2 enable_remote_services: bool = False allow_external_plugins: bool = False diff --git a/docs/configuration.md b/docs/configuration.md index cf594db..c84c456 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -42,6 +42,7 @@ THe following table describes the options to configure the Docling Serve app. | | `DOCLING_SERVE_ENABLE_REMOTE_SERVICES` | `false` | Allow pipeline components making remote connections. 
For example, this is needed when using a vision-language model via APIs. | | | `DOCLING_SERVE_ALLOW_EXTERNAL_PLUGINS` | `false` | Allow the selection of third-party plugins. | | | `DOCLING_SERVE_SINGLE_USE_RESULTS` | `true` | If true, results can be accessed only once. If false, the results accumulate in the scratch directory. | +| | `DOCLING_SERVE_RESULT_REMOVAL_DELAY` | `300` | When `DOCLING_SERVE_SINGLE_USE_RESULTS` is active, this is the delay before results are removed from the task registry. | | | `DOCLING_SERVE_MAX_DOCUMENT_TIMEOUT` | `604800` (7 days) | The maximum time for processing a document. | | | `DOCLING_SERVE_MAX_NUM_PAGES` | | The maximum number of pages for a document to be processed. | | | `DOCLING_SERVE_MAX_FILE_SIZE` | | The maximum file size for a document to be processed. | diff --git a/tests/test_1-file-async.py b/tests/test_1-file-async.py index 3b9d2fb..e40feb2 100644 --- a/tests/test_1-file-async.py +++ b/tests/test_1-file-async.py @@ -51,10 +51,12 @@ async def test_convert_url(async_client): time.sleep(2) assert task["task_status"] == "success" + print(f"Task completed with status {task['task_status']=}") result_resp = await async_client.get(f"{base_url}/result/{task['task_id']}") assert result_resp.status_code == 200, "Response should be 200 OK" result = result_resp.json() + print("Got result.") assert "md_content" in result["document"] assert result["document"]["md_content"] is not None diff --git a/tests/test_results_clear.py b/tests/test_results_clear.py new file mode 100644 index 0000000..b5f2b73 --- /dev/null +++ b/tests/test_results_clear.py @@ -0,0 +1,127 @@ +import asyncio +import base64 +import json +from pathlib import Path + +import pytest +import pytest_asyncio +from asgi_lifespan import LifespanManager +from httpx import ASGITransport, AsyncClient + +from docling_serve.app import create_app +from docling_serve.settings import docling_serve_settings + + +@pytest.fixture(scope="session") +def event_loop(): + return 
asyncio.new_event_loop() + + +@pytest_asyncio.fixture(scope="session") +async def app(): + app = create_app() + + async with LifespanManager(app) as manager: + print("Launching lifespan of app.") + yield manager.app + + +@pytest_asyncio.fixture(scope="session") +async def client(app): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://app.io" + ) as client: + print("Client is ready") + yield client + + +async def convert_file(client: AsyncClient): + doc_filename = Path("tests/2408.09869v5.pdf") + encoded_doc = base64.b64encode(doc_filename.read_bytes()).decode() + + payload = { + "options": { + "to_formats": ["json"], + }, + "file_sources": [{"base64_string": encoded_doc, "filename": doc_filename.name}], + } + + response = await client.post("/v1alpha/convert/source/async", json=payload) + assert response.status_code == 200, "Response should be 200 OK" + + task = response.json() + + print(json.dumps(task, indent=2)) + + while task["task_status"] not in ("success", "failure"): + response = await client.get(f"/v1alpha/status/poll/{task['task_id']}") + assert response.status_code == 200, "Response should be 200 OK" + task = response.json() + print(f"{task['task_status']=}") + print(f"{task['task_position']=}") + + await asyncio.sleep(2) + + assert task["task_status"] == "success" + + return task + + +@pytest.mark.asyncio +async def test_clear_results(client: AsyncClient): + """Test removal of task.""" + + # Set long delay deletion + docling_serve_settings.result_removal_delay = 100 + + # Convert and wait for completion + task = await convert_file(client) + + # Get result once + result_response = await client.get(f"/v1alpha/result/{task['task_id']}") + assert result_response.status_code == 200, "Response should be 200 OK" + print("Result 1 ok.") + result = result_response.json() + assert result["document"]["json_content"]["schema_name"] == "DoclingDocument" + + # Get result twice + result_response = await 
client.get(f"/v1alpha/result/{task['task_id']}") + assert result_response.status_code == 200, "Response should be 200 OK" + print("Result 2 ok.") + result = result_response.json() + assert result["document"]["json_content"]["schema_name"] == "DoclingDocument" + + # Clear + clear_response = await client.get("/v1alpha/clear/results?older_then=0") + assert clear_response.status_code == 200, "Response should be 200 OK" + print("Clear ok.") + + # Get deleted result + result_response = await client.get(f"/v1alpha/result/{task['task_id']}") + assert result_response.status_code == 404, "Response should be removed" + print("Result was no longer found.") + + +@pytest.mark.asyncio +async def test_delay_remove(client: AsyncClient): + """Test automatic removal of task with delay.""" + + # Set short delay deletion + docling_serve_settings.result_removal_delay = 5 + + # Convert and wait for completion + task = await convert_file(client) + + # Get result once + result_response = await client.get(f"/v1alpha/result/{task['task_id']}") + assert result_response.status_code == 200, "Response should be 200 OK" + print("Result ok.") + result = result_response.json() + assert result["document"]["json_content"]["schema_name"] == "DoclingDocument" + + print("Sleeping to wait the automatic task deletion.") + await asyncio.sleep(10) + + # Get deleted result + result_response = await client.get(f"/v1alpha/result/{task['task_id']}") + assert result_response.status_code == 404, "Response should be removed"