# File: docling-serve/docling_serve/datamodel/requests.py
# Snapshot: 2025-10-03 14:24:51 +02:00 (131 lines, 3.9 KiB, Python)

import enum
from functools import cache
from typing import Annotated, Generic, Literal
from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticCustomError
from typing_extensions import Self, TypeVar
from docling_jobkit.datamodel.chunking import (
BaseChunkerOptions,
)
from docling_jobkit.datamodel.http_inputs import FileSource, HttpSource
from docling_jobkit.datamodel.s3_coords import S3Coordinates
from docling_jobkit.datamodel.task_targets import (
InBodyTarget,
PutTarget,
S3Target,
ZipTarget,
)
from docling_serve.datamodel.convert import ConvertDocumentsRequestOptions
from docling_serve.settings import AsyncEngine, docling_serve_settings
## Sources
class FileSourceRequest(FileSource):
    # FileSource extended with a fixed ``kind`` tag of "file"; the tag is the
    # discriminator used by the ``SourceRequestItem`` union in this module.
    kind: Literal["file"] = "file"
class HttpSourceRequest(HttpSource):
    # HttpSource extended with a fixed ``kind`` tag of "http"; the tag is the
    # discriminator used by the ``SourceRequestItem`` union in this module.
    kind: Literal["http"] = "http"
class S3SourceRequest(S3Coordinates):
    # S3Coordinates extended with a fixed ``kind`` tag of "s3"; the tag is the
    # discriminator used by the ``SourceRequestItem`` union in this module.
    kind: Literal["s3"] = "s3"
## Multipart targets
class TargetName(str, enum.Enum):
    # String enum of target names. Each member's value is read from the
    # ``kind`` attribute of a default-constructed target model, so the enum
    # stays in sync with whatever those models declare.
    # NOTE(review): per the section heading these are presumably the targets
    # selectable on multipart endpoints -- confirm against the callers.
    INBODY = InBodyTarget().kind
    ZIP = ZipTarget().kind
## Aliases
# Any accepted input source; pydantic selects the concrete model from the
# value of the ``kind`` field ("file", "http" or "s3").
SourceRequestItem = Annotated[
    FileSourceRequest | HttpSourceRequest | S3SourceRequest,
    Field(discriminator="kind"),
]
# Any accepted output target; pydantic selects the concrete model from the
# value of the ``kind`` field.
TargetRequest = Annotated[
    InBodyTarget | ZipTarget | S3Target | PutTarget, Field(discriminator="kind")
]
## Complete Source request
class ConvertDocumentsRequest(BaseModel):
    # Request body for a conversion: conversion options, the list of input
    # sources, and the output target (defaults to an InBodyTarget).
    options: ConvertDocumentsRequestOptions = ConvertDocumentsRequestOptions()
    sources: list[SourceRequestItem]
    target: TargetRequest = InBodyTarget()

    @model_validator(mode="after")
    def validate_s3_source_and_target(self) -> Self:
        """Check that S3 sources and an S3 target are only used together.

        Rules enforced after field validation:

        * If any source is an ``S3SourceRequest``, the configured engine kind
          must be ``AsyncEngine.KFP`` and the target kind must be ``"s3"``.
        * If the target is an ``S3Target``, at least one source must be an
          ``S3SourceRequest``.

        Returns:
            The validated model instance, unchanged.

        Raises:
            PydanticCustomError: When one of the rules above is violated.
        """
        uses_s3_source = any(
            isinstance(item, S3SourceRequest) for item in self.sources
        )

        if uses_s3_source:
            # The engine setting is only consulted when an S3 source exists.
            if docling_serve_settings.eng_kind != AsyncEngine.KFP:
                raise PydanticCustomError(
                    "error source", 'source kind "s3" requires engine kind "KFP"'
                )
            if self.target.kind != "s3":
                raise PydanticCustomError(
                    "error source", 'source kind "s3" requires target kind "s3"'
                )
        elif isinstance(self.target, S3Target):
            # An S3 target without a single S3 source is rejected.
            raise PydanticCustomError(
                "error target", 'target kind "s3" requires source kind "s3"'
            )

        return self
## Source chunking requests
class BaseChunkDocumentsRequest(BaseModel):
    # Fields common to chunking requests; ``GenericChunkDocumentsRequest`` in
    # this module adds the chunker-specific ``chunking_options`` on top.

    # Options forwarded to the conversion step.
    convert_options: Annotated[
        ConvertDocumentsRequestOptions,
        Field(description="Conversion options."),
    ] = ConvertDocumentsRequestOptions()

    # Required: there is no default, so callers must supply the sources.
    sources: Annotated[
        list[SourceRequestItem],
        Field(description="List of input document sources to process."),
    ]

    # Off by default: only the chunks are returned unless this is set.
    include_converted_doc: Annotated[
        bool,
        Field(
            description=(
                "If true, the output will include both the chunks "
                "and the converted document."
            )
        ),
    ] = False

    # Output target; defaults to an InBodyTarget.
    target: Annotated[
        TargetRequest,
        Field(description="Specification for the type of output target."),
    ] = InBodyTarget()
# Type variable for chunker option models; restricted to BaseChunkerOptions
# and its subclasses.
ChunkingOptT = TypeVar(
    "ChunkingOptT",
    bound=BaseChunkerOptions,
)
class GenericChunkDocumentsRequest(
    BaseChunkDocumentsRequest,
    Generic[ChunkingOptT],
):
    # Chunking request parameterised by the chunker options type. The field
    # is declared without a default here; ``make_request_model`` in this
    # module builds concrete subclasses that supply a default factory.
    chunking_options: ChunkingOptT
@cache
def make_request_model(
    opt_type: type[ChunkingOptT],
) -> type[GenericChunkDocumentsRequest[ChunkingOptT]]:
    """Return the request model class specialised for ``opt_type``.

    A subclass of ``GenericChunkDocumentsRequest[opt_type]`` is created at
    runtime, named ``<opt_type.__name__>DocumentsRequest``, whose
    ``chunking_options`` field is annotated with ``opt_type`` and defaults to
    a fresh ``opt_type()`` instance via a default factory.

    The function is memoised with ``functools.cache``, so repeated calls with
    the same ``opt_type`` return the same class object.

    Args:
        opt_type: The chunker options class to specialise the request for.

    Returns:
        The dynamically created (and cached) request model class.
    """
    model_name = f"{opt_type.__name__}DocumentsRequest"
    parent_model = GenericChunkDocumentsRequest[opt_type]  # type: ignore[valid-type]
    options_field = Field(
        default_factory=opt_type,
        description="Options specific to the chunker.",
    )
    # Class body: the annotation plus the Field carrying the default factory.
    namespace = {
        "__annotations__": {"chunking_options": opt_type},
        "chunking_options": options_field,
    }
    return type(model_name, (parent_model,), namespace)