feat(kiro): enhance thinking support and fix truncation issues

- **Thinking Support**:
    - Enabled thinking support for all Kiro Claude models, including Haiku 4.5 and agentic variants.
    - Updated `model_definitions.go` with thinking configuration (Min: 1024, Max: 32000, ZeroAllowed: true).
    - Fixed `extended_thinking` field names in `model_registry.go` (from `min_budget`/`max_budget` to `min`/`max`) to comply with Claude API specs, enabling thinking control in clients like Claude Code.

- **Kiro Executor Fixes**:
    - Fixed `budget_tokens` handling: explicitly disable thinking when budget is 0 or negative.
    - Removed aggressive duplicate content filtering logic that caused truncation/data loss.
    - Enhanced thinking tag parsing with `extractThinkingFromContent` to correctly handle interleaved thinking/text blocks.
    - Added EOF handling to flush pending thinking tag characters, preventing data loss at stream end.

- **Performance**:
    - Optimized Claude stream handler (v6.2) with reduced buffer size (4KB) and faster flush interval (50ms) to minimize latency and prevent timeouts.
This commit is contained in:
Ravens2121
2025-12-13 03:51:14 +08:00
parent 05b499fb83
commit db80b20bc2
4 changed files with 219 additions and 28 deletions

View File

@@ -219,12 +219,12 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
}
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
// v6.1: Intelligent Buffered Streamer strategy
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
// v6.2: Immediate flush strategy for SSE streams
// SSE requires immediate data delivery to prevent client timeouts.
// Previous buffering strategy (16KB buffer, 8KB threshold) caused delays
// because SSE events are typically small (< 1KB), leading to client retries.
writer := bufio.NewWriterSize(c.Writer, 4*1024) // 4KB buffer (smaller for faster flush)
ticker := time.NewTicker(50 * time.Millisecond) // 50ms interval for responsive streaming
defer ticker.Stop()
var chunkIdx int
@@ -238,10 +238,9 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
return
case <-ticker.C:
// Smart flush: only flush when buffer has sufficient data (≥50% full)
// This reduces flush frequency while ensuring data flows naturally
buffered := writer.Buffered()
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
// Flush any buffered data on timer to ensure responsiveness
// For SSE, we flush whenever there's any data to prevent client timeouts
if writer.Buffered() > 0 {
if err := writer.Flush(); err != nil {
// Error flushing, cancel and return
cancel(err)
@@ -254,6 +253,7 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
if !ok {
// Stream ended, flush remaining data
_ = writer.Flush()
flusher.Flush()
cancel(nil)
return
}
@@ -263,6 +263,12 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
// The handler just needs to forward it without reassembly.
if len(chunk) > 0 {
_, _ = writer.Write(chunk)
// Immediately flush for first few chunks to establish connection quickly
// This prevents client timeout/retry on slow backends like Kiro
if chunkIdx < 3 {
_ = writer.Flush()
flusher.Flush()
}
}
chunkIdx++