feat: stream thinking tokens

This commit is contained in:
Alex
2026-02-09 11:53:25 +00:00
parent e602d941ca
commit cd8029840f
8 changed files with 320 additions and 19 deletions

View File

@@ -378,6 +378,22 @@ class GoogleLLM(BaseLLM):
last_preview = f"{last_preview[:preview_chars]}..."
return f"count={message_count}, last='{last_preview}'"
@staticmethod
def _get_text_value(part):
"""Get text from both SDK objects and dict-shaped test doubles."""
if isinstance(part, dict):
value = part.get("text")
return value if isinstance(value, str) else ""
value = getattr(part, "text", None)
return value if isinstance(value, str) else ""
@staticmethod
def _is_thought_part(part):
"""Detect Gemini thinking parts when available."""
if isinstance(part, dict):
return bool(part.get("thought"))
return bool(getattr(part, "thought", False))
def _raw_gen(
self,
baseself,
@@ -438,7 +454,6 @@ class GoogleLLM(BaseLLM):
if tools:
cleaned_tools = self._clean_tools_format(tools)
config.tools = cleaned_tools
# Add response schema for structured output if provided
if response_schema:
config.response_schema = response_schema
@@ -475,10 +490,23 @@ class GoogleLLM(BaseLLM):
for part in candidate.content.parts:
if part.function_call:
yield part
elif part.text:
yield part.text
continue
part_text = self._get_text_value(part)
if not part_text:
continue
if self._is_thought_part(part):
yield {"type": "thought", "thought": part_text}
else:
yield part_text
elif hasattr(chunk, "text"):
yield chunk.text
chunk_text = self._get_text_value(chunk)
if chunk_text:
if self._is_thought_part(chunk):
yield {"type": "thought", "thought": chunk_text}
else:
yield chunk_text
finally:
if hasattr(response, "close"):
response.close()

View File

@@ -878,6 +878,9 @@ class LLMHandler(ABC):
tool_calls = {}
for chunk in self._iterate_stream(response):
if isinstance(chunk, dict) and chunk.get("type") == "thought":
yield chunk
continue
if isinstance(chunk, str):
yield chunk
continue

View File

@@ -151,6 +151,51 @@ class OpenAILLM(BaseLLM):
raise ValueError(f"Unexpected content type: {type(content)}")
return cleaned_messages
@staticmethod
def _normalize_reasoning_value(value):
"""Normalize reasoning payloads from OpenAI-compatible stream chunks."""
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list):
return "".join(
OpenAILLM._normalize_reasoning_value(item) for item in value
)
if isinstance(value, dict):
for key in ("text", "content", "value", "reasoning_content", "reasoning"):
normalized = OpenAILLM._normalize_reasoning_value(value.get(key))
if normalized:
return normalized
return ""
for attr in ("text", "content", "value"):
if hasattr(value, attr):
normalized = OpenAILLM._normalize_reasoning_value(getattr(value, attr))
if normalized:
return normalized
return ""
@classmethod
def _extract_reasoning_text(cls, delta):
"""Extract reasoning/thinking tokens from OpenAI-compatible delta chunks."""
if delta is None:
return ""
for key in (
"reasoning_content",
"reasoning",
"thinking",
"thinking_content",
):
value = getattr(delta, key, None)
if value is None and isinstance(delta, dict):
value = delta.get(key)
normalized = cls._normalize_reasoning_value(value)
if normalized:
return normalized
return ""
def _raw_gen(
self,
baseself,
@@ -221,14 +266,26 @@ class OpenAILLM(BaseLLM):
try:
for line in response:
logging.debug(f"OpenAI stream line: {line}")
if (
len(line.choices) > 0
and line.choices[0].delta.content is not None
and len(line.choices[0].delta.content) > 0
):
yield line.choices[0].delta.content
elif len(line.choices) > 0:
yield line.choices[0]
if not getattr(line, "choices", None):
continue
choice = line.choices[0]
delta = getattr(choice, "delta", None)
reasoning_text = self._extract_reasoning_text(delta)
if reasoning_text:
yield {"type": "thought", "thought": reasoning_text}
content = getattr(delta, "content", None)
if isinstance(content, str) and content:
yield content
continue
has_tool_calls = bool(getattr(delta, "tool_calls", None))
finish_reason = getattr(choice, "finish_reason", None)
# Yield non-content chunks only when needed for tool-call handling.
if has_tool_calls or finish_reason == "tool_calls":
yield choice
finally:
if hasattr(response, "close"):
response.close()