Compare commits


1 Commit

Author: Pavel
SHA1:   29477b40b3
Date:   2026-05-15 15:33:34 +01:00

define conversation_id and initial_user_id on BaseAgent (#2474)

These attributes were only set by StreamProcessor after agent creation,
causing an AttributeError in _perform_mid_execution_compression when
the context limit was hit through other code paths (e.g. the worker).
Declaring them as None in __init__ lets the handler fall through to
in-memory compression gracefully.
2 changed files with 116 additions and 42 deletions
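
To make the failure mode concrete, here is a minimal sketch of the bug and the fix. Only the class, attribute, and method names come from the commit message and the diff below; the helper methods and the exact handler logic are assumptions, not code from this repository.

from typing import Optional

class BaseAgent:
    def __init__(self):
        # The fix: declare the attributes up front. Previously they were
        # assigned by StreamProcessor after construction, so agents created
        # through other paths (e.g. the worker) never had them at all.
        self.conversation_id: Optional[str] = None
        self.initial_user_id: Optional[str] = None

    def _perform_mid_execution_compression(self):
        # Before the fix, this access raised AttributeError on worker-created
        # agents; with the attributes declared as None, the handler can fall
        # through to in-memory compression instead.
        if self.conversation_id is None:
            return self._compress_in_memory()    # hypothetical helper
        return self._compress_with_storage()     # hypothetical helper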

View File

@@ -114,6 +114,8 @@ class BaseAgent(ABC):
         self.compressed_summary = compressed_summary
         self.current_token_count = 0
         self.context_limit_reached = False
+        self.conversation_id: Optional[str] = None
+        self.initial_user_id: Optional[str] = None
 
     @log_activity()
     def gen(

View File

@@ -4,19 +4,24 @@
 Fixed 5-second generation (100 tokens × 50 ms/token). No auth. Emits SSE
 chunks in OpenAI's chat.completions streaming format, or a single response
 when stream=false. Run on 127.0.0.1:8090 — point DocsGPT at it via
 OPENAI_BASE_URL=http://127.0.0.1:8090/v1.
+
+Flags:
+    --tool-calls    First response returns a tool call instead of text.
+                    Subsequent responses (after a tool_result) return text.
+                    Useful for triggering the tool-execution loop.
 """
-import asyncio
+import argparse
 import json
 import logging
 import time
 import uuid
 
-from fastapi import FastAPI, Request
-from fastapi.responses import JSONResponse, StreamingResponse
+from flask import Flask, Response, request, jsonify
 
 TOKEN_COUNT = 100
 TOKEN_DELAY_S = 0.05  # 100 * 0.05 = 5.0 s
+TOOL_CALL_MODE = False
 
 logger = logging.getLogger("mock_llm")
 logging.basicConfig(level=logging.INFO, format="%(asctime)s mock: %(message)s")
@@ -39,7 +44,7 @@ FILLER_TOKENS = [
     ".",
 ]
 
-app = FastAPI()
+app = Flask(__name__)
 
 
 def _token_stream_id() -> str:
@@ -63,11 +68,57 @@ def _sse_chunk(completion_id: str, model: str, delta: dict, finish_reason=None)
     return f"data: {json.dumps(payload)}\n\n"
 
 
-async def _stream_response(model: str, req_id: str):
+def _gen_tool_call_stream(model: str, req_id: str):
+    """Emit two tool_calls (search) in streaming format.
+    Two calls ensure the handler executes the first (which can return a
+    huge result), then hits _check_context_limit before the second.
+    """
+    completion_id = _token_stream_id()
+    call_id_1 = f"call_{uuid.uuid4().hex[:12]}"
+    call_id_2 = f"call_{uuid.uuid4().hex[:12]}"
+    yield _sse_chunk(completion_id, model, {
+        "role": "assistant",
+        "content": None,
+        "tool_calls": [
+            {
+                "index": 0,
+                "id": call_id_1,
+                "type": "function",
+                "function": {"name": "search", "arguments": ""},
+            },
+            {
+                "index": 1,
+                "id": call_id_2,
+                "type": "function",
+                "function": {"name": "search", "arguments": ""},
+            },
+        ],
+    })
+
+    args_json = json.dumps({"query": "Python programming basics"})
+    for ch in args_json:
+        time.sleep(TOKEN_DELAY_S)
+        yield _sse_chunk(completion_id, model, {
+            "tool_calls": [
+                {"index": 0, "function": {"arguments": ch}},
+                {"index": 1, "function": {"arguments": ch}},
+            ],
+        })
+
+    yield _sse_chunk(completion_id, model, {}, finish_reason="tool_calls")
+    yield "data: [DONE]\n\n"
+    logger.info("[%s] tool_call stream done (ids=%s, %s)", req_id, call_id_1, call_id_2)
+
+
+def _has_tool_result(messages: list) -> bool:
+    return any(m.get("role") == "tool" for m in messages)
+
+
+def _gen_text_stream(model: str, req_id: str):
     completion_id = _token_stream_id()
     yield _sse_chunk(completion_id, model, {"role": "assistant", "content": ""})
-    for i, tok in enumerate(FILLER_TOKENS[:TOKEN_COUNT]):
-        await asyncio.sleep(TOKEN_DELAY_S)
+    for tok in FILLER_TOKENS[:TOKEN_COUNT]:
+        time.sleep(TOKEN_DELAY_S)
         yield _sse_chunk(completion_id, model, {"content": tok})
     yield _sse_chunk(completion_id, model, {}, finish_reason="stop")
     yield "data: [DONE]\n\n"
@@ -75,63 +126,84 @@ async def _stream_response(model: str, req_id: str):
 
 
 @app.post("/v1/chat/completions")
-async def chat_completions(request: Request):
-    body = await request.json()
+def chat_completions():
+    body = request.get_json(force=True)
     model = body.get("model", "mock")
    stream = bool(body.get("stream", False))
+    messages = body.get("messages", [])
+    tools = body.get("tools")
     req_id = uuid.uuid4().hex[:8]
-    logger.info("[%s] /chat/completions stream=%s model=%s max_tokens=%s", req_id, stream, model, body.get("max_tokens"))
+    logger.info(
+        "[%s] /chat/completions stream=%s model=%s tools=%s msgs=%d",
+        req_id, stream, model, bool(tools), len(messages),
+    )
+
+    use_tool_call = (
+        TOOL_CALL_MODE
+        and tools
+        and not _has_tool_result(messages)
+    )
 
     if stream:
-        return StreamingResponse(
-            _stream_response(model, req_id),
-            media_type="text/event-stream",
+        gen = (
+            _gen_tool_call_stream(model, req_id) if use_tool_call
+            else _gen_text_stream(model, req_id)
+        )
+        return Response(
+            gen,
+            mimetype="text/event-stream",
             headers={
                 "Cache-Control": "no-cache, no-transform",
                 "X-Accel-Buffering": "no",
             },
         )
 
-    await asyncio.sleep(TOKEN_COUNT * TOKEN_DELAY_S)
+    time.sleep(TOKEN_COUNT * TOKEN_DELAY_S)
     logger.info("[%s] non-stream done", req_id)
     text = "".join(FILLER_TOKENS[:TOKEN_COUNT])
     completion_id = _token_stream_id()
-    return JSONResponse(
-        {
-            "id": completion_id,
-            "object": "chat.completion",
-            "created": int(time.time()),
-            "model": model,
-            "choices": [
-                {
-                    "index": 0,
-                    "message": {"role": "assistant", "content": text},
-                    "finish_reason": "stop",
-                }
-            ],
-            "usage": {
-                "prompt_tokens": 10,
-                "completion_tokens": TOKEN_COUNT,
-                "total_tokens": 10 + TOKEN_COUNT,
-            },
-        }
-    )
+    return jsonify({
+        "id": completion_id,
+        "object": "chat.completion",
+        "created": int(time.time()),
+        "model": model,
+        "choices": [
+            {
+                "index": 0,
+                "message": {"role": "assistant", "content": text},
+                "finish_reason": "stop",
+            }
+        ],
+        "usage": {
+            "prompt_tokens": 10,
+            "completion_tokens": TOKEN_COUNT,
+            "total_tokens": 10 + TOKEN_COUNT,
+        },
+    })
 
 
 @app.get("/v1/models")
-async def list_models():
-    return {
+def list_models():
+    return jsonify({
         "object": "list",
         "data": [{"id": "mock", "object": "model", "owned_by": "mock"}],
-    }
+    })
 
 
 @app.get("/health")
-async def health():
-    return {"status": "ok"}
+def health():
+    return jsonify({"status": "ok"})
 
 
 if __name__ == "__main__":
-    import uvicorn
-
-    uvicorn.run(app, host="127.0.0.1", port=8090, log_level="info")
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "--tool-calls", action="store_true",
+        help="First response returns a tool_call; subsequent responses return text.",
+    )
+    parser.add_argument("--port", type=int, default=8090)
+    args = parser.parse_args()
+
+    TOOL_CALL_MODE = args.tool_calls
+    if TOOL_CALL_MODE:
+        logger.info("Tool-call mode enabled")
+
+    app.run(host="127.0.0.1", port=args.port, debug=False, threaded=True)
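
Putting the pieces together, a smoke test against the running mock might look like the sketch below. The script name mock_llm.py is hypothetical (the file name is not shown in this view), and requests is an assumed client-side dependency; the endpoints, flags, and role:"tool" behavior all come from the diff above.

# Start the server first, e.g.:  python mock_llm.py --tool-calls --port 8090
# DocsGPT wiring per the docstring: OPENAI_BASE_URL=http://127.0.0.1:8090/v1
import requests  # assumed dev dependency; any streaming HTTP client works

BASE = "http://127.0.0.1:8090/v1"

# With tools declared and no tool result yet, --tool-calls mode streams
# two "search" tool_calls.
r = requests.post(f"{BASE}/chat/completions", stream=True, json={
    "model": "mock",
    "stream": True,
    "messages": [{"role": "user", "content": "hi"}],
    "tools": [{"type": "function", "function": {"name": "search"}}],
})
for line in r.iter_lines():
    if line:
        print(line.decode())

# Once the conversation contains a role:"tool" message, _has_tool_result
# is true and the mock streams plain text instead.
r = requests.post(f"{BASE}/chat/completions", stream=True, json={
    "model": "mock",
    "stream": True,
    "messages": [
        {"role": "user", "content": "hi"},
        {"role": "tool", "content": "search result payload"},
    ],
})
for line in r.iter_lines():
    if line:
        print(line.decode())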