Compare commits

...

2 Commits

Author SHA1 Message Date
Alex
afb69bb584 fix: batch limits and failed task reque limit 2026-05-18 22:00:39 +01:00
Alex
8f7742c937 fix: better source upload status and fix reconciliation issue (#2482)
* fix: better source upload status and fix reconciliation issue

* fix: mini issues

* chore: locale coverage
2026-05-18 14:22:03 +01:00
38 changed files with 1401 additions and 50 deletions

View File

@@ -0,0 +1,44 @@
"""0008 ingest_chunk_progress.status — terminal flag for stalled ingests.
The reconciler's stalled-ingest sweep had no terminal write, so a dead
ingest re-alerted every ~30 min forever. ``status`` lets it escalate a
stalled checkpoint to ``'stalled'`` once and stop re-selecting it;
``init_progress`` resets it to ``'active'`` on reingest.
Revision ID: 0008_ingest_progress_status
Revises: 0007_message_events
"""
from typing import Sequence, Union
from alembic import op
revision: str = "0008_ingest_progress_status"
down_revision: Union[str, None] = "0007_message_events"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Constant DEFAULT — metadata-only ADD COLUMN, no table rewrite.
op.execute(
"""
ALTER TABLE ingest_chunk_progress
ADD COLUMN status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'stalled'));
"""
)
# Partial index for the reconciler's stalled-ingest sweep.
op.execute(
"CREATE INDEX ingest_chunk_progress_active_idx "
"ON ingest_chunk_progress (last_updated) "
"WHERE status = 'active';"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ingest_chunk_progress_active_idx;")
op.execute(
"ALTER TABLE ingest_chunk_progress DROP COLUMN IF EXISTS status;"
)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import functools
import inspect
import logging
import threading
import uuid
@@ -26,13 +27,20 @@ LEASE_HEARTBEAT_INTERVAL = 30
LEASE_RETRY_MAX = 10
def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
def with_idempotency(
task_name: str,
*,
on_poison: Optional[Callable[[str, dict], None]] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Short-circuit on completed key; gate concurrent runs via a lease.
The guard key is the caller's ``idempotency_key``, or one synthesized
from ``source_id`` so a keyless dispatch is still poison-guarded.
Entry short-circuits:
- completed row → return cached result
- live lease held → retry(countdown=LEASE_TTL_SECONDS)
- attempt_count > MAX_TASK_ATTEMPTS → poison-loop alert
- attempt_count > MAX_TASK_ATTEMPTS → poison alert; ``on_poison`` fires
Success writes ``completed``; exceptions leave ``pending`` for
autoretry until the poison-loop guard trips.
"""
@@ -40,7 +48,14 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(fn)
def wrapper(self, *args: Any, idempotency_key: Any = None, **kwargs: Any) -> Any:
key = idempotency_key if isinstance(idempotency_key, str) and idempotency_key else None
explicit_key = (
idempotency_key
if isinstance(idempotency_key, str) and idempotency_key
else None
)
# A keyless dispatch still gets the guard via a synthesized key;
# None means no anchor exists — run unguarded, as before.
key = explicit_key or _synthesize_guard_key(task_name, kwargs)
if key is None:
return fn(self, *args, idempotency_key=idempotency_key, **kwargs)
@@ -88,6 +103,9 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
"attempts": attempt,
}
_finalize(key, poisoned, status="failed")
_run_poison_hook(
on_poison, task_name, fn, self, args, kwargs, idempotency_key,
)
return poisoned
heartbeat_thread, heartbeat_stop = _start_lease_heartbeat(
@@ -109,6 +127,45 @@ def with_idempotency(task_name: str) -> Callable[[Callable[..., Any]], Callable[
return decorator
def _synthesize_guard_key(task_name: str, kwargs: dict) -> Optional[str]:
"""Derive a deterministic guard key from ``source_id`` for a keyless dispatch.
``source_id`` is stable across broker redeliveries and unique per
upload, so the poison-loop counter survives an OOM SIGKILL. Returns
``None`` when absent — the dispatch then runs unguarded as before.
"""
source_id = kwargs.get("source_id")
if source_id:
return f"auto:{task_name}:{source_id}"
return None
def _run_poison_hook(
on_poison: Optional[Callable[[str, dict], None]],
task_name: str,
fn: Callable[..., Any],
task_self: Any,
args: tuple,
kwargs: dict,
idempotency_key: Any,
) -> None:
"""Invoke a task's poison-path hook with named call args; swallow failures.
A hook failure must never change the poison-guard outcome.
"""
if on_poison is None:
return
try:
bound = inspect.signature(fn).bind_partial(
task_self, *args, idempotency_key=idempotency_key, **kwargs,
)
on_poison(task_name, dict(bound.arguments))
except Exception:
logger.exception(
"idempotency: poison hook failed for task=%s", task_name,
)
def _lookup_completed(key: str) -> Any:
"""Return cached ``result_json`` if a completed row exists for ``key``, else None."""
with db_readonly() as conn:

View File

@@ -114,11 +114,11 @@ def run_reconciliation() -> Dict[str, Any]:
},
)
# Q4: ingest checkpoints whose heartbeat has gone silent. The
# reconciler only escalates (alerts) — it doesn't kill the worker
# or roll back the partial embed. The next dispatch resumes from
# ``last_index`` thanks to the per-chunk checkpoint, so this is an
# observability sweep, not a recovery action.
# Q4: ingest checkpoints whose heartbeat has gone silent. Each is
# escalated to terminal ``status='stalled'`` and alerted once — no
# worker kill, no rollback of the partial embed. The 'stalled' flag
# ends the re-alert loop and drives the "indexing failed" badge the
# sources list derives from this row.
with engine.begin() as conn:
repo = ReconciliationRepository(conn)
for row in repo.find_and_lock_stalled_ingests():
@@ -134,8 +134,7 @@ def run_reconciliation() -> Dict[str, Any]:
"last_updated": str(row.get("last_updated")),
},
)
# Bump the heartbeat so we don't re-alert every tick.
repo.touch_ingest_progress(str(row["source_id"]))
repo.mark_ingest_stalled(str(row["source_id"]))
# Q5: idempotency rows whose lease expired with attempts exhausted.
# The wrapper's poison-loop guard normally finalises these, but if

View File

@@ -7,9 +7,12 @@ from flask import current_app, jsonify, make_response, redirect, request
from flask_restx import fields, Namespace, Resource
from application.api import api
from application.api.user.tasks import sync_source
from application.api.user.tasks import reingest_source_task, sync_source
from application.core.settings import settings
from application.parser.remote.remote_creator import normalize_remote_data
from application.storage.db.repositories.ingest_chunk_progress import (
IngestChunkProgressRepository,
)
from application.storage.db.repositories.sources import SourcesRepository
from application.storage.db.session import db_readonly, db_session
from application.storage.storage_creator import StorageCreator
@@ -140,6 +143,8 @@ class PaginatedSources(Resource):
"provider": provider,
"isNested": bool(doc.get("directory_structure")),
"type": doc.get("type", "file"),
# Derived in SourcesRepository.list_for_user.
"ingestStatus": doc.get("ingest_status"),
}
)
response = {
@@ -347,6 +352,70 @@ class SyncSource(Resource):
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@sources_ns.route("/sources/reingest")
class ReingestSource(Resource):
reingest_source_model = api.model(
"ReingestSourceModel",
{"source_id": fields.String(required=True, description="Source ID")},
)
@api.expect(reingest_source_model)
@api.doc(
description="Re-run ingestion for a source — e.g. to recover a "
"stalled embed flagged by the reconciler."
)
def post(self):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
user = decoded_token.get("sub")
data = request.get_json() or {}
missing_fields = check_required_fields(data, ["source_id"])
if missing_fields:
return missing_fields
source_id = data["source_id"]
try:
with db_readonly() as conn:
doc = SourcesRepository(conn).get_any(source_id, user)
except Exception as err:
current_app.logger.error(
f"Error looking up source: {err}", exc_info=True
)
return make_response(
jsonify({"success": False, "message": "Invalid source ID"}), 400
)
if not doc:
return make_response(
jsonify({"success": False, "message": "Source not found"}), 404
)
resolved_source_id = str(doc["id"])
# Drop the stale chunk-progress row so the sources list stops
# deriving a 'failed' status; reingest never rewrites it itself.
try:
with db_session() as conn:
IngestChunkProgressRepository(conn).delete(resolved_source_id)
except Exception as err:
current_app.logger.warning(
f"Could not clear ingest progress for {resolved_source_id}: "
f"{err}",
exc_info=True,
)
try:
# Scoped key so repeated clicks collapse onto one reingest.
task = reingest_source_task.delay(
source_id=resolved_source_id,
user=user,
idempotency_key=f"reingest-source:{user}:{resolved_source_id}",
)
except Exception as err:
current_app.logger.error(
f"Error starting reingest for source {source_id}: {err}",
exc_info=True,
)
return make_response(jsonify({"success": False}), 400)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@sources_ns.route("/directory_structure")
class DirectoryStructure(Resource):
@api.doc(

View File

@@ -27,8 +27,42 @@ DURABLE_TASK = dict(
)
# operation tag for the poison-path source.ingest.failed event, per task.
_INGEST_POISON_OPERATION = {
"ingest": "upload",
"ingest_remote": "upload",
"ingest_connector_task": "upload",
"reingest_source_task": "reingest",
}
def _emit_ingest_poison_event(task_name, bound):
"""Publish a terminal ``source.ingest.failed`` when the poison-guard trips.
The guard returns before the worker runs, so the worker's own failed
event never fires — without this the upload toast spins on "training".
"""
user = bound.get("user")
source_id = bound.get("source_id")
if not user or not source_id:
return
from application.events.publisher import publish_user_event
publish_user_event(
user,
"source.ingest.failed",
{
"source_id": str(source_id),
"filename": bound.get("filename") or "",
"operation": _INGEST_POISON_OPERATION.get(task_name, "upload"),
"error": "Ingestion stopped after repeated failures.",
},
scope={"kind": "source", "id": str(source_id)},
)
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest")
@with_idempotency(task_name="ingest", on_poison=_emit_ingest_poison_event)
def ingest(
self,
directory,
@@ -57,7 +91,7 @@ def ingest(
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest_remote")
@with_idempotency(task_name="ingest_remote", on_poison=_emit_ingest_poison_event)
def ingest_remote(
self, source_data, job_name, user, loader,
idempotency_key=None, source_id=None,
@@ -71,7 +105,9 @@ def ingest_remote(
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="reingest_source_task")
@with_idempotency(
task_name="reingest_source_task", on_poison=_emit_ingest_poison_event,
)
def reingest_source_task(self, source_id, user, idempotency_key=None):
from application.worker import reingest_source_worker
@@ -128,7 +164,9 @@ def process_agent_webhook(self, agent_id, payload, idempotency_key=None):
@celery.task(**DURABLE_TASK)
@with_idempotency(task_name="ingest_connector_task")
@with_idempotency(
task_name="ingest_connector_task", on_poison=_emit_ingest_poison_event,
)
def ingest_connector_task(
self,
job_name,

View File

@@ -66,6 +66,9 @@ class Settings(BaseSettings):
PARSE_IMAGE_REMOTE: bool = False
DOCLING_OCR_ENABLED: bool = False # Enable OCR for docling parsers (PDF, images)
DOCLING_OCR_ATTACHMENTS_ENABLED: bool = False # Enable OCR for docling when parsing attachments
# Pages docling's threaded pipeline buffers in flight; the library
# default (100) drives worker RSS to ~3 GB on a mid-size PDF.
DOCLING_PIPELINE_QUEUE_MAX_SIZE: int = 2
VECTOR_STORE: str = "faiss" # "faiss" or "elasticsearch" or "qdrant" or "milvus" or "lancedb" or "pgvector"
RETRIEVERS_ENABLED: list = ["classic_rag"]
AGENT_NAME: str = "classic"

View File

@@ -154,6 +154,8 @@ def embed_and_store_documents(
*,
attempt_id: Optional[str] = None,
user_id: Optional[str] = None,
progress_start: int = 0,
progress_end: int = 100,
) -> None:
"""Embeds documents and stores them in a vector store.
@@ -176,6 +178,11 @@ def embed_and_store_documents(
published to ``user:{user_id}`` for the in-app upload toast.
``None`` is the safe default — workers without a user
context (e.g. background syncs) skip the publish.
progress_start: Percent the reported progress maps to at chunk 0.
Lets a caller reserve the lower band for an earlier stage
(e.g. parsing). Defaults to ``0`` (embed owns the whole bar).
progress_end: Percent the reported progress maps to at the final
chunk. Defaults to ``100``.
Returns:
None
@@ -257,6 +264,7 @@ def embed_and_store_documents(
failed_idx: int | None = None
last_published_pct = -1
source_id_str = str(source_id)
progress_span = progress_end - progress_start
for idx in tqdm(
range(loop_start, total_docs),
desc="Embedding 🦖",
@@ -266,8 +274,10 @@ def embed_and_store_documents(
):
doc = docs[idx]
try:
# Update task status for progress tracking
progress = int(((idx + 1) / total_docs) * 100)
# Map the embed loop into [progress_start, progress_end].
progress = progress_start + int(
((idx + 1) / total_docs) * progress_span
)
task_status.update_state(state="PROGRESS", meta={"current": progress})
# SSE push for sub-second upload-toast updates. Throttled to one

View File

@@ -211,13 +211,22 @@ class SimpleDirectoryReader(BaseReader):
return new_input_files
def load_data(self, concatenate: bool = False) -> List[Document]:
def load_data(
self,
concatenate: bool = False,
progress_callback: Optional[Callable[[int, int], None]] = None,
) -> List[Document]:
"""Load data from the input directory.
Args:
concatenate (bool): whether to concatenate all files into one document.
If set to True, file metadata is ignored.
False by default.
progress_callback (Optional[Callable[[int, int], None]]): Called
after each file is parsed with ``(files_done, total_files)``.
Lets callers surface parse/OCR progress before embedding
begins. Exceptions raised by the callback are swallowed so
progress reporting can never fail ingestion.
Returns:
List[Document]: A list of documents.
@@ -226,8 +235,9 @@ class SimpleDirectoryReader(BaseReader):
data_list: List[str] = []
metadata_list = []
self.file_token_counts = {}
for input_file in self.input_files:
total_files = len(self.input_files)
for file_index, input_file in enumerate(self.input_files):
suffix_lower = input_file.suffix.lower()
parser_metadata = {}
if suffix_lower in self.file_extractor:
@@ -277,7 +287,15 @@ class SimpleDirectoryReader(BaseReader):
else:
data_list.append(str(data))
metadata_list.append(base_metadata)
if progress_callback is not None:
try:
progress_callback(file_index + 1, total_files)
except Exception:
logging.warning(
"load_data progress callback failed", exc_info=True
)
# Build directory structure if input_dir is provided
if hasattr(self, 'input_dir'):
self.directory_structure = self.build_directory_structure(self.input_dir)

View File

@@ -16,6 +16,29 @@ from application.parser.file.base_parser import BaseParser
logger = logging.getLogger(__name__)
# Per-stage batch size for docling's threaded pipeline; 1 holds the
# concurrent working set to a single page (see _apply_pipeline_caps).
_PIPELINE_BATCH_SIZE = 1
def _apply_pipeline_caps(pipeline_options) -> None:
"""Cap docling's threaded-pipeline queue depth and batch sizes in place.
hasattr-guarded so docling builds without these knobs are unaffected.
"""
from application.core.settings import settings
caps = {
"queue_max_size": max(1, settings.DOCLING_PIPELINE_QUEUE_MAX_SIZE),
"layout_batch_size": _PIPELINE_BATCH_SIZE,
"table_batch_size": _PIPELINE_BATCH_SIZE,
"ocr_batch_size": _PIPELINE_BATCH_SIZE,
}
for name, value in caps.items():
if hasattr(pipeline_options, name):
setattr(pipeline_options, name, value)
class DoclingParser(BaseParser):
"""Parser using docling for advanced document processing.
@@ -86,6 +109,7 @@ class DoclingParser(BaseParser):
do_ocr=self.ocr_enabled,
do_table_structure=self.table_structure,
)
_apply_pipeline_caps(pipeline_options)
if self.ocr_enabled:
ocr_options = self._get_ocr_options()

View File

@@ -514,6 +514,9 @@ ingest_chunk_progress_table = Table(
# same task resumes from the checkpoint, but a separate invocation
# (manual reingest, scheduled sync) resets to a clean re-index.
Column("attempt_id", Text),
# Added in ``0008_ingest_progress_status``. The reconciler flips
# this to 'stalled'; ``init_progress`` resets it to 'active'.
Column("status", Text, nullable=False, server_default="active"),
)

View File

@@ -41,6 +41,9 @@ class IngestChunkProgressRepository:
rows with NULL ``attempt_id`` resume against another NULL
caller (e.g. test fixtures), but get reset the moment a real
``attempt_id`` arrives.
Both branches also reset ``status`` to ``'active'``, clearing a
prior reconciler ``'stalled'`` escalation.
"""
result = self._conn.execute(
text(
@@ -68,7 +71,8 @@ class IngestChunkProgressRepository:
THEN ingest_chunk_progress.embedded_chunks
ELSE 0
END,
attempt_id = EXCLUDED.attempt_id
attempt_id = EXCLUDED.attempt_id,
status = 'active'
RETURNING *
"""
),
@@ -113,6 +117,23 @@ class IngestChunkProgressRepository:
row = result.fetchone()
return row_to_dict(row) if row is not None else None
def delete(self, source_id: str) -> bool:
"""Delete the progress row for ``source_id``.
A manual reingest supersedes any prior ingest state — including a
reconciler ``'stalled'`` escalation — so dropping the row clears
the derived ``failed`` ingest status the sources list shows.
Returns ``True`` when a row was removed.
"""
result = self._conn.execute(
text(
"DELETE FROM ingest_chunk_progress "
"WHERE source_id = CAST(:source_id AS uuid)"
),
{"source_id": str(source_id)},
)
return result.rowcount > 0
def bump_heartbeat(self, source_id: str) -> None:
"""Refresh ``last_updated`` so the row looks alive to the reconciler."""
self._conn.execute(

View File

@@ -107,7 +107,11 @@ class ReconciliationRepository:
def find_and_lock_stalled_ingests(
self, *, age_minutes: int = 30, limit: int = 100,
) -> list[dict]:
"""Lock ingest checkpoints whose heartbeat hasn't ticked recently."""
"""Lock still-active ingest checkpoints with a silent heartbeat.
The ``status = 'active'`` filter skips rows already escalated to
``'stalled'``, so a dead ingest is alerted once, not every tick.
"""
result = self._conn.execute(
text(
"""
@@ -116,6 +120,7 @@ class ReconciliationRepository:
FROM ingest_chunk_progress
WHERE last_updated < now() - make_interval(mins => :age)
AND embedded_chunks < total_chunks
AND status = 'active'
ORDER BY last_updated ASC
LIMIT :limit
FOR UPDATE SKIP LOCKED
@@ -125,11 +130,15 @@ class ReconciliationRepository:
)
return [row_to_dict(r) for r in result.fetchall()]
def touch_ingest_progress(self, source_id: str) -> bool:
"""Bump ``last_updated`` so a once-stalled ingest re-enters the watch window."""
def mark_ingest_stalled(self, source_id: str) -> bool:
"""Escalate a stalled checkpoint to terminal ``status='stalled'``.
Drops the row out of the sweep so the reconciler alerts once;
``init_progress`` flips it back to ``'active'`` on reingest.
"""
result = self._conn.execute(
text(
"UPDATE ingest_chunk_progress SET last_updated = now() "
"UPDATE ingest_chunk_progress SET status = 'stalled' "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": str(source_id)},

View File

@@ -5,10 +5,10 @@ from __future__ import annotations
import json
from typing import Any, Optional
from sqlalchemy import Connection, func, select, text
from sqlalchemy import case, Connection, func, select, text
from application.storage.db.base_repository import looks_like_uuid, row_to_dict
from application.storage.db.models import sources_table
from application.storage.db.models import ingest_chunk_progress_table, sources_table
_SCALAR_COLUMNS = {
@@ -61,6 +61,21 @@ def _coerce_jsonb(value: Any) -> Any:
return value
def _ingest_status_case():
"""Derive a user-facing ingest status from the joined progress row.
``failed`` — reconciler-escalated stall. ``processing`` — embed in
flight. ``None`` — no progress row, or the embed completed.
"""
icp = ingest_chunk_progress_table
return case(
(icp.c.source_id.is_(None), None),
(icp.c.status == "stalled", "failed"),
(icp.c.embedded_chunks < icp.c.total_chunks, "processing"),
else_=None,
).label("ingest_status")
class SourcesRepository:
def __init__(self, conn: Connection) -> None:
self._conn = conn
@@ -192,13 +207,25 @@ class SourcesRepository:
as ``"desc"``.
Returns:
A list of source rows as plain dicts (via ``row_to_dict``).
A list of source rows as plain dicts (via ``row_to_dict``),
each carrying a derived ``ingest_status`` (``failed`` /
``processing`` / ``None``) from the joined progress row.
"""
column_name = sort_field if sort_field in _SORTABLE_COLUMNS else "date"
sort_column = sources_table.c[column_name]
ascending = sort_order.lower() == "asc"
stmt = select(sources_table).where(sources_table.c.user_id == user_id)
stmt = (
select(sources_table, _ingest_status_case())
.select_from(
sources_table.outerjoin(
ingest_chunk_progress_table,
ingest_chunk_progress_table.c.source_id
== sources_table.c.id,
)
)
.where(sources_table.c.user_id == user_id)
)
if search_term:
stmt = stmt.where(
sources_table.c.name.ilike(

View File

@@ -100,6 +100,40 @@ def _stop_ingest_heartbeat(thread, stop_event):
thread.join(timeout=5)
def _make_parse_progress_callback(task, user, source_id, start_pct, end_pct):
"""Build a ``load_data`` callback mapping parse progress to
``[start_pct, end_pct]`` via ``update_state`` + a throttled
``stage='parsing'`` SSE event.
"""
span = end_pct - start_pct
source_id_str = str(source_id)
state = {"last_pct": -1}
def _callback(files_done, total_files):
if not total_files:
return
pct = start_pct + int((files_done / total_files) * span)
task.update_state(
state="PROGRESS",
meta={"current": pct, "status": "Parsing files"},
)
if user and pct > state["last_pct"]:
publish_user_event(
user,
"source.ingest.progress",
{
"current": pct,
"total": total_files,
"files_done": files_done,
"stage": "parsing",
},
scope={"kind": "source", "id": source_id_str},
)
state["last_pct"] = pct
return _callback
# Define a function to extract metadata from a given filename.
@@ -640,7 +674,12 @@ def ingest_worker(
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
# Parsing/OCR owns 1-50% of the bar; embedding takes 50-100%.
raw_docs = reader.load_data(
progress_callback=_make_parse_progress_callback(
self, user, source_uuid, start_pct=1, end_pct=50,
)
)
directory_structure = getattr(reader, "directory_structure", {})
logging.info(f"Directory structure from reader: {directory_structure}")
@@ -680,6 +719,7 @@ def ingest_worker(
docs, vector_store_path, source_uuid, self,
attempt_id=getattr(self.request, "id", None),
user_id=user,
progress_start=50, progress_end=100,
)
finally:
_stop_ingest_heartbeat(heartbeat_thread, heartbeat_stop)
@@ -810,6 +850,8 @@ def reingest_source_worker(self, source_id, user):
{
"source_id": source_id,
"name": source_name,
# ``filename`` labels the upload toast on auto-create.
"filename": source_name,
"operation": "reingest",
},
scope={"kind": "source", "id": source_id},
@@ -917,6 +959,7 @@ def reingest_source_worker(self, source_id, user):
{
"source_id": source_id,
"name": source_name,
"filename": source_name,
"operation": "reingest",
"no_changes": True,
"chunks_added": 0,
@@ -1104,6 +1147,7 @@ def reingest_source_worker(self, source_id, user):
completed_payload: dict = {
"source_id": source_id,
"name": source_name,
"filename": source_name,
"operation": "reingest",
"chunks_added": added,
"chunks_deleted": deleted,
@@ -1143,6 +1187,7 @@ def reingest_source_worker(self, source_id, user):
{
"source_id": str(source_id),
"name": source_name,
"filename": source_name,
"operation": "reingest",
"error": str(e)[:1024],
},
@@ -1804,14 +1849,15 @@ def ingest_connector(
exclude_hidden=True,
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
# Parsing/OCR fills 40-60% of the bar; embedding takes 60-100%.
raw_docs = reader.load_data(
progress_callback=_make_parse_progress_callback(
self, user, source_uuid, start_pct=40, end_pct=60,
)
)
directory_structure = getattr(reader, "directory_structure", {})
# Step 4: Process documents (chunking, embedding, etc.)
self.update_state(
state="PROGRESS", meta={"current": 60, "status": "Processing documents"}
)
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
@@ -1848,12 +1894,13 @@ def ingest_connector(
os.makedirs(vector_store_path, exist_ok=True)
self.update_state(
state="PROGRESS", meta={"current": 80, "status": "Storing documents"}
state="PROGRESS", meta={"current": 60, "status": "Storing documents"}
)
embed_and_store_documents(
docs, vector_store_path, source_uuid, self,
attempt_id=getattr(self.request, "id", None),
user_id=user,
progress_start=60, progress_end=100,
)
assert_index_complete(source_uuid)

View File

@@ -34,6 +34,7 @@ const endpoints = {
LOGS: `/api/get_user_logs`,
MANAGE_SYNC: '/api/manage_sync',
SYNC_SOURCE: '/api/sync_source',
REINGEST_SOURCE: '/api/sources/reingest',
GET_AVAILABLE_TOOLS: '/api/available_tools',
GET_USER_TOOLS: '/api/get_tools',
CREATE_TOOL: '/api/create_tool',

View File

@@ -73,6 +73,8 @@ const userService = {
apiClient.post(endpoints.USER.MANAGE_SYNC, data, token),
syncSource: (data: any, token: string | null): Promise<any> =>
apiClient.post(endpoints.USER.SYNC_SOURCE, data, token),
reingestSource: (data: any, token: string | null): Promise<any> =>
apiClient.post(endpoints.USER.REINGEST_SOURCE, data, token),
getAvailableTools: (token: string | null): Promise<any> =>
apiClient.get(endpoints.USER.GET_AVAILABLE_TOOLS, token),
getUserTools: (token: string | null): Promise<any> =>

View File

@@ -165,12 +165,19 @@ function UploadRow({
return (
<li className="border-border/50 border-b last:border-b-0">
<div className="flex items-center justify-between px-5 py-3">
<p
className="font-inter dark:text-muted-foreground max-w-[200px] truncate text-[13px] leading-[16.5px] font-normal text-black"
title={task.fileName}
>
{task.fileName}
</p>
<div className="flex min-w-0 flex-col">
<p
className="font-inter dark:text-muted-foreground max-w-[200px] truncate text-[13px] leading-[16.5px] font-normal text-black"
title={task.fileName}
>
{task.fileName}
</p>
{task.status === 'training' && task.stage && (
<span className="font-inter text-muted-foreground mt-0.5 text-[11px] leading-[14px]">
{t(`modals.uploadDoc.progress.${task.stage}`)}
</span>
)}
</div>
<div className="flex items-center gap-2">
{showProgress && (

View File

@@ -1,10 +1,4 @@
import {
useCallback,
useEffect,
useRef,
useState,
RefObject,
} from 'react';
import { useCallback, useEffect, useRef, useState, RefObject } from 'react';
export function useOutsideAlerter<T extends HTMLElement>(
ref: RefObject<T | null>,

View File

@@ -70,6 +70,9 @@
"sync": "Synchronisieren",
"syncNow": "Jetzt synchronisieren",
"syncing": "Synchronisiere...",
"reingest": "Erneut indexieren",
"ingestFailed": "Indexierung fehlgeschlagen",
"ingestProcessing": "Indexierung...",
"syncConfirmation": "Bist du sicher, dass du \"{{sourceName}}\" synchronisieren möchtest? Dies aktualisiert den Inhalt mit deinem Cloud-Speicher und kann Änderungen an einzelnen Chunks überschreiben.",
"syncFrequency": {
"never": "Nie",
@@ -353,6 +356,8 @@
"failed": "Upload fehlgeschlagen",
"wait": "Dies kann einige Minuten dauern",
"preparing": "Upload wird vorbereitet",
"parsing": "Dateien werden verarbeitet...",
"embedding": "Einbettung...",
"tokenLimit": "Token-Limit überschritten, bitte lade ein kleineres Dokument hoch",
"expandDetails": "Upload-Details erweitern",
"collapseDetails": "Upload-Details einklappen",

View File

@@ -70,6 +70,9 @@
"sync": "Sync",
"syncNow": "Sync now",
"syncing": "Syncing...",
"reingest": "Reingest",
"ingestFailed": "Indexing failed",
"ingestProcessing": "Indexing…",
"syncConfirmation": "Are you sure you want to sync \"{{sourceName}}\"? This will update the content with your cloud storage and may override any edits you made to individual chunks.",
"syncFrequency": {
"never": "Never",
@@ -365,6 +368,8 @@
"failed": "Upload failed",
"wait": "This may take several minutes",
"preparing": "Preparing upload",
"parsing": "Parsing files…",
"embedding": "Embedding…",
"tokenLimit": "Over the token limit, please consider uploading smaller document",
"expandDetails": "Expand upload details",
"collapseDetails": "Collapse upload details",

View File

@@ -70,6 +70,9 @@
"sync": "Sincronizar",
"syncNow": "Sincronizar ahora",
"syncing": "Sincronizando...",
"reingest": "Reindexar",
"ingestFailed": "Error de indexación",
"ingestProcessing": "Indexando...",
"syncConfirmation": "¿Estás seguro de que deseas sincronizar \"{{sourceName}}\"? Esto actualizará el contenido con tu almacenamiento en la nube y puede anular cualquier edición que hayas realizado en fragmentos individuales.",
"syncFrequency": {
"never": "Nunca",
@@ -353,6 +356,8 @@
"failed": "Error al subir",
"wait": "Esto puede tardar varios minutos",
"preparing": "Preparando subida",
"parsing": "Analizando archivos...",
"embedding": "Generando incrustaciones...",
"tokenLimit": "Excede el límite de tokens, considere cargar un documento más pequeño",
"expandDetails": "Expandir detalles de subida",
"collapseDetails": "Contraer detalles de subida",

View File

@@ -70,6 +70,9 @@
"sync": "同期",
"syncNow": "今すぐ同期",
"syncing": "同期中...",
"reingest": "再インデックス",
"ingestFailed": "インデックス作成に失敗しました",
"ingestProcessing": "インデックス作成中...",
"syncConfirmation": "\"{{sourceName}}\"を同期してもよろしいですか?これにより、コンテンツがクラウドストレージで更新され、個々のチャンクに加えた編集が上書きされる可能性があります。",
"syncFrequency": {
"never": "なし",
@@ -353,6 +356,8 @@
"failed": "アップロード失敗",
"wait": "数分かかる場合があります",
"preparing": "アップロードを準備中",
"parsing": "ファイルを解析中...",
"embedding": "埋め込み処理中...",
"tokenLimit": "トークン制限を超えています。より小さいドキュメントをアップロードしてください",
"expandDetails": "アップロードの詳細を展開",
"collapseDetails": "アップロードの詳細を折りたたむ",

View File

@@ -70,6 +70,9 @@
"sync": "Синхронизация",
"syncNow": "Синхронизировать сейчас",
"syncing": "Синхронизация...",
"reingest": "Переиндексировать",
"ingestFailed": "Ошибка индексации",
"ingestProcessing": "Индексация...",
"syncConfirmation": "Вы уверены, что хотите синхронизировать \"{{sourceName}}\"? Это обновит содержимое с вашим облачным хранилищем и может перезаписать любые изменения, внесенные вами в отдельные фрагменты.",
"syncFrequency": {
"never": "Никогда",
@@ -353,6 +356,8 @@
"failed": "Ошибка загрузки",
"wait": "Это может занять несколько минут",
"preparing": "Подготовка загрузки",
"parsing": "Обработка файлов...",
"embedding": "Создание эмбеддингов...",
"tokenLimit": "Превышен лимит токенов, рассмотрите возможность загрузки документа меньшего размера",
"expandDetails": "Развернуть детали загрузки",
"collapseDetails": "Свернуть детали загрузки",

View File

@@ -70,6 +70,9 @@
"sync": "同步",
"syncNow": "立即同步",
"syncing": "同步中...",
"reingest": "重新索引",
"ingestFailed": "索引失敗",
"ingestProcessing": "索引中...",
"syncConfirmation": "您確定要同步 \"{{sourceName}}\" 嗎?這將使用您的雲端儲存更新內容,並可能覆蓋您對個別文本塊所做的任何編輯。",
"syncFrequency": {
"never": "從不",
@@ -353,6 +356,8 @@
"failed": "上傳失敗",
"wait": "這可能需要幾分鐘",
"preparing": "準備上傳",
"parsing": "正在解析檔案...",
"embedding": "正在生成嵌入...",
"tokenLimit": "超出令牌限制,請考慮上傳較小的文檔",
"expandDetails": "展開上傳詳情",
"collapseDetails": "摺疊上傳詳情",

View File

@@ -70,6 +70,9 @@
"sync": "同步",
"syncNow": "立即同步",
"syncing": "同步中...",
"reingest": "重新索引",
"ingestFailed": "索引失败",
"ingestProcessing": "索引中...",
"syncConfirmation": "您确定要同步 \"{{sourceName}}\" 吗?这将使用您的云存储更新内容,并可能覆盖您对单个文本块所做的任何编辑。",
"syncFrequency": {
"never": "从不",
@@ -353,6 +356,8 @@
"failed": "上传失败",
"wait": "这可能需要几分钟",
"preparing": "准备上传",
"parsing": "正在解析文件...",
"embedding": "正在生成嵌入...",
"tokenLimit": "超出令牌限制,请考虑上传较小的文档",
"expandDetails": "展开上传详情",
"collapseDetails": "折叠上传详情",

View File

@@ -14,6 +14,8 @@ export type Doc = {
syncFrequency?: string;
isNested?: boolean;
provider?: string;
// Derived server-side from ingest_chunk_progress (sources API).
ingestStatus?: 'processing' | 'failed';
};
export type GetDocsResponse = {

View File

@@ -27,6 +27,12 @@ import {
setSourceDocs,
} from '../preferences/preferenceSlice';
import Upload from '../upload/Upload';
import {
addUploadTask,
removeUploadTask,
selectUploadTasks,
updateUploadTask,
} from '../upload/uploadSlice';
import { formatDate } from '../utils/dateTimeUtils';
import FileTree from '../components/FileTree';
import ConnectorTree from '../components/ConnectorTree';
@@ -56,6 +62,7 @@ export default function Sources({
const [isDarkTheme] = useDarkTheme();
const dispatch = useDispatch();
const token = useSelector(selectToken);
const uploadTasks = useSelector(selectUploadTasks);
const [searchTerm, setSearchTerm] = useState<string>('');
const debouncedSearchTerm = useDebouncedValue(searchTerm, 500);
@@ -249,6 +256,57 @@ export default function Sources({
}
};
const handleReingest = async (doc: Doc) => {
if (!doc.id) {
return;
}
const sourceId = doc.id;
// Drop stale toast rows for this source (a finished/dismissed task
// would swallow the reingest's SSE events), then open a fresh one.
uploadTasks
.filter((task) => task.sourceId === sourceId)
.forEach((task) => dispatch(removeUploadTask(task.id)));
const reingestTaskId = `reingest-${sourceId}-${Date.now()}`;
dispatch(
addUploadTask({
id: reingestTaskId,
fileName: doc.name || sourceId,
progress: 0,
status: 'training',
sourceId,
}),
);
try {
const response = await userService.reingestSource(
{ source_id: sourceId },
token,
);
const data = await response.json();
if (!data.success) {
console.error('Reingest failed:', data.error || data.message);
dispatch(
updateUploadTask({
id: reingestTaskId,
updates: {
status: 'failed',
errorMessage: data.error || data.message,
},
}),
);
return;
}
refreshDocs(undefined, currentPage, rowsPerPage);
} catch (error) {
console.error('Error reingesting source:', error);
dispatch(
updateUploadTask({
id: reingestTaskId,
updates: { status: 'failed' },
}),
);
}
};
const [documentToDelete, setDocumentToDelete] = useState<{
index: number;
document: Doc;
@@ -283,6 +341,19 @@ export default function Sources({
},
];
if (document.ingestStatus === 'failed') {
actions.push({
icon: SyncIcon,
label: t('settings.sources.reingest'),
onClick: () => {
handleReingest(document);
},
iconWidth: 14,
iconHeight: 14,
variant: 'primary',
});
}
if (document.syncFrequency) {
actions.push({
icon: SyncIcon,
@@ -483,6 +554,16 @@ export default function Sources({
</div>
<div className="flex flex-col items-start justify-start gap-1">
{document.ingestStatus === 'failed' && (
<span className="rounded-full bg-red-100 px-2 py-0.5 text-[11px] leading-[16px] font-medium text-red-700 dark:bg-red-900/30 dark:text-red-400">
{t('settings.sources.ingestFailed')}
</span>
)}
{document.ingestStatus === 'processing' && (
<span className="bg-muted-foreground/10 text-muted-foreground rounded-full px-2 py-0.5 text-[11px] leading-[16px] font-medium">
{t('settings.sources.ingestProcessing')}
</span>
)}
<div className="flex items-center gap-2">
<img
src={CalendarIcon}

View File

@@ -286,6 +286,26 @@ describe('source.ingest.progress', () => {
state = reducer(state, ingest('source.ingest.progress', { current: -10 }));
expect(state.tasks[0].progress).toBe(100);
});
it('records the ingest stage from the payload', () => {
let state = stateWithTask(makeTask({ status: 'training' }));
state = reducer(
state,
ingest('source.ingest.progress', { current: 20, stage: 'parsing' }),
);
expect(state.tasks[0].stage).toBe('parsing');
state = reducer(
state,
ingest('source.ingest.progress', { current: 70, stage: 'embedding' }),
);
expect(state.tasks[0].stage).toBe('embedding');
// An unknown/absent stage leaves the last known value intact.
state = reducer(
state,
ingest('source.ingest.progress', { current: 80, stage: 'bogus' }),
);
expect(state.tasks[0].stage).toBe('embedding');
});
});
describe('source.ingest.completed', () => {

View File

@@ -66,6 +66,12 @@ export interface UploadTask {
sourceId?: string;
errorMessage?: string;
dismissed?: boolean;
/**
* Ingest phase from the latest ``source.ingest.progress`` event:
* ``parsing`` (parse/OCR, lower band of the bar) or ``embedding``
* (upper band). Drives the phase label in ``UploadToast``.
*/
stage?: 'parsing' | 'embedding';
/**
* Flipped when ``source.ingest.completed`` carries
* ``payload.limited === true`` (the worker hit a token cap during
@@ -334,6 +340,9 @@ export const uploadSlice = createSlice({
if (task.status === 'completed' || task.status === 'failed') break;
task.status = 'training';
if (clamped > task.progress) task.progress = clamped;
if (payload.stage === 'parsing' || payload.stage === 'embedding') {
task.stage = payload.stage;
}
break;
}
case 'source.ingest.completed':

View File

@@ -6,6 +6,7 @@ from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from sqlalchemy import text
@pytest.fixture
@@ -256,9 +257,39 @@ class TestPaginatedSources:
for key in (
"id", "name", "date", "model", "location", "tokens",
"retriever", "syncFrequency", "provider", "isNested", "type",
"ingestStatus",
):
assert key in row
def test_exposes_stalled_ingest_status(self, app, pg_conn):
"""A source whose ingest the reconciler escalated to 'stalled'
surfaces ingestStatus='failed' so the UI can badge it.
"""
from application.api.user.sources.routes import PaginatedSources
user = "u-ingest-status"
src = _seed_source(pg_conn, user, name="stalled-doc", type="file")
pg_conn.execute(
text(
"""
INSERT INTO ingest_chunk_progress (
source_id, total_chunks, embedded_chunks, last_index,
status
)
VALUES (CAST(:sid AS uuid), 907, 9, 8, 'stalled')
"""
),
{"sid": str(src["id"])},
)
with _patch_db(pg_conn), app.test_request_context(
"/api/sources/paginated?page=1&rows=10"
):
from flask import request
request.decoded_token = {"sub": user}
response = PaginatedSources().get()
row = response.json["paginated"][0]
assert row["ingestStatus"] == "failed"
class TestDeleteOldIndexes:
def test_returns_401_unauthenticated(self, app):
@@ -605,6 +636,135 @@ class TestSyncSource:
assert response.status_code == 400
class TestReingestSource:
def test_returns_401_unauthenticated(self, app):
from application.api.user.sources.routes import ReingestSource
with app.test_request_context(
"/api/sources/reingest", method="POST", json={"source_id": "x"}
):
from flask import request
request.decoded_token = None
response = ReingestSource().post()
assert response.status_code == 401
def test_returns_400_missing_id(self, app):
from application.api.user.sources.routes import ReingestSource
with app.test_request_context(
"/api/sources/reingest", method="POST", json={}
):
from flask import request
request.decoded_token = {"sub": "u"}
response = ReingestSource().post()
assert response.status_code == 400
def test_returns_404_missing_source(self, app, pg_conn):
from application.api.user.sources.routes import ReingestSource
with _patch_db(pg_conn), app.test_request_context(
"/api/sources/reingest",
method="POST",
json={"source_id": "00000000-0000-0000-0000-000000000000"},
):
from flask import request
request.decoded_token = {"sub": "u"}
response = ReingestSource().post()
assert response.status_code == 404
def test_triggers_reingest_task(self, app, pg_conn):
from application.api.user.sources.routes import ReingestSource
user = "u-reingest"
src = _seed_source(pg_conn, user, name="stalled-src", type="file")
fake_task = MagicMock(id="reingest-task-1")
with _patch_db(pg_conn), patch(
"application.api.user.sources.routes.reingest_source_task.delay",
return_value=fake_task,
) as mock_delay, app.test_request_context(
"/api/sources/reingest",
method="POST",
json={"source_id": str(src["id"])},
):
from flask import request
request.decoded_token = {"sub": user}
response = ReingestSource().post()
assert response.status_code == 200
assert response.json["task_id"] == "reingest-task-1"
assert mock_delay.call_args.kwargs["source_id"] == str(src["id"])
assert mock_delay.call_args.kwargs["user"] == user
# Scoped idempotency key engages the task's lease so repeated
# clicks collapse onto one reingest instead of racing.
assert mock_delay.call_args.kwargs["idempotency_key"] == (
f"reingest-source:{user}:{src['id']}"
)
def test_clears_stalled_ingest_progress_row(self, app, pg_conn):
"""Reingest drops the stale chunk-progress row so the sources
list stops deriving a 'failed' ingest status for the source.
"""
from application.api.user.sources.routes import ReingestSource
user = "u-reingest-clear"
src = _seed_source(pg_conn, user, name="stalled-doc", type="file")
pg_conn.execute(
text(
"""
INSERT INTO ingest_chunk_progress (
source_id, total_chunks, embedded_chunks, last_index,
status
)
VALUES (CAST(:sid AS uuid), 100, 9, 8, 'stalled')
"""
),
{"sid": str(src["id"])},
)
fake_task = MagicMock(id="reingest-task-2")
with _patch_db(pg_conn), patch(
"application.api.user.sources.routes.reingest_source_task.delay",
return_value=fake_task,
), app.test_request_context(
"/api/sources/reingest",
method="POST",
json={"source_id": str(src["id"])},
):
from flask import request
request.decoded_token = {"sub": user}
response = ReingestSource().post()
assert response.status_code == 200
remaining = pg_conn.execute(
text(
"SELECT count(*) FROM ingest_chunk_progress "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": str(src["id"])},
).scalar()
assert remaining == 0
def test_reingest_task_raises_returns_400(self, app, pg_conn):
from application.api.user.sources.routes import ReingestSource
user = "u-reingest-fail"
src = _seed_source(pg_conn, user, name="fail-src", type="file")
with _patch_db(pg_conn), patch(
"application.api.user.sources.routes.reingest_source_task.delay",
side_effect=RuntimeError("boom"),
), app.test_request_context(
"/api/sources/reingest",
method="POST",
json={"source_id": str(src["id"])},
):
from flask import request
request.decoded_token = {"sub": user}
response = ReingestSource().post()
assert response.status_code == 400
class TestDirectoryStructure:
def test_returns_401_unauthenticated(self, app):
from application.api.user.sources.routes import DirectoryStructure

View File

@@ -417,3 +417,181 @@ class TestSuccessfulRunClearsLease:
assert row[0] == "completed"
assert row[1] is None
assert row[2] is None
@pytest.mark.unit
class TestSynthesizedKeyGuardsKeylessDispatch:
"""A keyless dispatch carrying ``source_id`` is still poison-guarded:
the wrapper synthesizes a deterministic key from ``source_id``.
"""
def test_keyless_with_source_id_records_dedup_row(self, pg_conn):
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
return {"ran": True}
with _patch_decorator_db(pg_conn):
result = task(_fake_celery_self(), source_id="src-abc")
assert result == {"ran": True}
row = _row_for(pg_conn, "auto:ingest:src-abc")
assert row is not None
assert row[0] == "ingest"
assert row[2] == "completed"
def test_synthesized_key_stable_across_redeliveries(self, pg_conn):
"""Same ``source_id`` → same key → a redelivery short-circuits to
the cached result instead of re-running the body.
"""
from application.api.user.idempotency import with_idempotency
runs = {"count": 0}
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
runs["count"] += 1
return {"n": runs["count"]}
with _patch_decorator_db(pg_conn):
first = task(_fake_celery_self(), source_id="src-1")
second = task(_fake_celery_self(), source_id="src-1")
assert first == second == {"n": 1}
assert runs["count"] == 1
def test_poison_guard_trips_for_keyless_dispatch(self, pg_conn):
"""The core fix: a keyless OOM-looping dispatch is bounded — the
guard trips after MAX_TASK_ATTEMPTS with no explicit key.
"""
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
runs = {"count": 0}
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
runs["count"] += 1
raise RuntimeError("OOM-style failure")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-poison")
result = task(_fake_celery_self(), source_id="src-poison")
assert runs["count"] == MAX_TASK_ATTEMPTS
assert result["success"] is False
assert "poison-loop" in result["error"]
assert _row_for(pg_conn, "auto:ingest:src-poison")[2] == "failed"
def test_no_source_id_no_key_runs_unguarded(self, pg_conn):
"""No explicit key and no ``source_id`` anchor → pass through with
no DB writes, exactly as before.
"""
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="store_attachment")
def task(self, idempotency_key=None):
return {"ran": True}
with patch(
"application.api.user.idempotency.db_session"
) as mock_session, patch(
"application.api.user.idempotency.db_readonly"
) as mock_readonly:
result = task(_fake_celery_self())
assert result == {"ran": True}
assert mock_session.call_count == 0
assert mock_readonly.call_count == 0
def test_explicit_key_takes_precedence_over_source_id(self, pg_conn):
"""An explicit key wins; the synthesized ``auto:`` key is unused."""
from application.api.user.idempotency import with_idempotency
@with_idempotency(task_name="ingest")
def task(self, idempotency_key=None, source_id=None):
return {"ran": True}
with _patch_decorator_db(pg_conn):
task(
_fake_celery_self(),
idempotency_key="explicit-k",
source_id="src-x",
)
assert _row_for(pg_conn, "explicit-k") is not None
assert _row_for(pg_conn, "auto:ingest:src-x") is None
@pytest.mark.unit
class TestPoisonHook:
"""``on_poison`` fires on the poison-guard branch with the task's
bound arguments, and never on the success path.
"""
def test_hook_invoked_with_bound_args_on_poison(self, pg_conn):
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
captured = []
def _hook(task_name, bound):
captured.append((task_name, bound))
@with_idempotency(task_name="ingest", on_poison=_hook)
def task(self, idempotency_key=None, source_id=None):
raise RuntimeError("never converges")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-h")
task(_fake_celery_self(), source_id="src-h")
assert len(captured) == 1
task_name, bound = captured[0]
assert task_name == "ingest"
assert bound["source_id"] == "src-h"
def test_hook_not_invoked_on_success(self, pg_conn):
from application.api.user.idempotency import with_idempotency
calls = []
@with_idempotency(
task_name="ingest", on_poison=lambda *a: calls.append(a)
)
def task(self, idempotency_key=None, source_id=None):
return {"ok": True}
with _patch_decorator_db(pg_conn):
task(_fake_celery_self(), source_id="src-ok")
assert calls == []
def test_hook_failure_does_not_break_poison_return(self, pg_conn):
"""A throwing hook must not change the poison-guard outcome."""
from application.api.user.idempotency import (
MAX_TASK_ATTEMPTS, with_idempotency,
)
def _bad_hook(task_name, bound):
raise ValueError("hook blew up")
@with_idempotency(task_name="ingest", on_poison=_bad_hook)
def task(self, idempotency_key=None, source_id=None):
raise RuntimeError("never converges")
with _patch_decorator_db(pg_conn):
for _ in range(MAX_TASK_ATTEMPTS):
with pytest.raises(RuntimeError):
task(_fake_celery_self(), source_id="src-bad")
result = task(_fake_celery_self(), source_id="src-bad")
assert result["success"] is False
assert "poison-loop" in result["error"]

View File

@@ -529,6 +529,142 @@ class TestStuckExecutedToolCalls:
assert row[0] == "executed"
# ---------------------------------------------------------------------------
# Q4 — stalled ingest checkpoints (escalate to terminal 'stalled' + alert)
# ---------------------------------------------------------------------------
def _seed_ingest_progress(
conn,
*,
source_id: str,
embedded: int,
total: int,
age_minutes: int = 31,
status: str = "active",
) -> str:
"""Insert an ingest_chunk_progress row with a backdated last_updated."""
conn.execute(
text(
"""
INSERT INTO ingest_chunk_progress (
source_id, total_chunks, embedded_chunks, last_index,
last_updated, status
)
VALUES (
CAST(:sid AS uuid), :total, :embedded, :embedded - 1,
clock_timestamp() - make_interval(mins => :age),
:status
)
"""
),
{
"sid": source_id,
"total": total,
"embedded": embedded,
"age": age_minutes,
"status": status,
},
)
return source_id
def _ingest_status(conn, source_id: str) -> str | None:
"""Return the ``status`` of an ingest_chunk_progress row, or None."""
row = conn.execute(
text(
"SELECT status FROM ingest_chunk_progress "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": source_id},
).fetchone()
return row[0] if row is not None else None
class TestStalledIngests:
@pytest.mark.unit
def test_stalled_ingest_escalated_with_alert(self, pg_conn, caplog):
from application.api.user.reconciliation import run_reconciliation
sid = "1a000000-0000-0000-0000-0000000000a1"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=9, total=907)
before = _stack_logs_count(pg_conn, "reconciler_ingest_stalled")
with _route_engine_to(pg_conn), caplog.at_level(
logging.ERROR, logger="application.api.user.reconciliation",
):
r = run_reconciliation()
assert r["ingests_stalled"] == 1
# Escalated to a terminal status so the next tick skips it.
assert _ingest_status(pg_conn, sid) == "stalled"
# Structured alert + stack_logs row both surface the failure.
assert any(
getattr(rec, "alert", None) == "reconciler_ingest_stalled"
and rec.levelname == "ERROR"
for rec in caplog.records
)
assert (
_stack_logs_count(pg_conn, "reconciler_ingest_stalled")
== before + 1
)
@pytest.mark.unit
def test_stalled_ingest_alerts_once_not_every_tick(self, pg_conn):
"""The escalate-to-'stalled' write ends the re-alert loop: a
second tick neither re-counts nor re-logs the same dead ingest.
"""
from application.api.user.reconciliation import run_reconciliation
sid = "1a000000-0000-0000-0000-0000000000a2"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=1, total=95)
before = _stack_logs_count(pg_conn, "reconciler_ingest_stalled")
with _route_engine_to(pg_conn):
r1 = run_reconciliation()
r2 = run_reconciliation()
assert r1["ingests_stalled"] == 1
assert r2["ingests_stalled"] == 0
# Only the first tick wrote an alert row.
assert (
_stack_logs_count(pg_conn, "reconciler_ingest_stalled")
== before + 1
)
@pytest.mark.unit
def test_fresh_ingest_left_alone(self, pg_conn):
from application.api.user.reconciliation import run_reconciliation
sid = "1a000000-0000-0000-0000-0000000000a3"
# 2 minutes old — well under the 30-minute staleness threshold.
_seed_ingest_progress(
pg_conn, source_id=sid, embedded=3, total=20, age_minutes=2,
)
with _route_engine_to(pg_conn):
r = run_reconciliation()
assert r["ingests_stalled"] == 0
assert _ingest_status(pg_conn, sid) == "active"
@pytest.mark.unit
def test_completed_ingest_left_alone(self, pg_conn):
"""A stale checkpoint that finished embedding (embedded == total)
is not a stall and must not be flagged.
"""
from application.api.user.reconciliation import run_reconciliation
sid = "1a000000-0000-0000-0000-0000000000a4"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=50, total=50)
with _route_engine_to(pg_conn):
r = run_reconciliation()
assert r["ingests_stalled"] == 0
assert _ingest_status(pg_conn, sid) == "active"
# ---------------------------------------------------------------------------
# Q5 — stuck idempotency pending rows (lease expired + attempts exhausted)
# ---------------------------------------------------------------------------

View File

@@ -546,3 +546,63 @@ class TestIngestIdempotency:
assert first == second
assert first == {"status": "ok", "directory": "dir"}
assert len(worker_calls) == 1
class TestIngestPoisonEvent:
"""The poison hook publishes a terminal source.ingest.failed so the
upload toast resolves instead of hanging on "training".
"""
@pytest.mark.unit
def test_publishes_failed_event(self):
from application.api.user.tasks import _emit_ingest_poison_event
published = []
def _fake_publish(user, event_type, payload, *, scope=None):
published.append((user, event_type, payload, scope))
with patch(
"application.events.publisher.publish_user_event",
side_effect=_fake_publish,
):
_emit_ingest_poison_event(
"ingest",
{"user": "u1", "source_id": "src-9", "filename": "doc.pdf"},
)
assert len(published) == 1
user, event_type, payload, scope = published[0]
assert user == "u1"
assert event_type == "source.ingest.failed"
assert payload["source_id"] == "src-9"
assert payload["filename"] == "doc.pdf"
assert payload["operation"] == "upload"
assert scope == {"kind": "source", "id": "src-9"}
@pytest.mark.unit
def test_skips_when_source_id_missing(self):
from application.api.user.tasks import _emit_ingest_poison_event
with patch(
"application.events.publisher.publish_user_event",
) as mock_publish:
_emit_ingest_poison_event("ingest", {"user": "u1"})
mock_publish.assert_not_called()
@pytest.mark.unit
def test_reingest_uses_reingest_operation(self):
from application.api.user.tasks import _emit_ingest_poison_event
published = []
with patch(
"application.events.publisher.publish_user_event",
side_effect=lambda *a, **k: published.append((a, k)),
):
_emit_ingest_poison_event(
"reingest_source_task",
{"user": "u1", "source_id": "src-r"},
)
assert published[0][0][2]["operation"] == "reingest"

View File

@@ -158,6 +158,35 @@ class TestSimpleDirectoryReaderLoadData:
for doc in docs:
assert isinstance(doc, Document)
def test_load_data_progress_callback_fires_per_file(self, temp_dir):
from application.parser.file.bulk import SimpleDirectoryReader
reader = SimpleDirectoryReader(
input_dir=str(temp_dir), recursive=False, exclude_hidden=True,
)
calls = []
reader.load_data(progress_callback=lambda done, total: calls.append((done, total)))
total_files = len(reader.input_files)
assert total_files >= 1
# One callback per file, monotonically increasing, ending at total.
assert [c[0] for c in calls] == list(range(1, total_files + 1))
assert all(c[1] == total_files for c in calls)
def test_load_data_progress_callback_errors_swallowed(self, temp_dir):
from application.parser.file.bulk import SimpleDirectoryReader
reader = SimpleDirectoryReader(
input_dir=str(temp_dir), recursive=False, exclude_hidden=True,
)
def _boom(done, total):
raise RuntimeError("callback blew up")
# A failing callback must not abort ingestion.
docs = reader.load_data(progress_callback=_boom)
assert len(docs) >= 1
def test_load_data_concatenate(self, temp_dir):
from application.parser.file.bulk import SimpleDirectoryReader

View File

@@ -421,3 +421,85 @@ class TestDoclingParserGaps:
parser = DoclingCSVParser()
assert parser.export_format == "markdown"
assert parser.ocr_enabled is True
# =====================================================================
# Pipeline memory caps
# =====================================================================
@pytest.mark.unit
class TestApplyPipelineCaps:
"""_apply_pipeline_caps bounds docling's threaded-pipeline buffering."""
def test_caps_threaded_pipeline_knobs(self, monkeypatch):
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 2, raising=False
)
class Opts:
# docling >= 2.94 threaded pipeline — all knobs present.
queue_max_size = 100
layout_batch_size = 4
table_batch_size = 4
ocr_batch_size = 4
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 2
assert opts.layout_batch_size == 1
assert opts.table_batch_size == 1
assert opts.ocr_batch_size == 1
def test_queue_size_is_settings_driven(self, monkeypatch):
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 6, raising=False
)
class Opts:
queue_max_size = 100
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 6
def test_misconfigured_zero_floors_to_one(self, monkeypatch):
"""A 0 queue depth could deadlock the threaded pipeline — floor it."""
from application.core.settings import settings
from application.parser.file.docling_parser import _apply_pipeline_caps
monkeypatch.setattr(
settings, "DOCLING_PIPELINE_QUEUE_MAX_SIZE", 0, raising=False
)
class Opts:
queue_max_size = 100
opts = Opts()
_apply_pipeline_caps(opts)
assert opts.queue_max_size == 1
def test_noop_on_docling_without_threaded_pipeline(self):
"""Builds predating the threaded pipeline lack the knobs — the cap
must be a silent no-op, not an AttributeError."""
from application.parser.file.docling_parser import _apply_pipeline_caps
class LegacyOpts:
__slots__ = ("do_ocr", "do_table_structure")
def __init__(self):
self.do_ocr = False
self.do_table_structure = True
opts = LegacyOpts()
_apply_pipeline_caps(opts) # must not raise
assert not hasattr(opts, "queue_max_size")
assert not hasattr(opts, "layout_batch_size")

View File

@@ -94,6 +94,35 @@ def test_embed_and_store_documents_non_faiss(tmp_path, mock_settings, mock_vecto
assert folder_name.exists()
def test_embed_and_store_documents_progress_band(
tmp_path, mock_settings, mock_vector_creator
):
"""progress_start/progress_end remap the embed loop into a sub-band
so an earlier stage (parsing) can own the lower part of the bar.
"""
mock_settings.VECTOR_STORE = "chromadb"
docs = [MagicMock(page_content=f"d{i}", metadata={}) for i in range(4)]
task_status = MagicMock()
mock_vector_creator.create_vectorstore.return_value = MagicMock()
embed_and_store_documents(
docs, str(tmp_path / "store"), "sid", task_status,
progress_start=50, progress_end=100,
)
currents = [
call.kwargs["meta"]["current"]
for call in task_status.update_state.call_args_list
if "meta" in call.kwargs and "current" in call.kwargs["meta"]
]
assert currents, "expected progress updates"
# Embedding stays in the upper band and tops out at 100.
assert min(currents) > 50
assert max(currents) == 100
assert currents == sorted(currents)
@patch("application.parser.embedding_pipeline.add_text_to_store_with_retry")
def test_embed_and_store_documents_partial_failure_raises(
mock_add_retry, tmp_path, mock_settings, mock_vector_creator, caplog

View File

@@ -0,0 +1,74 @@
"""Tests for IngestChunkProgressRepository against ephemeral Postgres."""
from __future__ import annotations
from sqlalchemy import text
from application.storage.db.repositories.ingest_chunk_progress import (
IngestChunkProgressRepository,
)
def _status(conn, source_id: str) -> str:
return conn.execute(
text(
"SELECT status FROM ingest_chunk_progress "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": source_id},
).scalar()
def _mark_stalled(conn, source_id: str) -> None:
conn.execute(
text(
"UPDATE ingest_chunk_progress SET status = 'stalled' "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": source_id},
)
class TestInitProgressStatus:
def test_new_row_starts_active(self, pg_conn):
sid = "3c000000-0000-0000-0000-0000000000c1"
IngestChunkProgressRepository(pg_conn).init_progress(sid, 10, "att-1")
assert _status(pg_conn, sid) == "active"
def test_reingest_resets_stalled_to_active(self, pg_conn):
"""A reconciler-escalated 'stalled' row flips back to 'active'
when the source is reingested under a fresh attempt id.
"""
sid = "3c000000-0000-0000-0000-0000000000c2"
repo = IngestChunkProgressRepository(pg_conn)
repo.init_progress(sid, 10, "att-1")
_mark_stalled(pg_conn, sid)
repo.init_progress(sid, 10, "att-2")
assert _status(pg_conn, sid) == "active"
def test_same_attempt_retry_also_clears_stalled(self, pg_conn):
"""A same-attempt resume (Celery autoretry) also clears a stale
'stalled' flag — the task is running again.
"""
sid = "3c000000-0000-0000-0000-0000000000c3"
repo = IngestChunkProgressRepository(pg_conn)
repo.init_progress(sid, 10, "att-1")
_mark_stalled(pg_conn, sid)
repo.init_progress(sid, 10, "att-1")
assert _status(pg_conn, sid) == "active"
class TestDelete:
def test_delete_removes_row(self, pg_conn):
sid = "3c000000-0000-0000-0000-0000000000d1"
repo = IngestChunkProgressRepository(pg_conn)
repo.init_progress(sid, 10, "att-1")
assert repo.delete(sid) is True
assert repo.get_progress(sid) is None
def test_delete_missing_row_returns_false(self, pg_conn):
repo = IngestChunkProgressRepository(pg_conn)
assert repo.delete("3c000000-0000-0000-0000-0000000000df") is False

View File

@@ -342,6 +342,89 @@ class TestMarkToolCallFailed:
assert row[1] == "oops"
def _seed_ingest_progress(
conn,
*,
source_id: str,
embedded: int,
total: int,
age_minutes: int = 31,
status: str = "active",
) -> None:
"""Seed an ingest_chunk_progress row with a backdated last_updated."""
conn.execute(
text(
"""
INSERT INTO ingest_chunk_progress (
source_id, total_chunks, embedded_chunks, last_index,
last_updated, status
)
VALUES (
CAST(:sid AS uuid), :total, :embedded, :embedded - 1,
clock_timestamp() - make_interval(mins => :age), :status
)
"""
),
{
"sid": source_id, "total": total, "embedded": embedded,
"age": age_minutes, "status": status,
},
)
class TestFindAndLockStalledIngests:
def test_returns_stale_active_partial(self, pg_conn):
sid = "2b000000-0000-0000-0000-0000000000b1"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=2, total=10)
rows = ReconciliationRepository(pg_conn).find_and_lock_stalled_ingests()
assert any(str(r["source_id"]) == sid for r in rows)
def test_excludes_already_stalled(self, pg_conn):
sid = "2b000000-0000-0000-0000-0000000000b2"
_seed_ingest_progress(
pg_conn, source_id=sid, embedded=2, total=10, status="stalled",
)
rows = ReconciliationRepository(pg_conn).find_and_lock_stalled_ingests()
assert all(str(r["source_id"]) != sid for r in rows)
def test_excludes_completed(self, pg_conn):
sid = "2b000000-0000-0000-0000-0000000000b3"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=10, total=10)
rows = ReconciliationRepository(pg_conn).find_and_lock_stalled_ingests()
assert all(str(r["source_id"]) != sid for r in rows)
def test_excludes_under_age_threshold(self, pg_conn):
sid = "2b000000-0000-0000-0000-0000000000b4"
_seed_ingest_progress(
pg_conn, source_id=sid, embedded=2, total=10, age_minutes=2,
)
rows = ReconciliationRepository(pg_conn).find_and_lock_stalled_ingests()
assert all(str(r["source_id"]) != sid for r in rows)
class TestMarkIngestStalled:
def test_flips_status_to_stalled(self, pg_conn):
sid = "2b000000-0000-0000-0000-0000000000b5"
_seed_ingest_progress(pg_conn, source_id=sid, embedded=2, total=10)
repo = ReconciliationRepository(pg_conn)
assert repo.mark_ingest_stalled(sid) is True
row = pg_conn.execute(
text(
"SELECT status FROM ingest_chunk_progress "
"WHERE source_id = CAST(:sid AS uuid)"
),
{"sid": sid},
).fetchone()
assert row[0] == "stalled"
def test_returns_false_for_missing_source(self, pg_conn):
repo = ReconciliationRepository(pg_conn)
assert (
repo.mark_ingest_stalled("2b000000-0000-0000-0000-0000000000bf")
is False
)
def _seed_stuck_idempotency(
conn,
*,