feat: clear results registry (#192)

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi
2025-05-23 08:30:57 -04:00
committed by GitHub
parent abe5aa03f5
commit de002dfcdc
10 changed files with 229 additions and 15 deletions

View File

@@ -546,6 +546,8 @@ def create_app(): # noqa: C901
status_code=400, detail=f"Invalid progress payload: {err}" status_code=400, detail=f"Invalid progress payload: {err}"
) )
#### Clear requests
# Offload models # Offload models
@app.get( @app.get(
"/v1alpha/clear/converters", "/v1alpha/clear/converters",
@@ -555,4 +557,16 @@ def create_app(): # noqa: C901
_get_converter_from_hash.cache_clear() _get_converter_from_hash.cache_clear()
return ClearResponse() return ClearResponse()
# Clean results
@app.get(
    "/v1alpha/clear/results",
    response_model=ClearResponse,
)
async def clear_results(
    orchestrator: Annotated[BaseAsyncOrchestrator, Depends(get_async_orchestrator)],
    # NOTE(review): "older_then" (sic) is the public query-parameter name;
    # renaming it to "older_than" would break existing clients. Default keeps
    # results for one hour.
    older_then: float = 3600,
):
    """Delete task results that finished more than ``older_then`` seconds ago."""
    await orchestrator.clear_results(older_than=older_then)
    return ClearResponse()
return app return app

View File

@@ -1,8 +1,10 @@
import datetime
from functools import partial
from pathlib import Path from pathlib import Path
from typing import Optional, Union from typing import Optional, Union
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict, Field
from docling.datamodel.base_models import DocumentStream from docling.datamodel.base_models import DocumentStream
@@ -25,6 +27,27 @@ class Task(BaseModel):
result: Optional[Union[ConvertDocumentResponse, FileResponse]] = None result: Optional[Union[ConvertDocumentResponse, FileResponse]] = None
scratch_dir: Optional[Path] = None scratch_dir: Optional[Path] = None
processing_meta: Optional[TaskProcessingMeta] = None processing_meta: Optional[TaskProcessingMeta] = None
# Lifecycle timestamps, all timezone-aware UTC.
# Stamped when the Task object is created.
created_at: datetime.datetime = Field(
    default_factory=partial(datetime.datetime.now, datetime.timezone.utc)
)
# Set once, the first time the task enters STARTED (see set_status).
started_at: Optional[datetime.datetime] = None
# Set once, the first time the task reaches SUCCESS or FAILURE.
finished_at: Optional[datetime.datetime] = None
# Bumped on every set_status() call.
last_update_at: datetime.datetime = Field(
    default_factory=partial(datetime.datetime.now, datetime.timezone.utc)
)
def set_status(self, status: TaskStatus):
    """Record a status transition and maintain the lifecycle timestamps.

    ``started_at`` and ``finished_at`` are write-once: only the first
    transition into STARTED (resp. a terminal state) stamps them.
    ``last_update_at`` is refreshed on every call.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    if self.started_at is None and status == TaskStatus.STARTED:
        self.started_at = now
    is_terminal = status in (TaskStatus.SUCCESS, TaskStatus.FAILURE)
    if is_terminal and self.finished_at is None:
        self.finished_at = now
    self.last_update_at = now
    self.task_status = status
def is_completed(self) -> bool: def is_completed(self) -> bool:
if self.task_status in [TaskStatus.SUCCESS, TaskStatus.FAILURE]: if self.task_status in [TaskStatus.SUCCESS, TaskStatus.FAILURE]:

View File

@@ -130,13 +130,13 @@ class AsyncKfpOrchestrator(BaseAsyncOrchestrator):
# CANCELED = "CANCELED" # CANCELED = "CANCELED"
# PAUSED = "PAUSED" # PAUSED = "PAUSED"
if run_info.state == V2beta1RuntimeState.SUCCEEDED: if run_info.state == V2beta1RuntimeState.SUCCEEDED:
task.task_status = TaskStatus.SUCCESS task.set_status(TaskStatus.SUCCESS)
elif run_info.state == V2beta1RuntimeState.PENDING: elif run_info.state == V2beta1RuntimeState.PENDING:
task.task_status = TaskStatus.PENDING task.set_status(TaskStatus.PENDING)
elif run_info.state == V2beta1RuntimeState.RUNNING: elif run_info.state == V2beta1RuntimeState.RUNNING:
task.task_status = TaskStatus.STARTED task.set_status(TaskStatus.STARTED)
else: else:
task.task_status = TaskStatus.FAILURE task.set_status(TaskStatus.FAILURE)
async def task_status(self, task_id: str, wait: float = 0.0) -> Task: async def task_status(self, task_id: str, wait: float = 0.0) -> Task:
await self._update_task_from_run(task_id=task_id, wait=wait) await self._update_task_from_run(task_id=task_id, wait=wait)

View File

@@ -36,7 +36,7 @@ class AsyncLocalWorker:
task = self.orchestrator.tasks[task_id] task = self.orchestrator.tasks[task_id]
try: try:
task.task_status = TaskStatus.STARTED task.set_status(TaskStatus.STARTED)
_log.info(f"Worker {self.worker_id} processing task {task_id}") _log.info(f"Worker {self.worker_id} processing task {task_id}")
# Notify clients about task updates # Notify clients about task updates
@@ -106,7 +106,7 @@ class AsyncLocalWorker:
task.sources = [] task.sources = []
task.options = None task.options = None
task.task_status = TaskStatus.SUCCESS task.set_status(TaskStatus.SUCCESS)
_log.info( _log.info(
f"Worker {self.worker_id} completed job {task_id} " f"Worker {self.worker_id} completed job {task_id} "
f"in {processing_time:.2f} seconds" f"in {processing_time:.2f} seconds"
@@ -116,7 +116,7 @@ class AsyncLocalWorker:
_log.error( _log.error(
f"Worker {self.worker_id} failed to process job {task_id}: {e}" f"Worker {self.worker_id} failed to process job {task_id}: {e}"
) )
task.task_status = TaskStatus.FAILURE task.set_status(TaskStatus.FAILURE)
finally: finally:
await self.orchestrator.notify_task_subscribers(task_id) await self.orchestrator.notify_task_subscribers(task_id)

View File

@@ -1,3 +1,6 @@
import asyncio
import datetime
import logging
import shutil import shutil
from typing import Union from typing import Union
@@ -20,6 +23,8 @@ from docling_serve.engines.base_orchestrator import (
) )
from docling_serve.settings import docling_serve_settings from docling_serve.settings import docling_serve_settings
_log = logging.getLogger(__name__)
class ProgressInvalid(OrchestratorError): class ProgressInvalid(OrchestratorError):
pass pass
@@ -46,13 +51,50 @@ class BaseAsyncOrchestrator(BaseOrchestrator):
async def task_result( async def task_result(
self, task_id: str, background_tasks: BackgroundTasks self, task_id: str, background_tasks: BackgroundTasks
) -> Union[ConvertDocumentResponse, FileResponse, None]: ) -> Union[ConvertDocumentResponse, FileResponse, None]:
task = await self.get_raw_task(task_id=task_id) try:
if task.is_completed() and task.scratch_dir is not None: task = await self.get_raw_task(task_id=task_id)
if docling_serve_settings.single_use_results: if task.is_completed() and docling_serve_settings.single_use_results:
background_tasks.add_task( if task.scratch_dir is not None:
shutil.rmtree, task.scratch_dir, ignore_errors=True background_tasks.add_task(
) shutil.rmtree, task.scratch_dir, ignore_errors=True
return task.result )
async def _remove_task_impl():
await asyncio.sleep(docling_serve_settings.result_removal_delay)
await self.delete_task(task_id=task.task_id)
async def _remove_task():
asyncio.create_task(_remove_task_impl()) # noqa: RUF006
background_tasks.add_task(_remove_task)
return task.result
except TaskNotFoundError:
return None
async def delete_task(self, task_id: str):
    """Remove a task and its websocket subscribers from the registry.

    Closing a subscriber socket is best-effort: a client that already
    disconnected raises on ``close()``, and that must not prevent the
    remaining sockets from being closed nor the task from being deleted.
    """
    _log.info(f"Deleting {task_id=}")
    # pop() with a default removes the entry and tolerates unknown task ids.
    for websocket in self.task_subscribers.pop(task_id, []):
        try:
            await websocket.close()
        except Exception:
            # Best-effort: the peer may already be gone.
            _log.warning(f"Failed closing websocket for {task_id=}", exc_info=True)
    self.tasks.pop(task_id, None)
async def clear_results(self, older_than: float = 0.0):
    """Delete every task that finished more than ``older_than`` seconds ago.

    Tasks that never finished (``finished_at`` is None) are kept.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(seconds=older_than)
    # Collect first, then delete, so we never mutate self.tasks mid-iteration.
    expired_ids = [
        tid
        for tid, task in self.tasks.items()
        if task.finished_at is not None and task.finished_at < cutoff
    ]
    for tid in expired_ids:
        await self.delete_task(task_id=tid)
async def notify_task_subscribers(self, task_id: str): async def notify_task_subscribers(self, task_id: str):
if task_id not in self.task_subscribers: if task_id not in self.task_subscribers:

View File

@@ -42,6 +42,10 @@ class BaseOrchestrator(ABC):
) -> Union[ConvertDocumentResponse, FileResponse, None]: ) -> Union[ConvertDocumentResponse, FileResponse, None]:
pass pass
@abstractmethod
async def clear_results(self, older_than: float = 0.0):
    """Remove all finished tasks older than ``older_than`` seconds from the registry."""
    pass
@abstractmethod @abstractmethod
async def process_queue(self): async def process_queue(self):
pass pass

View File

@@ -40,6 +40,7 @@ class DoclingServeSettings(BaseSettings):
static_path: Optional[Path] = None static_path: Optional[Path] = None
scratch_path: Optional[Path] = None scratch_path: Optional[Path] = None
single_use_results: bool = True single_use_results: bool = True
result_removal_delay: float = 300 # 5 minutes
options_cache_size: int = 2 options_cache_size: int = 2
enable_remote_services: bool = False enable_remote_services: bool = False
allow_external_plugins: bool = False allow_external_plugins: bool = False

View File

@@ -42,6 +42,7 @@ The following table describes the options to configure the Docling Serve app.
| | `DOCLING_SERVE_ENABLE_REMOTE_SERVICES` | `false` | Allow pipeline components making remote connections. For example, this is needed when using a vision-language model via APIs. | | | `DOCLING_SERVE_ENABLE_REMOTE_SERVICES` | `false` | Allow pipeline components making remote connections. For example, this is needed when using a vision-language model via APIs. |
| | `DOCLING_SERVE_ALLOW_EXTERNAL_PLUGINS` | `false` | Allow the selection of third-party plugins. | | | `DOCLING_SERVE_ALLOW_EXTERNAL_PLUGINS` | `false` | Allow the selection of third-party plugins. |
| | `DOCLING_SERVE_SINGLE_USE_RESULTS` | `true` | If true, results can be accessed only once. If false, the results accumulate in the scratch directory. | | | `DOCLING_SERVE_SINGLE_USE_RESULTS` | `true` | If true, results can be accessed only once. If false, the results accumulate in the scratch directory. |
| | `DOCLING_SERVE_RESULT_REMOVAL_DELAY` | `300` | When `DOCLING_SERVE_SINGLE_USE_RESULTS` is active, this is the delay (in seconds) before results are removed from the task registry. |
| | `DOCLING_SERVE_MAX_DOCUMENT_TIMEOUT` | `604800` (7 days) | The maximum time for processing a document. | | | `DOCLING_SERVE_MAX_DOCUMENT_TIMEOUT` | `604800` (7 days) | The maximum time for processing a document. |
| | `DOCLING_SERVE_MAX_NUM_PAGES` | | The maximum number of pages for a document to be processed. | | | `DOCLING_SERVE_MAX_NUM_PAGES` | | The maximum number of pages for a document to be processed. |
| | `DOCLING_SERVE_MAX_FILE_SIZE` | | The maximum file size for a document to be processed. | | | `DOCLING_SERVE_MAX_FILE_SIZE` | | The maximum file size for a document to be processed. |

View File

@@ -51,10 +51,12 @@ async def test_convert_url(async_client):
time.sleep(2) time.sleep(2)
assert task["task_status"] == "success" assert task["task_status"] == "success"
print(f"Task completed with status {task['task_status']=}")
result_resp = await async_client.get(f"{base_url}/result/{task['task_id']}") result_resp = await async_client.get(f"{base_url}/result/{task['task_id']}")
assert result_resp.status_code == 200, "Response should be 200 OK" assert result_resp.status_code == 200, "Response should be 200 OK"
result = result_resp.json() result = result_resp.json()
print("Got result.")
assert "md_content" in result["document"] assert "md_content" in result["document"]
assert result["document"]["md_content"] is not None assert result["document"]["md_content"] is not None

127
tests/test_results_clear.py Normal file
View File

@@ -0,0 +1,127 @@
import asyncio
import base64
import json
from pathlib import Path
import pytest
import pytest_asyncio
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
from docling_serve.app import create_app
from docling_serve.settings import docling_serve_settings
@pytest.fixture(scope="session")
def event_loop():
    """Session-scoped event loop shared by the session-scoped async fixtures.

    ``asyncio.get_event_loop()`` is deprecated when no loop is running and
    the returned loop was never closed; create a dedicated loop and close
    it at session teardown instead.
    """
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()
@pytest_asyncio.fixture(scope="session")
async def app():
    """Session-wide FastAPI application with its lifespan events running."""
    application = create_app()
    async with LifespanManager(application) as lifespan:
        print("Launching lifespan of app.")
        yield lifespan.app
@pytest_asyncio.fixture(scope="session")
async def client(app):
    """HTTP client wired directly to the in-process ASGI app (no network)."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://app.io") as http_client:
        print("Client is ready")
        yield http_client
async def convert_file(client: AsyncClient):
    """Submit the sample PDF for async conversion and poll until success.

    Returns the final task payload (asserts the task ends in ``success``).
    """
    pdf_path = Path("tests/2408.09869v5.pdf")
    file_source = {
        "base64_string": base64.b64encode(pdf_path.read_bytes()).decode(),
        "filename": pdf_path.name,
    }
    payload = {
        "options": {
            "to_formats": ["json"],
        },
        "file_sources": [file_source],
    }

    submit_resp = await client.post("/v1alpha/convert/source/async", json=payload)
    assert submit_resp.status_code == 200, "Response should be 200 OK"
    task = submit_resp.json()
    print(json.dumps(task, indent=2))

    # Poll the status endpoint until the task reaches a terminal state.
    terminal_states = ("success", "failure")
    while task["task_status"] not in terminal_states:
        poll_resp = await client.get(f"/v1alpha/status/poll/{task['task_id']}")
        assert poll_resp.status_code == 200, "Response should be 200 OK"
        task = poll_resp.json()
        print(f"{task['task_status']=}")
        print(f"{task['task_position']=}")
        await asyncio.sleep(2)

    assert task["task_status"] == "success"
    return task
@pytest.mark.asyncio
async def test_clear_results(client: AsyncClient):
    """Test removal of task."""
    # Long removal delay so the result is not auto-deleted mid-test.
    docling_serve_settings.result_removal_delay = 100

    # Convert and wait for completion.
    task = await convert_file(client)
    task_id = task["task_id"]

    # The result must stay retrievable more than once before clearing.
    for attempt in (1, 2):
        result_response = await client.get(f"/v1alpha/result/{task_id}")
        assert result_response.status_code == 200, "Response should be 200 OK"
        print(f"Result {attempt} ok.")
        result = result_response.json()
        assert result["document"]["json_content"]["schema_name"] == "DoclingDocument"

    # Clear everything, regardless of age.
    clear_response = await client.get("/v1alpha/clear/results?older_then=0")
    assert clear_response.status_code == 200, "Response should be 200 OK"
    print("Clear ok.")

    # The task is now gone from the registry.
    result_response = await client.get(f"/v1alpha/result/{task_id}")
    assert result_response.status_code == 404, "Response should be removed"
    print("Result was no longer found.")
@pytest.mark.asyncio
async def test_delay_remove(client: AsyncClient):
    """Test automatic removal of task with delay."""
    # Short delay so the background deletion fires while the test sleeps.
    docling_serve_settings.result_removal_delay = 5

    # Convert and wait for completion.
    task = await convert_file(client)
    task_id = task["task_id"]

    # First retrieval succeeds and schedules the delayed deletion.
    result_response = await client.get(f"/v1alpha/result/{task_id}")
    assert result_response.status_code == 200, "Response should be 200 OK"
    print("Result ok.")
    result = result_response.json()
    assert result["document"]["json_content"]["schema_name"] == "DoclingDocument"

    print("Sleeping to wait the automatic task deletion.")
    await asyncio.sleep(10)

    # After the delay the task must be gone.
    result_response = await client.get(f"/v1alpha/result/{task_id}")
    assert result_response.status_code == 404, "Response should be removed"