Compare commits


28 Commits

Author SHA1 Message Date
Alex
827a0bb382 Merge remote-tracking branch 'origin/main' into feat-notification-system
# Conflicts:
#	frontend/src/api/services/userService.ts
#	frontend/src/utils/providerUtils.ts
2026-05-13 22:57:58 +01:00
Alex
b04cb44ab5 fix: e2e tests 2026-05-13 22:49:37 +01:00
Alex
42384a0e92 fix: better docs 2026-05-13 17:03:14 +01:00
Alex
0bce35ad29 feat: events cleanup 2026-05-13 08:36:52 +01:00
Alex
9de8bb4499 chore(events): rename attachment.processing.progress to attachment.progress
The event-type taxonomy was inconsistent: source ingest emits
source.ingest.progress (three segments) while attachments emitted
attachment.processing.progress (four segments). Drops the
.processing. infix for parity. Worker publish sites, the slice
reducer's match, and the worker tests all flip together.

No external consumers — the event type is purely internal between
the publisher and the in-tab slice; safe to rename in one commit.
2026-05-12 19:38:18 +01:00
Alex
cdbd3f061d fix(cache): enable Redis health_check_interval to surface half-open TCP
Without health_check_interval, a half-open TCP socket (NAT silently
dropped state, ELB idle-close) can leave pubsub.get_message hanging
past the SSE generator's keepalive cadence — the kernel never
surfaces the dead socket because no payload is in flight. Setting
health_check_interval=10 makes redis-py ping every 10s when
otherwise idle, so the next get_message after the dead window
raises and the SSE loop falls into its reconnect path instead of
silently freezing on the user.
2026-05-12 19:38:11 +01:00
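
A minimal sketch of the client construction this implies, assuming stock redis-py (host/port and channel name here are illustrative):

    import redis

    # health_check_interval=10: if the connection has been idle for more
    # than 10 seconds, redis-py sends a PING before the next command and
    # raises ConnectionError if the socket is dead, instead of blocking
    # forever on a half-open TCP connection.
    client = redis.Redis(
        host="localhost",
        port=6379,
        health_check_interval=10,  # seconds of idleness before a verification PING
    )

    pubsub = client.pubsub()
    pubsub.subscribe("events")

    # get_message() now surfaces a dead socket as an exception after the
    # idle window, so the caller's reconnect path runs instead of hanging.
    message = pubsub.get_message(timeout=1.0)
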
Alex
2ac46fd858 refactor(sources): move source-id derivation out of worker module
application/api/user/sources/upload.py imported _derive_source_id
from application.worker — pulling the entire Celery worker module
into the API process at import time just for a two-line helper.

Move DOCSGPT_INGEST_NAMESPACE and the derivation function to a
new application/storage/db/source_ids.py module that both layers
can import without that dependency edge. worker.py re-exports the
old names (_derive_source_id, DOCSGPT_INGEST_NAMESPACE) for
backward-compatible imports from tests and any other in-tree
callers; new code should import from the new module directly.
2026-05-12 19:38:04 +01:00
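
A sketch of the extracted module's shape; the uuid5 scheme and the helper's arguments are assumptions (the commit only fixes where the namespace constant and the two-line helper live):

    # application/storage/db/source_ids.py -- importable by both the API
    # layer and the worker without pulling in Celery.
    import uuid

    # Assumed derivation scheme; only the constant's name comes from the commit.
    DOCSGPT_INGEST_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "ingest.docsgpt")

    def _derive_source_id(user_id: str, source_name: str) -> str:
        # Deterministic: the same (user, source) pair always yields the same id.
        return str(uuid.uuid5(DOCSGPT_INGEST_NAMESPACE, f"{user_id}:{source_name}"))
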
Alex
daa4320da2 docs(events): enumerate publish_user_event None-return paths
The function returns Optional[str] today, with None conflating five
distinct outcomes (missing args / push disabled / unserialisable /
Redis down / XADD failed). Every current call site is fire-and-
forget and ignores the return, so the right move is to document the
five cases rather than promote to an enum return — keeps the API
small while making the diagnostic surface (logs) obvious. If a
future caller needs to react differently per reason, promote then.
2026-05-12 19:37:56 +01:00
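
The documented contract might read roughly like this (a sketch; the signature is assumed):

    from typing import Optional

    def publish_user_event(user_id, event_type, payload) -> Optional[str]:
        """Publish an envelope to the user's durable journal and live pub/sub.

        Returns the new stream id, or None (logged, never raised) when:
          1. a required argument is missing,
          2. push events are disabled by configuration,
          3. the payload is not JSON-serialisable,
          4. Redis is unreachable,
          5. the XADD to the journal stream fails.
        All current callers are fire-and-forget; if a future caller needs
        to react per-reason, promote the return to an enum then.
        """
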
Alex
e70a7a5115 fix(notifications): treat /c/new as no current conversation
useMatch('/c/:conversationId') treats the literal URL /c/new as a
real conversation id, so the toast suppression check confused
'user is on /c/new' with 'user is on the conversation needing
approval'. Explicit guard: when the matched id is 'new', fall
through to the no-match case so approval toasts still surface.
2026-05-12 19:16:08 +01:00
Alex
150d9f4e37 test(tasks): cover cleanup_message_events task body
Adds skipped-when-no-POSTGRES_URI and happy-path coverage for the
Celery janitor. The skipped path returns the documented short-circuit
shape without touching the repo. The happy path seeds a backdated
row, runs the task against the pg_conn fixture, and asserts the
out-of-window row is deleted while in-window rows survive.
Mirrors the TestCleanupPendingToolState pattern.
2026-05-12 19:16:08 +01:00
Alex
746bcbc5f9 refactor(events): raise on malformed stream id instead of lex fallback
stream_id_compare's lex-fallback branch was a footgun: a malformed id
that sorts lex-greater than a real one would pin live-tail dedup
forever, dropping every subsequent legitimate event silently. Both
current callers in application/api/events/routes.py pre-validate
inputs against _STREAM_ID_RE before calling, so changing the function
to raise ValueError is a no-op on the happy path and turns the future-
caller footgun into a loud failure.
2026-05-12 19:16:08 +01:00
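
Redis stream ids are "<milliseconds>-<sequence>"; a sketch of the strict comparison (the regex is assumed to match routes.py's _STREAM_ID_RE):

    import re

    _STREAM_ID_RE = re.compile(r"^\d+-\d+$")  # assumed shape: "<ms>-<seq>"

    def stream_id_compare(a: str, b: str) -> int:
        """Numeric comparison of Redis stream ids; raises on malformed input."""
        for candidate in (a, b):
            if not _STREAM_ID_RE.match(candidate):
                # Previously: lexicographic fallback, which could pin the
                # live-tail dedup floor above every legitimate id.
                raise ValueError(f"malformed stream id: {candidate!r}")
        ka = tuple(int(part) for part in a.split("-"))
        kb = tuple(int(part) for part in b.split("-"))
        return (ka > kb) - (ka < kb)  # -1, 0, or 1

    assert stream_id_compare("1715520000000-1", "1715520000000-2") == -1
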
Alex
aa91117fbf docs(message-events): clarify repo vs wrapper payload contract
MessageEventsRepository.record accepts any JSONB-compatible value; the
streaming wrapper record_event tightens this to dicts only because the
live and replay paths reconstruct non-dict payloads differently. Spell
the split out so the next reader of the repo method doesn't assume the
wrapper's contract applies here.
2026-05-12 19:16:08 +01:00
Alex
abbd56cb66 docs(repo): remove stale planning docs from repo root
notification-channel-design.md, plan.md, and reminder-tool-design.md
were leftover Claude planning artifacts from the SSE substrate work
that landed accidentally. CLAUDE.md prohibits creating planning docs
unless asked — delete them.
2026-05-12 19:16:08 +01:00
Alex
85d8375e6c chore(frontend): drop orphaned getTaskStatus client
After the polling-removal sweep no caller in frontend/src/ references
userService.getTaskStatus or endpoints.USER.TASK_STATUS. The backend
route /api/task_status itself stays — agents, webhooks, e2e specs,
and the public docs still depend on it.
2026-05-12 19:16:07 +01:00
Alex
7e98d21b61 chore(upload): drop dead UploadTask.lastEventAt field
The lastEventAt field on UploadTask had no remaining consumers — the
matching Attachment.lastEventAt was cleaned up earlier. Remove the
field declaration and the slice write site.
2026-05-12 19:16:07 +01:00
Alex
249f9f9fe0 perf(streaming): batch message_events INSERTs per stream
complete_stream previously opened a fresh db_session() per yielded
event, doing one Postgres INSERT + commit per chunk on the WSGI
thread. Streaming answers emit ~100s of answer chunks per response,
so the route was paying ~100 PG roundtrips per stream serialized on
commit latency.

New BatchedJournalWriter in application/streaming/message_journal.py
accumulates rows per stream and flushes on three triggers:
- size: buffer reaches 16 entries
- time: 100ms elapsed since the last flush
- lifecycle: close() at end-of-stream

Live pubsub publishes still fire synchronously per record(), so
subscribers see events in real time — only the durable journal write
is amortized. On bulk INSERT IntegrityError the writer falls back to
per-row record() with the existing seq+1 retry so a single colliding
seq doesn't drop the rest of the batch.

complete_stream wires journal_writer.close() into every exit path
(happy end, tool-approval-paused end, GeneratorExit, error handler)
so the terminal event is committed before the generator returns —
otherwise a reconnecting client could snapshot up to the last flush
boundary and live-tail waiting for an end that's still in memory.

Repository gets bulk_record() — one SQLAlchemy executemany INSERT
for the bulk path. All-or-nothing on collision (Postgres aborts the
whole batch); the writer's per-row fallback handles recovery.
2026-05-12 18:20:19 +01:00
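
A condensed sketch of the flush discipline, assuming a repository object exposing the bulk_record()/record() methods the commit names (buffer handling here is illustrative):

    import time

    from sqlalchemy.exc import IntegrityError

    class BatchedJournalWriter:
        # Live pubsub publishes happen elsewhere, synchronously per event;
        # only the durable INSERT is amortized through this buffer.
        FLUSH_SIZE = 16        # size trigger
        FLUSH_INTERVAL = 0.1   # time trigger: 100 ms

        def __init__(self, repo, message_id):
            self.repo = repo
            self.message_id = message_id
            self.buffer = []
            self.last_flush = time.monotonic()

        def record(self, row):
            self.buffer.append(row)
            if (len(self.buffer) >= self.FLUSH_SIZE
                    or time.monotonic() - self.last_flush >= self.FLUSH_INTERVAL):
                self.flush()

        def flush(self):
            if self.buffer:
                try:
                    # One executemany INSERT; Postgres aborts the whole
                    # batch on a seq collision.
                    self.repo.bulk_record(self.message_id, self.buffer)
                except IntegrityError:
                    # Fall back per-row so one colliding seq doesn't drop
                    # the rest (the seq+1 retry lives in record()).
                    for row in self.buffer:
                        self.repo.record(self.message_id, row)
                self.buffer.clear()
            self.last_flush = time.monotonic()

        def close(self):
            # Wired into every stream exit path so the terminal event is
            # durable before the generator returns.
            self.flush()
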
Alex
6c4346eb84 fix(streaming): tighten journal contract + recover from seq collisions
Two related fixes to application/streaming/message_journal.py.

1. record_event now rejects non-dict payloads at the gate. The
   live path (base.py::_emit) wrapped non-dicts as
   {"value": payload}; the replay path in event_replay synthesized
   {"type": event_type}. A reconnecting client would receive a
   different envelope than the one originally streamed. Now both
   paths see byte-identical envelopes because non-dicts can't be
   journaled at all. The corresponding event_replay fallback is
   replaced with a warn-and-skip for any legacy rows.

2. record_event handles IntegrityError on (message_id, sequence_no)
   collisions by reading latest_sequence_no and retrying once with
   latest+1. The most likely cause is a stale seq seed on a
   continuation retry where the route read MAX(seq) from a
   separate connection before another writer committed past it.
   Previously the error was swallowed and the event silently
   dropped from the journal; now it lands at the next available
   seq. The live pubsub publish uses the materialised seq so the
   journal row and the live frame agree.
2026-05-12 17:55:16 +01:00
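
A sketch of both fixes together; the repository method names are assumptions:

    from sqlalchemy.exc import IntegrityError

    def record_event(repo, message_id, payload):
        if not isinstance(payload, dict):
            # Fix 1: reject non-dicts at the gate so live and replay paths
            # always see byte-identical envelopes.
            raise TypeError("journal payloads must be dicts")
        seq = repo.latest_sequence_no(message_id) + 1
        try:
            repo.record(message_id, seq, payload)
        except IntegrityError:
            # Fix 2: stale seq seed (another writer committed past our
            # MAX(seq) read); re-read and retry once at the next slot.
            seq = repo.latest_sequence_no(message_id) + 1
            repo.record(message_id, seq, payload)
        return seq  # the live pubsub publish uses this materialised seq
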
Alex
cb3ca8a36b fix(events): skip replay budget INCR when no snapshot work possible
_allow_replay incremented the per-user counter on every
/api/events GET, including no-op connects from a fresh client
with no cursor against an empty backlog. React StrictMode dev
double-mounts plus a few tabs trivially tripped the default
30-per-60s budget on idle reconnects.

XLEN pre-check: when last_event_id is None and the user stream
is empty, the connect can't do snapshot work — return True
without INCR. Cursor-bearing connects still INCR unconditionally
(probing the cursor's relationship to stream contents would
require a redundant XRANGE).
2026-05-12 17:55:08 +01:00
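
The pre-check reduces to a few lines; a sketch where the budget key name is an assumption and the stream key and 30-per-60s numbers follow the commits:

    import redis

    def _allow_replay(client, user_id, last_event_id):
        if last_event_id is None and client.xlen(f"user:{user_id}:stream") == 0:
            # Fresh client, empty backlog: no snapshot work is possible,
            # so an idle (re)connect should not consume replay budget.
            return True
        key = f"events:replay_budget:{user_id}"  # assumed key name
        count = client.incr(key)
        if count == 1:
            client.expire(key, 60)               # 60-second window
        return count <= 30
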
Alex
4c8230fb6c fix(notifications): dedupe sseEventReceived against immediate dupes
Snapshot replay + live tail can both deliver the same id when the
live pubsub frame and the replay XRANGE overlap. The route's own
dedup floor catches the common case, but consumers walking
``recentEvents`` (FileTree, ConnectorTree, MCPServerModal,
ToolApprovalToast) would otherwise act on the same envelope
twice when a duplicate slipped through.

Belt-and-suspenders: short-circuit when the most recent id in
the ring matches the incoming one.
2026-05-12 17:54:59 +01:00
Alex
649557798d fix(events): drop live publish when journal write fails
application/events/publisher.py returned an envelope to live
pubsub subscribers even when the XADD to the durable journal
failed. The envelope had no ``id`` field, which bypassed the SSE
route's dedup floor and broke ``Last-Event-ID`` semantics for any
reconnecting client.

Best-effort delivery means dropping consistently, not delivering
inconsistent state. Now: if the journal write fails the publisher
returns None and skips the live publish entirely.
2026-05-12 17:54:52 +01:00
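
A sketch of the tightened publish order (assumes a decode_responses=True client; the live channel name is an assumption, the stream key follows the journal key used elsewhere in these commits):

    import json

    import redis

    def publish_user_event(client, user_id, envelope):
        stream_key = f"user:{user_id}:stream"
        try:
            # Durable journal write first; XADD assigns the envelope's id.
            envelope["id"] = client.xadd(stream_key, {"data": json.dumps(envelope)})
        except redis.RedisError:
            # Journal failed: drop the live publish too. An id-less
            # envelope would bypass the SSE dedup floor and break
            # Last-Event-ID on reconnect.
            return None
        client.publish(f"user:{user_id}:live", json.dumps(envelope))
        return envelope["id"]
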
Alex
afe8354ca5 chore(mcp-oauth): delete orphaned getMCPOAuthStatus client
The /api/mcp_server/oauth_status/<task_id> endpoint was removed in
the prior commit; the corresponding userService method and the
MCP_OAUTH_STATUS endpoint constant had no remaining callers in the
frontend, so they're deleted along with it.
2026-05-12 16:02:41 +01:00
Alex
5483eb0e27 refactor(mcp-oauth): read status from SSE journal, drop polling endpoint
MCPOAuthManager.get_oauth_status now walks the per-user SSE Streams
journal (user:{user_id}:stream) for the latest mcp.oauth.* envelope
matching the task id, returning the status string derived from the
event type suffix and the payload fields. The worker is the single
source of truth — its publish_user_event calls write the same
record the SSE client receives live.

Removed:
- /api/mcp_server/oauth_status/<task_id> route in
  application/api/user/tools/mcp.py
- mcp_oauth_status worker function and mcp_oauth_status_task Celery
  wrapper
- All mcp_oauth_status:{task_id} Redis setex writes (4 in mcp_oauth,
  2 in DocsGPTOAuth.redirect_handler / callback_handler)
- The update_status closure in mcp_oauth that wrote the polling
  payload

Tests updated:
- get_oauth_status now takes (task_id, user_id); new coverage walks
  a fake xrevrange response for the completed envelope, the no-match
  case, and a Redis-down case
- Removed TestMCPOAuthStatus route tests and TestMcpOauthStatusTask
  celery-wrapper test
- Removed the two oauth_status methods from the integration runner

mcp_oauth:auth_url/state/code/error Redis keys remain — they are
the OAuth flow's own state (not the dropped polling payload).
2026-05-12 16:01:31 +01:00
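
A sketch of the journal walk (assumes a decode_responses=True client; the envelope field names and the scan cap are assumptions):

    import json

    def get_oauth_status(client, task_id, user_id):
        # Newest-first scan for the latest mcp.oauth.* envelope for this task.
        for _entry_id, fields in client.xrevrange(f"user:{user_id}:stream", count=200):
            envelope = json.loads(fields["data"])
            event_type = envelope.get("type", "")
            payload = envelope.get("payload", {})
            if event_type.startswith("mcp.oauth.") and payload.get("task_id") == task_id:
                # e.g. "mcp.oauth.completed" -> status "completed"
                return {"status": event_type.rsplit(".", 1)[-1], **payload}
        return None  # no matching envelope in the journal
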
Alex
bd2985db47 feat(source-ingest): plumb limited flag through SSE for token-cap UX
application/worker.py::ingest_worker and remote_worker now publish
``limited: bool`` on the source.ingest.completed envelope.
uploadSlice routes ``payload.limited === true`` to a failed status
with a ``tokenLimitReached`` flag, and UploadToast surfaces the
translated tokenLimit i18n string. No worker code path sets
limited=true today; this is a forward-looking contract so when
token-cap detection lands, the UX is already wired.
2026-05-12 15:49:15 +01:00
Alex
b99147ba83 refactor(mcp-oauth): carry authorization_url in SSE, remove polling
application/worker.py::mcp_oauth now publishes
authorization_url on the mcp.oauth.awaiting_redirect envelope.
frontend/src/modals/MCPServerModal.tsx consumes it from SSE
instead of polling /oauth_status/<task_id> every 1s.

The URL is generated inside DocsGPTOAuth.redirect_handler when
the FastMCP client triggers OAuth. The worker now plumbs a
publish callback through tool_config -> MCPTool -> DocsGPTOAuth
so the awaiting_redirect publish fires from inside the handler
at the exact point the URL becomes known. The legacy Redis
mcp_oauth_status setex writes and the GET
/api/mcp_server/oauth_status/<task_id> endpoint are kept as
belt-and-suspenders; nothing in the frontend reads them now.
2026-05-12 14:44:42 +01:00
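
A sketch of the callback plumbing; beyond DocsGPTOAuth and redirect_handler the names and signatures are illustrative:

    class DocsGPTOAuth:
        def __init__(self, on_awaiting_redirect):
            # Injected by the worker via tool_config -> MCPTool -> DocsGPTOAuth.
            self._on_awaiting_redirect = on_awaiting_redirect

        def redirect_handler(self, authorization_url):
            # Fires at the exact point the URL becomes known, instead of
            # parking it in Redis for a poller to find.
            self._on_awaiting_redirect(authorization_url)

    # Worker side (illustrative wiring): the callback publishes the
    # mcp.oauth.awaiting_redirect envelope carrying the URL.
    def on_awaiting_redirect(url):
        print("publish mcp.oauth.awaiting_redirect:", {"authorization_url": url})

    oauth = DocsGPTOAuth(on_awaiting_redirect)
    oauth.redirect_handler("https://example.com/authorize?state=abc")
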
Alex
c3023f8b71 refactor(source-ingest): remove polling, SSE-only
frontend/src/upload/Upload.tsx and
frontend/src/components/FileTree.tsx no longer run getTaskStatus
polling fallbacks. The source.ingest.* SSE reducers in
uploadSlice.ts and FileTree's slice walk are now the sole
drivers of upload/reingest state transitions.
2026-05-12 14:44:33 +01:00
Alex
c168a530f5 feat(connector): consume source.ingest.* SSE, remove polling
frontend/src/components/ConnectorTree.tsx now mirrors FileTree's
slice-walking pattern: it watches notifications.recentEvents
for source.ingest.{completed,failed} envelopes matching the
sync's source id, and no longer polls /task_status every 2s.
2026-05-12 14:44:27 +01:00
Alex
2d539f3199 refactor(attachments): remove polling, SSE-only
frontend/src/components/MessageInput.tsx no longer runs a 2s
setInterval against getTaskStatus for every processing
attachment. The attachment.* SSE reducers in uploadSlice.ts are
now the sole driver of attachment state transitions.
2026-05-12 14:44:21 +01:00
Alex
ed9444cf3d feat: SSE notification system
Adds a per-user SSE pipe (GET /api/events) plus a per-message
chat-stream reconnect endpoint (GET /api/messages/<id>/events).

Backend substrate:
- application/events/ — durable journal (Redis Streams) + live
  pub/sub for user-scoped events, with publish_user_event() as
  the worker-side entrypoint.
- application/streaming/ — broadcast_channel for pub/sub fanout
  and event_replay for the per-message snapshot+tail path.
- application/storage/db/repositories/message_events.py +
  alembic 0007 — Postgres journal for chat-stream events.
- application/worker.py — ingest/reingest/remote/connector/
  attachment/mcp_oauth tasks publish queued/progress/completed/
  failed envelopes alongside their existing status updates.

Frontend client:
- frontend/src/events/ — connect/reconnect, Last-Event-ID cursor,
  backoff with jitter. Each tab runs its own connection; no
  cross-tab dedup (future work).
- frontend/src/notifications/ — recentEvents ring, cursor
  tracking, tool-approval toast.
- frontend/src/upload/uploadSlice.ts — extraReducers for
  source.ingest.* and attachment.* events.

Coverage: 132 SSE tests across events substrate, replay, journal,
routes, and worker publishes.
2026-05-12 14:29:45 +01:00
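
Distilled, the per-user pipe is a snapshot-then-tail generator. A minimal Flask-shaped sketch of that route; the path and Last-Event-ID handling follow the commits, everything else is illustrative stubbing:

    import json

    from flask import Flask, Response, request

    app = Flask(__name__)

    def _replay(user_id, cursor):
        # Stub: the real path XRANGEs user:{user_id}:stream past the cursor.
        return iter(())

    def _live_tail(user_id, floor):
        # Stub: the real path blocks on pub/sub, deduped against the floor.
        return iter(())

    @app.get("/api/events")
    def events():
        user_id = "demo-user"                          # stub for the auth lookup
        cursor = request.headers.get("Last-Event-ID")  # reconnect cursor

        def generate():
            floor = cursor
            for event_id, envelope in _replay(user_id, cursor):    # 1. snapshot
                floor = event_id
                yield f"id: {event_id}\ndata: {json.dumps(envelope)}\n\n"
            for event_id, envelope in _live_tail(user_id, floor):  # 2. tail
                yield f"id: {event_id}\ndata: {json.dumps(envelope)}\n\n"

        return Response(generate(), mimetype="text/event-stream",
                        headers={"Cache-Control": "no-cache",
                                 "X-Accel-Buffering": "no"})
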
2 changed files with 42 additions and 116 deletions

View File

@@ -114,8 +114,6 @@ class BaseAgent(ABC):
         self.compressed_summary = compressed_summary
         self.current_token_count = 0
         self.context_limit_reached = False
-        self.conversation_id: Optional[str] = None
-        self.initial_user_id: Optional[str] = None

    @log_activity()
    def gen(
View File

@@ -4,24 +4,19 @@
 Fixed 5-second generation (100 tokens × 50 ms/token). No auth. Emits SSE
 chunks in OpenAI's chat.completions streaming format, or a single response
 when stream=false. Run on 127.0.0.1:8090 — point DocsGPT at it via
 OPENAI_BASE_URL=http://127.0.0.1:8090/v1.
-
-Flags:
-  --tool-calls    First response returns a tool call instead of text.
-                  Subsequent responses (after a tool_result) return text.
-                  Useful for triggering the tool-execution loop.
 """
-import argparse
+import asyncio
 import json
 import logging
 import time
 import uuid

-from flask import Flask, Response, request, jsonify
+from fastapi import FastAPI, Request
+from fastapi.responses import JSONResponse, StreamingResponse

 TOKEN_COUNT = 100
 TOKEN_DELAY_S = 0.05  # 100 * 0.05 = 5.0 s
-TOOL_CALL_MODE = False

 logger = logging.getLogger("mock_llm")
 logging.basicConfig(level=logging.INFO, format="%(asctime)s mock: %(message)s")
@@ -44,7 +39,7 @@ FILLER_TOKENS = [
     ".",
 ]

-app = Flask(__name__)
+app = FastAPI()


 def _token_stream_id() -> str:
@@ -68,57 +63,11 @@ def _sse_chunk(completion_id: str, model: str, delta: dict, finish_reason=None)
     return f"data: {json.dumps(payload)}\n\n"


-def _gen_tool_call_stream(model: str, req_id: str):
-    """Emit two tool_calls (search) in streaming format.
-
-    Two calls ensure the handler executes the first (which can return a
-    huge result), then hits _check_context_limit before the second.
-    """
-    completion_id = _token_stream_id()
-    call_id_1 = f"call_{uuid.uuid4().hex[:12]}"
-    call_id_2 = f"call_{uuid.uuid4().hex[:12]}"
-
-    yield _sse_chunk(completion_id, model, {
-        "role": "assistant",
-        "content": None,
-        "tool_calls": [
-            {
-                "index": 0,
-                "id": call_id_1,
-                "type": "function",
-                "function": {"name": "search", "arguments": ""},
-            },
-            {
-                "index": 1,
-                "id": call_id_2,
-                "type": "function",
-                "function": {"name": "search", "arguments": ""},
-            },
-        ],
-    })
-
-    args_json = json.dumps({"query": "Python programming basics"})
-    for ch in args_json:
-        time.sleep(TOKEN_DELAY_S)
-        yield _sse_chunk(completion_id, model, {
-            "tool_calls": [
-                {"index": 0, "function": {"arguments": ch}},
-                {"index": 1, "function": {"arguments": ch}},
-            ],
-        })
-
-    yield _sse_chunk(completion_id, model, {}, finish_reason="tool_calls")
-    yield "data: [DONE]\n\n"
-    logger.info("[%s] tool_call stream done (ids=%s, %s)", req_id, call_id_1, call_id_2)
-
-
-def _has_tool_result(messages: list) -> bool:
-    return any(m.get("role") == "tool" for m in messages)
-
-
-def _gen_text_stream(model: str, req_id: str):
+async def _stream_response(model: str, req_id: str):
     completion_id = _token_stream_id()
     yield _sse_chunk(completion_id, model, {"role": "assistant", "content": ""})
-    for tok in FILLER_TOKENS[:TOKEN_COUNT]:
-        time.sleep(TOKEN_DELAY_S)
+    for i, tok in enumerate(FILLER_TOKENS[:TOKEN_COUNT]):
+        await asyncio.sleep(TOKEN_DELAY_S)
         yield _sse_chunk(completion_id, model, {"content": tok})
     yield _sse_chunk(completion_id, model, {}, finish_reason="stop")
     yield "data: [DONE]\n\n"
@@ -126,84 +75,63 @@ def _gen_text_stream(model: str, req_id: str):
 @app.post("/v1/chat/completions")
-def chat_completions():
-    body = request.get_json(force=True)
+async def chat_completions(request: Request):
+    body = await request.json()
     model = body.get("model", "mock")
     stream = bool(body.get("stream", False))
-    messages = body.get("messages", [])
-    tools = body.get("tools")
     req_id = uuid.uuid4().hex[:8]

-    logger.info(
-        "[%s] /chat/completions stream=%s model=%s tools=%s msgs=%d",
-        req_id, stream, model, bool(tools), len(messages),
-    )
-
-    use_tool_call = (
-        TOOL_CALL_MODE
-        and tools
-        and not _has_tool_result(messages)
-    )
+    logger.info("[%s] /chat/completions stream=%s model=%s max_tokens=%s", req_id, stream, model, body.get("max_tokens"))

     if stream:
-        gen = (
-            _gen_tool_call_stream(model, req_id) if use_tool_call
-            else _gen_text_stream(model, req_id)
-        )
-        return Response(
-            gen,
-            mimetype="text/event-stream",
+        return StreamingResponse(
+            _stream_response(model, req_id),
+            media_type="text/event-stream",
             headers={
                 "Cache-Control": "no-cache, no-transform",
                 "X-Accel-Buffering": "no",
             },
         )

-    time.sleep(TOKEN_COUNT * TOKEN_DELAY_S)
+    await asyncio.sleep(TOKEN_COUNT * TOKEN_DELAY_S)
     logger.info("[%s] non-stream done", req_id)
     text = "".join(FILLER_TOKENS[:TOKEN_COUNT])
     completion_id = _token_stream_id()
-    return jsonify({
-        "id": completion_id,
-        "object": "chat.completion",
-        "created": int(time.time()),
-        "model": model,
-        "choices": [
-            {
-                "index": 0,
-                "message": {"role": "assistant", "content": text},
-                "finish_reason": "stop",
-            }
-        ],
-        "usage": {
-            "prompt_tokens": 10,
-            "completion_tokens": TOKEN_COUNT,
-            "total_tokens": 10 + TOKEN_COUNT,
-        },
-    })
+    return JSONResponse(
+        {
+            "id": completion_id,
+            "object": "chat.completion",
+            "created": int(time.time()),
+            "model": model,
+            "choices": [
+                {
+                    "index": 0,
+                    "message": {"role": "assistant", "content": text},
+                    "finish_reason": "stop",
+                }
+            ],
+            "usage": {
+                "prompt_tokens": 10,
+                "completion_tokens": TOKEN_COUNT,
+                "total_tokens": 10 + TOKEN_COUNT,
+            },
+        }
+    )


 @app.get("/v1/models")
-def list_models():
-    return jsonify({
+async def list_models():
+    return {
         "object": "list",
         "data": [{"id": "mock", "object": "model", "owned_by": "mock"}],
-    })
+    }


 @app.get("/health")
-def health():
-    return jsonify({"status": "ok"})
+async def health():
+    return {"status": "ok"}


 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument(
-        "--tool-calls", action="store_true",
-        help="First response returns a tool_call; subsequent responses return text.",
-    )
-    parser.add_argument("--port", type=int, default=8090)
-    args = parser.parse_args()
-    TOOL_CALL_MODE = args.tool_calls
-    if TOOL_CALL_MODE:
-        logger.info("Tool-call mode enabled")
-    app.run(host="127.0.0.1", port=args.port, debug=False, threaded=True)
+    import uvicorn
+
+    uvicorn.run(app, host="127.0.0.1", port=8090, log_level="info")