Compare commits

...

1 Commits

Author SHA1 Message Date
Alex
1de82ca040 fix: batch limits and failed task reque limit (#2484) 2026-05-18 22:22:43 +01:00
7 changed files with 449 additions and 7 deletions

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import functools
import inspect
import logging
import threading
import uuid
@@ -26,13 +27,20 @@ LEASE_HEARTBEAT_INTERVAL = 30
LEASE_RETRY_MAX = 10
def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
def with_idempotency(
task_name: str,
*,
on_poison: Optional[Callable[[str, dict], None]] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Short-circuit on completed key; gate concurrent runs via a lease.
The guard key is the caller's ``idempotency_key``, or one synthesized
from ``source_id`` so a keyless dispatch is still poison-guarded.
Entry short-circuits:
- completed row → return cached result
- live lease held → retry(countdown=LEASE_TTL_SECONDS)
- attempt_count > MAX_TASK_ATTEMPTS → poison-loop alert
- attempt_count > MAX_TASK_ATTEMPTS → poison alert; ``on_poison`` fires
Success writes ``completed``; exceptions leave ``pending`` for
autoretry until the poison-loop guard trips.
"""
@@ -40,7 +48,14 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(fn)
def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any:
key = idempotency_key if isinstance(idempotency_key, str) and idempotency_key else None
explicit_key = (
idempotency_key
if isinstance(idempotency_key, str) and idempotency_key
else None
)
# A keyless dispatch still gets the guard via a synthesized key;
# None means no anchor exists — run unguarded, as before.
key = explicit_key or _synthesize_guard_key(task_name, kwargs)
if key is None:
return fn(self, *args, idempotency_key=idempotency_key, **kwargs)
@@ -88,6 +103,9 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
"attempts": attempt,
}
_finalize(key, poisoned, status="failed")
_run_poison_hook(
on_poison, task_name, fn, self, args, kwargs, idempotency_key,
)
return poisoned
heartbeat_thread, heartbeat_stop = _start_lease_heartbeat(
@@ -109,6 +127,45 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
return decorator
def _synthesize_guard_key(task_name: str, kwargs: dict) -> Optional[str]:
"""Derive a deterministic guard key from ``source_id`` for a keyless dispatch.
``source_id`` is stable across broker redeliveries and unique per
upload, so the poison-loop counter survives an OOM SIGKILL. Returns
``None`` when absent — the dispatch then runs unguarded as before.
"""
source_id = kwargs.get("source_id")
if source_id:
return f"auto:{task_name}:{source_id}"
return None
def _run_poison_hook(
on_poison: Optional[Callable[[str, dict], None]],
task_name: str,
fn: Callable[..., Any],
task_self: Any,
args: tuple,
kwargs: dict,
idempotency_key: Any,
) -> None:
"""Invoke a task's poison-path hook with named call args; swallow failures.
A hook failure must never change the poison-guard outcome.
"""
if on_poison is None:
return
try:
bound = inspect.signature(fn).bind_partial(
task_self, *args, idempotency_key=idempotency_key, **kwargs,
)
on_poison(task_name, dict(bound.arguments))
except Exception:
logger.exception(
"idempotency: poison hook failed for task=%s", task_name,
)
def _lookup_completed(key: str) -> Any:
"""Return cached ``result_json`` if a completed row exists for ``key``, else None."""
with db_readonly() as conn:

View File

@@ -27,8 +27,42 @@ DURABLE_TASK = dict(
)
# operation tag for the poison-path source.ingest.failed event, per task.
_INGEST_POISON_OPERATION = {
"ingest": "upload",
"ingest_remote": "upload",
"ingest_connector_task": "upload",
"reingest_source_task": "reingest",
}
def _emit_ingest_poison_event(task_name, bound):
"""Publish a terminal ``source.ingest.failed`` when the poison-guard trips.
The guard returns before the worker runs, so the worker's own failed
event never fires — without this the upload toast spins on "training".
"""
user = bound.get("user")
source_id = bound.get("source_id")
if not user or not source_id:
return
from application.events.publisher import publish_user_event
publish_user_event(
user,
"source.ingest.failed",
{
"source_id": str(source_id),
"filename": bound.get("filename") or "",
"operation": _INGEST_POISON_OPERATION.get(task_name, "upload"),
"error": "Ingestion stopped after repeated failures.",
},
scope={"kind": "source", "id": str(source_id)},
)
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest")
@with_idempotency(task_name="ingest", on_poison=_emit_ingest_poison_event)
def ingest(
self,
directory,
@@ -57,7 +91,7 @@ def ingest(
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest_remote")
@with_idempotency(task_name="ingest_remote", on_poison=_emit_ingest_poison_event)
def ingest_remote(
self, source_data, job_name, user, loader,
idempotency_key=None, source_id=None,
@@ -71,7 +105,9 @@ def ingest_remote(
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="reingest_source_task")
@with_idempotency(
task_name="reingest_source_task", on_poison=_emit_ingest_poison_event,
)
def reingest_source_task(self, source_id, user, idempotency_key=None):
from application.worker import reingest_source_worker
@@ -128,7 +164,9 @@ def process_agent_webhook(self, agent_id, payload, idempotency_key=None):
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest_connector_task")
@with_idempotency(
task_name="ingest_connector_task", on_poison=_emit_ingest_poison_event,
)
def ingest_connector_task(
self,
job_name,

View File

@@ -66,6 +66,9 @@ class Settings(BaseSettings):
PARSE_IMAGE_REMOTE: bool = False
DOCLING_OCR_ENABLED: bool = False # Enable OCR for docling parsers (PDF, images)
DOCLING_OCR_ATTACHMENTS_ENABLED: bool = False # Enable OCR for docling when parsing attachments
# Pages docling's threaded pipeline buffers in flight; the library
# default (100) drives worker RSS to ~3 GB on a mid-size PDF.
DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector"
RETRIEVERS_ENABLED: list = ["classic_rag"]
AGENT_NAME: str = "classic"

View File

@@ -16,6 +16,29 @@ from application.parser.file.base_parser import BaseParser
logger = logging.getLogger(__name__)
# Per-stage batch size for docling's threaded pipeline; 1 holds the
# concurrent working set to a single page (see _apply_pipeline_caps).
_PIPELINE_BATCH_SIZE = 1
def _apply_pipeline_caps(pipeline_options) -> None:
"""Cap docling's threaded-pipeline queue depth and batch sizes in place.
hasattr-guarded so docling builds without these knobs are unaffected.
"""
from application.core.settings import settings
caps = {
"queue_max_size": max(1, settings.DOCLING_PIPELINE_QUEUE_MAX_SIZE),
"layout_batch_size": _PIPELINE_BATCH_SIZE,
"table_batch_size": _PIPELINE_BATCH_SIZE,
"ocr_batch_size": _PIPELINE_BATCH_SIZE,
}
for name, value in caps.items():
if hasattr(pipeline_options, name):
setattr(pipeline_options, name, value)
class DoclingParser(BaseParser):
"""Parser using docling for advanced document processing.
@@ -86,6 +109,7 @@ class DoclingParser(BaseParser):
do_ocr=self.ocr_enabled,
do_table_structure=self.table_structure,
)
_apply_pipeline_caps(pipeline_options)
if self.ocr_enabled:
ocr_options = self._get_ocr_options()

View File

@@ -417,3 +417,181 @@ class TestSuccessfulRunClearsLease:
assert row[0] == "completed"
assert row[1] is None
assert row[2] is None
@pytest.mark.unit
class TestSynthesizedKeyGuardsKeylessDispatch:
"""A keyless dispatch carrying ``source_id`` is still poison-guarded:
the wrapper synthesizes a deterministic key from ``source_id``.
"""
def test_keyless_with_source_id_records_dedup_row(self, pg_conn):
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
return {"ran": True}
with _patch_decorator_db(pg_conn):
result = task(_fake_celery_self(), source_id="src-abc")
assert result == {"ran": True}
row = _row_for(pg_conn, "auto:ingest:src-abc")
assert row is not None
assert row[0] == "ingest"
assert row[2] == "completed"
def test_synthesized_key_stable_across_redeliveries(self, pg_conn):
"""Same ``source_id`` → same key → a redelivery short-circuits to
the cached result instead of re-running the body.
"""
from application.api.user.idempotency import with_idempotency
runs = {"count": 0}
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
runs["count"] += 1
return {"n": runs["count"]}
with _patch_decorator_db(pg_conn):
first = task(_fake_celery_self(), source_id="src-1")
second = task(_fake_celery_self(), source_id="src-1")
assert first == second == {"n": 1}
assert runs["count"] == 1
def test_poison_guard_trips_for_keyless_dispatch(self, pg_conn):
"""The core fix: a keyless OOM-looping dispatch is bounded — the
guard trips after MAX_TASK_ATTEMPTS with no explicit key.
"""
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
runs = {"count": 0}
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
runs["count"] += 1
raise RuntimeError("OOM-style failure")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-poison")
result = task(_fake_celery_self(), source_id="src-poison")
assert runs["count"] == MAX_TASK_ATTEMPTS
assert result["success"] is False
assert "poison-loop" in result["error"]
assert _row_for(pg_conn, "auto:ingest:src-poison")[2] == "failed"
def test_no_source_id_no_key_runs_unguarded(self, pg_conn):
"""No explicit key and no ``source_id`` anchor → pass through with
no DB writes, exactly as before.
"""
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="store_attachment")
def task(self, idempotency_key=None):
return {"ran": True}
with patch(
"application.api.user.idempotency.db_session"
) as mock_session, patch(
"application.api.user.idempotency.db_readonly"
) as mock_readonly:
result = task(_fake_celery_self())
assert result == {"ran": True}
assert mock_session.call_count == 0
assert mock_readonly.call_count == 0
def test_explicit_key_takes_precedence_over_source_id(self, pg_conn):
"""An explicit key wins; the synthesized ``auto:`` key is unused."""
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
return {"ran": True}
with _patch_decorator_db(pg_conn):
task(
_fake_celery_self(),
idempotency_key="explicit-k",
source_id="src-x",
)
assert _row_for(pg_conn, "explicit-k") is not None
assert _row_for(pg_conn, "auto:ingest:src-x") is None
@pytest.mark.unit
class TestPoisonHook:
"""``on_poison`` fires on the poison-guard branch with the task's
bound arguments, and never on the success path.
"""
def test_hook_invoked_with_bound_args_on_poison(self, pg_conn):
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
captured = []
def _hook(task_name, bound):
captured.append((task_name, bound))
@with_idempotency(task_name="ingest", on_poison=_hook)
def task(self, idempotency_key=None, source_id=None):
raise RuntimeError("never converges")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-h")
task(_fake_celery_self(), source_id="src-h")
assert len(captured) == 1
task_name, bound = captured[0]
assert task_name == "ingest"
assert bound["source_id"] == "src-h"
def test_hook_not_invoked_on_success(self, pg_conn):
from application.api.user.idempotency import with_idempotency
calls = []
@with_idempotency(
task_name="ingest", on_poison=lambda *a: calls.append(a)
)
def task(self, idempotency_key=None, source_id=None):
return {"ok": True}
with _patch_decorator_db(pg_conn):
task(_fake_celery_self(), source_id="src-ok")
assert calls == []
def test_hook_failure_does_not_break_poison_return(self, pg_conn):
"""A throwing hook must not change the poison-guard outcome."""
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
def _bad_hook(task_name, bound):
raise ValueError("hook blew up")
@with_idempotency(task_name="ingest", on_poison=_bad_hook)
def task(self, idempotency_key=None, source_id=None):
raise RuntimeError("never converges")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-bad")
result = task(_fake_celery_self(), source_id="src-bad")
assert result["success"] is False
assert "poison-loop" in result["error"]

View File

@@ -546,3 +546,63 @@ class TestIngestIdempotency:
assert first == second
assert first == {"status": "ok", "directory": "dir"}
assert len(worker_calls) == 1
class TestIngestPoisonEvent:
"""The poison hook publishes a terminal source.ingest.failed so the
upload toast resolves instead of hanging on "training".
"""
@pytest.mark.unit
def test_publishes_failed_event(self):
from application.api.user.tasks import _emit_ingest_poison_event
published = []
def _fake_publish(user, event_type, payload, *, scope=None):
published.append((user, event_type, payload, scope))
with patch(
"application.events.publisher.publish_user_event",
side_effect=_fake_publish,
):
_emit_ingest_poison_event(
"ingest",
{"user": "u1", "source_id": "src-9", "filename": "doc.pdf"},
)
assert len(published) == 1
user, event_type, payload, scope = published[0]
assert user == "u1"
assert event_type == "source.ingest.failed"
assert payload["source_id"] == "src-9"
assert payload["filename"] == "doc.pdf"
assert payload["operation"] == "upload"
assert scope == {"kind": "source", "id": "src-9"}
@pytest.mark.unit
def test_skips_when_source_id_missing(self):
from application.api.user.tasks import _emit_ingest_poison_event
with patch(
"application.events.publisher.publish_user_event",
) as mock_publish:
_emit_ingest_poison_event("ingest", {"user": "u1"})
mock_publish.assert_not_called()
@pytest.mark.unit
def test_reingest_uses_reingest_operation(self):
from application.api.user.tasks import _emit_ingest_poison_event
published = []
with patch(
"application.events.publisher.publish_user_event",
side_effect=lambda *a, **k: published.append((a, k)),
):
_emit_ingest_poison_event(
"reingest_source_task",
{"user": "u1", "source_id": "src-r"},
)
assert published[0][0][2]["operation"] == "reingest"

View File

@@ -421,3 +421,85 @@ class TestDoclingParserGaps:
parser = DoclingCSVParser()
assert parser.export_format == "markdown"
assert parser.ocr_enabled is True
# =====================================================================
# Pipeline memory caps
# =====================================================================
@pytest.mark.unit
class TestApplyPipelineCaps:
"""_apply_pipeline_caps bounds docling's threaded-pipeline buffering."""
def test_caps_threaded_pipeline_knobs(self, monkeypatch):
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 2, raising=False
)
class Opts:
# docling >= 2.94 threaded pipeline — all knobs present.
queue_max_size = 100
layout_batch_size = 4
table_batch_size = 4
ocr_batch_size = 4
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 2
assert opts.layout_batch_size == 1
assert opts.table_batch_size == 1
assert opts.ocr_batch_size == 1
def test_queue_size_is_settings_driven(self, monkeypatch):
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 6, raising=False
)
class Opts:
queue_max_size = 100
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 6
def test_misconfigured_zero_floors_to_one(self, monkeypatch):
"""A 0 queue depth could deadlock the threaded pipeline — floor it."""
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 0, raising=False
)
class Opts:
queue_max_size = 100
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 1
def test_noop_on_docling_without_threaded_pipeline(self):
"""Builds predating the threaded pipeline lack the knobs — the cap
must be a silent no-op, not an AttributeError."""
from application.parser.file.docling_parser import _apply_pipeline_caps
class LegacyOpts:
__slots__ = ("do_ocr", "do_table_structure")
def __init__(self):
self.do_ocr = False
self.do_table_structure = True
opts = LegacyOpts()
_apply_pipeline_caps(opts) # must not raise
assert not hasattr(opts, "queue_max_size")
assert not hasattr(opts, "layout_batch_size")