From 1de82ca040b93a2da275b0093dff5820ae0b4102 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 18 May 2026 22:22:43 +0100 Subject: [PATCH] fix: batch limits and failed task reque limit (#2484) --- application/api/user/idempotency.py | 63 ++++++- application/api/user/tasks.py | 46 ++++- application/core/settings.py | 3 + application/parser/file/docling_parser.py | 24 +++ tests/api/user/test_idempotency_decorator.py | 178 +++++++++++++++++++ tests/api/user/test_tasks.py | 60 +++++++ tests/parser/file/test_docling_parser.py | 82 +++++++++ 7 files changed, 449 insertions(+), 7 deletions(-) diff --git a/application/api/user/idempotency.py b/application/api/user/idempotency.py index 21e9eef1..c4199f79 100644 --- a/application/api/user/idempotency.py +++ b/application/api/user/idempotency.py @@ -3,6 +3,7 @@ from __future__ import annotations import functools +import inspect import logging import threading import uuid @@ -26,13 +27,20 @@ LEASE_HEARTBEAT_INTERVAL = 30 LEASE_RETRY_MAX = 10 -def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: +def with_idempotency( + task_name: str, + *, + on_poison: Optional[Callable[[str, dict], None]] = None, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """Short-circuit on completed key; gate concurrent runs via a lease. + The guard key is the caller's ``idempotency_key``, or one synthesized + from ``source_id`` so a keyless dispatch is still poison-guarded. + Entry short-circuits: - completed row → return cached result - live lease held → retry(countdown=LEASE_TTL_SECONDS) - - attempt_count > MAX_TASK_ATTEMPTS → poison-loop alert + - attempt_count > MAX_TASK_ATTEMPTS → poison alert; ``on_poison`` fires Success writes ``completed``; exceptions leave ``pending`` for autoretry until the poison-loop guard trips. """ @@ -40,7 +48,14 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[ def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: @functools.wraps(fn) def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any: - key = idempotency_key if isinstance(idempotency_key, str) and idempotency_key else None + explicit_key = ( + idempotency_key + if isinstance(idempotency_key, str) and idempotency_key + else None + ) + # A keyless dispatch still gets the guard via a synthesized key; + # None means no anchor exists — run unguarded, as before. + key = explicit_key or _synthesize_guard_key(task_name, kwargs) if key is None: return fn(self, *args, idempotency_key=idempotency_key, **kwargs) @@ -88,6 +103,9 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[ "attempts": attempt, } _finalize(key, poisoned, status="failed") + _run_poison_hook( + on_poison, task_name, fn, self, args, kwargs, idempotency_key, + ) return poisoned heartbeat_thread, heartbeat_stop = _start_lease_heartbeat( @@ -109,6 +127,45 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[ return decorator +def _synthesize_guard_key(task_name: str, kwargs: dict) -> Optional[str]: + """Derive a deterministic guard key from ``source_id`` for a keyless dispatch. + + ``source_id`` is stable across broker redeliveries and unique per + upload, so the poison-loop counter survives an OOM SIGKILL. Returns + ``None`` when absent — the dispatch then runs unguarded as before. + """ + source_id = kwargs.get("source_id") + if source_id: + return f"auto:{task_name}:{source_id}" + return None + + +def _run_poison_hook( + on_poison: Optional[Callable[[str, dict], None]], + task_name: str, + fn: Callable[..., Any], + task_self: Any, + args: tuple, + kwargs: dict, + idempotency_key: Any, +) -> None: + """Invoke a task's poison-path hook with named call args; swallow failures. + + A hook failure must never change the poison-guard outcome. + """ + if on_poison is None: + return + try: + bound = inspect.signature(fn).bind_partial( + task_self, *args, idempotency_key=idempotency_key, **kwargs, + ) + on_poison(task_name, dict(bound.arguments)) + except Exception: + logger.exception( + "idempotency: poison hook failed for task=%s", task_name, + ) + + def _lookup_completed(key: str) -> Any: """Return cached ``result_json`` if a completed row exists for ``key``, else None.""" with db_readonly() as conn: diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index 1fc7d091..ea7385dd 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -27,8 +27,42 @@ DURABLE_TASK = dict( ) +# operation tag for the poison-path source.ingest.failed event, per task. +_INGEST_POISON_OPERATION = { + "ingest": "upload", + "ingest_remote": "upload", + "ingest_connector_task": "upload", + "reingest_source_task": "reingest", +} + + +def _emit_ingest_poison_event(task_name, bound): + """Publish a terminal ``source.ingest.failed`` when the poison-guard trips. + + The guard returns before the worker runs, so the worker's own failed + event never fires — without this the upload toast spins on "training". + """ + user = bound.get("user") + source_id = bound.get("source_id") + if not user or not source_id: + return + from application.events.publisher import publish_user_event + + publish_user_event( + user, + "source.ingest.failed", + { + "source_id": str(source_id), + "filename": bound.get("filename") or "", + "operation": _INGEST_POISON_OPERATION.get(task_name, "upload"), + "error": "Ingestion stopped after repeated failures.", + }, + scope={"kind": "source", "id": str(source_id)}, + ) + + @celery.task(**DURABLE_TASK) -@with_idempotency(task_name="ingest") +@with_idempotency(task_name="ingest", on_poison=_emit_ingest_poison_event) def ingest( self, directory, @@ -57,7 +91,7 @@ def ingest( @celery.task(**DURABLE_TASK) -@with_idempotency(task_name="ingest_remote") +@with_idempotency(task_name="ingest_remote", on_poison=_emit_ingest_poison_event) def ingest_remote( self, source_data, job_name, user, loader, idempotency_key=None, source_id=None, @@ -71,7 +105,9 @@ def ingest_remote( @celery.task(**DURABLE_TASK) -@with_idempotency(task_name="reingest_source_task") +@with_idempotency( + task_name="reingest_source_task", on_poison=_emit_ingest_poison_event, +) def reingest_source_task(self, source_id, user, idempotency_key=None): from application.worker import reingest_source_worker @@ -128,7 +164,9 @@ def process_agent_webhook(self, agent_id, payload, idempotency_key=None): @celery.task(**DURABLE_TASK) -@with_idempotency(task_name="ingest_connector_task") +@with_idempotency( + task_name="ingest_connector_task", on_poison=_emit_ingest_poison_event, +) def ingest_connector_task( self, job_name, diff --git a/application/core/settings.py b/application/core/settings.py index e243bcbd..61ea4b35 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -66,6 +66,9 @@ class Settings(BaseSettings): PARSE_IMAGE_REMOTE: bool = False DOCLING_OCR_ENABLED: bool = False # Enable OCR for docling parsers (PDF, images) DOCLING_OCR_ATTACHMENTS_ENABLED: bool = False # Enable OCR for docling when parsing attachments + # Pages docling's threaded pipeline buffers in flight; the library + # default (100) drives worker RSS to ~3 GB on a mid-size PDF. + DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2 VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector" RETRIEVERS_ENABLED: list = ["classic_rag"] AGENT_NAME: str = "classic" diff --git a/application/parser/file/docling_parser.py b/application/parser/file/docling_parser.py index a12431d1..ff0d3656 100644 --- a/application/parser/file/docling_parser.py +++ b/application/parser/file/docling_parser.py @@ -16,6 +16,29 @@ from application.parser.file.base_parser import BaseParser logger = logging.getLogger(__name__) +# Per-stage batch size for docling's threaded pipeline; 1 holds the +# concurrent working set to a single page (see _apply_pipeline_caps). +_PIPELINE_BATCH_SIZE = 1 + + +def _apply_pipeline_caps(pipeline_options) -> None: + """Cap docling's threaded-pipeline queue depth and batch sizes in place. + + hasattr-guarded so docling builds without these knobs are unaffected. + """ + from application.core.settings import settings + + caps = { + "queue_max_size": max(1, settings.DOCLING_PIPELINE_QUEUE_MAX_SIZE), + "layout_batch_size": _PIPELINE_BATCH_SIZE, + "table_batch_size": _PIPELINE_BATCH_SIZE, + "ocr_batch_size": _PIPELINE_BATCH_SIZE, + } + for name, value in caps.items(): + if hasattr(pipeline_options, name): + setattr(pipeline_options, name, value) + + class DoclingParser(BaseParser): """Parser using docling for advanced document processing. @@ -86,6 +109,7 @@ class DoclingParser(BaseParser): do_ocr=self.ocr_enabled, do_table_structure=self.table_structure, ) + _apply_pipeline_caps(pipeline_options) if self.ocr_enabled: ocr_options = self._get_ocr_options() diff --git a/tests/api/user/test_idempotency_decorator.py b/tests/api/user/test_idempotency_decorator.py index 89024ffd..e927cc07 100644 --- a/tests/api/user/test_idempotency_decorator.py +++ b/tests/api/user/test_idempotency_decorator.py @@ -417,3 +417,181 @@ class TestSuccessfulRunClearsLease: assert row[0] == "completed" assert row[1] is None assert row[2] is None + + +@pytest.mark.unit +class TestSynthesizedKeyGuardsKeylessDispatch: + """A keyless dispatch carrying ``source_id`` is still poison-guarded: + the wrapper synthesizes a deterministic key from ``source_id``. + """ + + def test_keyless_with_source_id_records_dedup_row(self, pg_conn): + from application.api.user.idempotency import with_idempotency + + @with_idempotency(task_name="ingest") + def task(self, idempotency_key=None, source_id=None): + return {"ran": True} + + with _patch_decorator_db(pg_conn): + result = task(_fake_celery_self(), source_id="src-abc") + + assert result == {"ran": True} + row = _row_for(pg_conn, "auto:ingest:src-abc") + assert row is not None + assert row[0] == "ingest" + assert row[2] == "completed" + + def test_synthesized_key_stable_across_redeliveries(self, pg_conn): + """Same ``source_id`` → same key → a redelivery short-circuits to + the cached result instead of re-running the body. + """ + from application.api.user.idempotency import with_idempotency + + runs = {"count": 0} + + @with_idempotency(task_name="ingest") + def task(self, idempotency_key=None, source_id=None): + runs["count"] += 1 + return {"n": runs["count"]} + + with _patch_decorator_db(pg_conn): + first = task(_fake_celery_self(), source_id="src-1") + second = task(_fake_celery_self(), source_id="src-1") + + assert first == second == {"n": 1} + assert runs["count"] == 1 + + def test_poison_guard_trips_for_keyless_dispatch(self, pg_conn): + """The core fix: a keyless OOM-looping dispatch is bounded — the + guard trips after MAX_TASK_ATTEMPTS with no explicit key. + """ + from application.api.user.idempotency import ( + MAX_TASK_ATTEMPTS, with_idempotency, + ) + + runs = {"count": 0} + + @with_idempotency(task_name="ingest") + def task(self, idempotency_key=None, source_id=None): + runs["count"] += 1 + raise RuntimeError("OOM-style failure") + + with _patch_decorator_db(pg_conn): + for _ in range(MAX_TASK_ATTEMPTS): + with pytest.raises(RuntimeError): + task(_fake_celery_self(), source_id="src-poison") + result = task(_fake_celery_self(), source_id="src-poison") + + assert runs["count"] == MAX_TASK_ATTEMPTS + assert result["success"] is False + assert "poison-loop" in result["error"] + assert _row_for(pg_conn, "auto:ingest:src-poison")[2] == "failed" + + def test_no_source_id_no_key_runs_unguarded(self, pg_conn): + """No explicit key and no ``source_id`` anchor → pass through with + no DB writes, exactly as before. + """ + from application.api.user.idempotency import with_idempotency + + @with_idempotency(task_name="store_attachment") + def task(self, idempotency_key=None): + return {"ran": True} + + with patch( + "application.api.user.idempotency.db_session" + ) as mock_session, patch( + "application.api.user.idempotency.db_readonly" + ) as mock_readonly: + result = task(_fake_celery_self()) + + assert result == {"ran": True} + assert mock_session.call_count == 0 + assert mock_readonly.call_count == 0 + + def test_explicit_key_takes_precedence_over_source_id(self, pg_conn): + """An explicit key wins; the synthesized ``auto:`` key is unused.""" + from application.api.user.idempotency import with_idempotency + + @with_idempotency(task_name="ingest") + def task(self, idempotency_key=None, source_id=None): + return {"ran": True} + + with _patch_decorator_db(pg_conn): + task( + _fake_celery_self(), + idempotency_key="explicit-k", + source_id="src-x", + ) + + assert _row_for(pg_conn, "explicit-k") is not None + assert _row_for(pg_conn, "auto:ingest:src-x") is None + + +@pytest.mark.unit +class TestPoisonHook: + """``on_poison`` fires on the poison-guard branch with the task's + bound arguments, and never on the success path. + """ + + def test_hook_invoked_with_bound_args_on_poison(self, pg_conn): + from application.api.user.idempotency import ( + MAX_TASK_ATTEMPTS, with_idempotency, + ) + + captured = [] + + def _hook(task_name, bound): + captured.append((task_name, bound)) + + @with_idempotency(task_name="ingest", on_poison=_hook) + def task(self, idempotency_key=None, source_id=None): + raise RuntimeError("never converges") + + with _patch_decorator_db(pg_conn): + for _ in range(MAX_TASK_ATTEMPTS): + with pytest.raises(RuntimeError): + task(_fake_celery_self(), source_id="src-h") + task(_fake_celery_self(), source_id="src-h") + + assert len(captured) == 1 + task_name, bound = captured[0] + assert task_name == "ingest" + assert bound["source_id"] == "src-h" + + def test_hook_not_invoked_on_success(self, pg_conn): + from application.api.user.idempotency import with_idempotency + + calls = [] + + @with_idempotency( + task_name="ingest", on_poison=lambda *a: calls.append(a) + ) + def task(self, idempotency_key=None, source_id=None): + return {"ok": True} + + with _patch_decorator_db(pg_conn): + task(_fake_celery_self(), source_id="src-ok") + + assert calls == [] + + def test_hook_failure_does_not_break_poison_return(self, pg_conn): + """A throwing hook must not change the poison-guard outcome.""" + from application.api.user.idempotency import ( + MAX_TASK_ATTEMPTS, with_idempotency, + ) + + def _bad_hook(task_name, bound): + raise ValueError("hook blew up") + + @with_idempotency(task_name="ingest", on_poison=_bad_hook) + def task(self, idempotency_key=None, source_id=None): + raise RuntimeError("never converges") + + with _patch_decorator_db(pg_conn): + for _ in range(MAX_TASK_ATTEMPTS): + with pytest.raises(RuntimeError): + task(_fake_celery_self(), source_id="src-bad") + result = task(_fake_celery_self(), source_id="src-bad") + + assert result["success"] is False + assert "poison-loop" in result["error"] diff --git a/tests/api/user/test_tasks.py b/tests/api/user/test_tasks.py index a2792db0..5826716a 100644 --- a/tests/api/user/test_tasks.py +++ b/tests/api/user/test_tasks.py @@ -546,3 +546,63 @@ class TestIngestIdempotency: assert first == second assert first == {"status": "ok", "directory": "dir"} assert len(worker_calls) == 1 + + +class TestIngestPoisonEvent: + """The poison hook publishes a terminal source.ingest.failed so the + upload toast resolves instead of hanging on "training". + """ + + @pytest.mark.unit + def test_publishes_failed_event(self): + from application.api.user.tasks import _emit_ingest_poison_event + + published = [] + + def _fake_publish(user, event_type, payload, *, scope=None): + published.append((user, event_type, payload, scope)) + + with patch( + "application.events.publisher.publish_user_event", + side_effect=_fake_publish, + ): + _emit_ingest_poison_event( + "ingest", + {"user": "u1", "source_id": "src-9", "filename": "doc.pdf"}, + ) + + assert len(published) == 1 + user, event_type, payload, scope = published[0] + assert user == "u1" + assert event_type == "source.ingest.failed" + assert payload["source_id"] == "src-9" + assert payload["filename"] == "doc.pdf" + assert payload["operation"] == "upload" + assert scope == {"kind": "source", "id": "src-9"} + + @pytest.mark.unit + def test_skips_when_source_id_missing(self): + from application.api.user.tasks import _emit_ingest_poison_event + + with patch( + "application.events.publisher.publish_user_event", + ) as mock_publish: + _emit_ingest_poison_event("ingest", {"user": "u1"}) + + mock_publish.assert_not_called() + + @pytest.mark.unit + def test_reingest_uses_reingest_operation(self): + from application.api.user.tasks import _emit_ingest_poison_event + + published = [] + with patch( + "application.events.publisher.publish_user_event", + side_effect=lambda *a, **k: published.append((a, k)), + ): + _emit_ingest_poison_event( + "reingest_source_task", + {"user": "u1", "source_id": "src-r"}, + ) + + assert published[0][0][2]["operation"] == "reingest" diff --git a/tests/parser/file/test_docling_parser.py b/tests/parser/file/test_docling_parser.py index cbf7e7af..97f96368 100644 --- a/tests/parser/file/test_docling_parser.py +++ b/tests/parser/file/test_docling_parser.py @@ -421,3 +421,85 @@ class TestDoclingParserGaps: parser = DoclingCSVParser() assert parser.export_format == "markdown" assert parser.ocr_enabled is True + + +# ===================================================================== +# Pipeline memory caps +# ===================================================================== + + +@pytest.mark.unit +class TestApplyPipelineCaps: + """_apply_pipeline_caps bounds docling's threaded-pipeline buffering.""" + + def test_caps_threaded_pipeline_knobs(self, monkeypatch): + from application.core.settings import settings + from application.parser.file.docling_parser import _apply_pipeline_caps + + monkeypatch.setattr( + settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 2, raising=False + ) + + class Opts: + # docling >= 2.94 threaded pipeline — all knobs present. + queue_max_size = 100 + layout_batch_size = 4 + table_batch_size = 4 + ocr_batch_size = 4 + + opts = Opts() + _apply_pipeline_caps(opts) + + assert opts.queue_max_size == 2 + assert opts.layout_batch_size == 1 + assert opts.table_batch_size == 1 + assert opts.ocr_batch_size == 1 + + def test_queue_size_is_settings_driven(self, monkeypatch): + from application.core.settings import settings + from application.parser.file.docling_parser import _apply_pipeline_caps + + monkeypatch.setattr( + settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 6, raising=False + ) + + class Opts: + queue_max_size = 100 + + opts = Opts() + _apply_pipeline_caps(opts) + assert opts.queue_max_size == 6 + + def test_misconfigured_zero_floors_to_one(self, monkeypatch): + """A 0 queue depth could deadlock the threaded pipeline — floor it.""" + from application.core.settings import settings + from application.parser.file.docling_parser import _apply_pipeline_caps + + monkeypatch.setattr( + settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 0, raising=False + ) + + class Opts: + queue_max_size = 100 + + opts = Opts() + _apply_pipeline_caps(opts) + assert opts.queue_max_size == 1 + + def test_noop_on_docling_without_threaded_pipeline(self): + """Builds predating the threaded pipeline lack the knobs — the cap + must be a silent no-op, not an AttributeError.""" + from application.parser.file.docling_parser import _apply_pipeline_caps + + class LegacyOpts: + __slots__ = ("do_ocr", "do_table_structure") + + def __init__(self): + self.do_ocr = False + self.do_table_structure = True + + opts = LegacyOpts() + _apply_pipeline_caps(opts) # must not raise + + assert not hasattr(opts, "queue_max_size") + assert not hasattr(opts, "layout_batch_size")