From 40e7f066e493b918705d0043e27d709dd5e8cd58 Mon Sep 17 00:00:00 2001 From: Ravens2121 Date: Fri, 12 Dec 2025 01:59:06 +0800 Subject: [PATCH] feat(kiro): enhance Kiro executor with retry, deduplication and event filtering --- internal/api/server.go | 6 ++ internal/runtime/executor/kiro_executor.go | 82 +++++++++++++++++++--- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/internal/api/server.go b/internal/api/server.go index e1cea9e9..ade08fef 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -349,6 +349,12 @@ func (s *Server) setupRoutes() { }, }) }) + + // Event logging endpoint - handles Claude Code telemetry requests + // Returns 200 OK to prevent 404 errors in logs + s.engine.POST("/api/event_logging/batch", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) s.engine.POST("/v1internal:method", geminiCLIHandlers.CLIHandler) // OAuth callback endpoints (reuse main server port) diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index 84fd990c..bcdebe1f 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -255,6 +255,26 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } + // Handle 5xx server errors with exponential backoff retry + if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + if attempt < maxRetries { + // Exponential backoff: 1s, 2s, 4s... (max 30s) + backoff := time.Duration(1< 30*time.Second { + backoff = 30 * time.Second + } + log.Warnf("kiro: server error %d, retrying in %v (attempt %d/%d)", httpResp.StatusCode, backoff, attempt+1, maxRetries) + time.Sleep(backoff) + continue + } + log.Errorf("kiro: server error %d after %d retries", httpResp.StatusCode, maxRetries) + return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + // Handle 401/403 errors with token refresh and retry if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 { respBody, _ := io.ReadAll(httpResp.Body) @@ -485,6 +505,26 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } + // Handle 5xx server errors with exponential backoff retry + if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + if attempt < maxRetries { + // Exponential backoff: 1s, 2s, 4s... (max 30s) + backoff := time.Duration(1< 30*time.Second { + backoff = 30 * time.Second + } + log.Warnf("kiro: stream server error %d, retrying in %v (attempt %d/%d)", httpResp.StatusCode, backoff, attempt+1, maxRetries) + time.Sleep(backoff) + continue + } + log.Errorf("kiro: stream server error %d after %d retries", httpResp.StatusCode, maxRetries) + return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + // Handle 401/403 errors with token refresh and retry if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 { respBody, _ := io.ReadAll(httpResp.Body) @@ -1162,6 +1202,11 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse, // Handle different event types switch eventType { + case "followupPromptEvent": + // Filter out followupPrompt events - these are UI suggestions, not content + log.Debugf("kiro: parseEventStream ignoring followupPrompt event") + continue + case "assistantResponseEvent": if assistantResp, ok := event["assistantResponseEvent"].(map[string]interface{}); ok { if contentText, ok := assistantResp["content"].(string); ok { @@ -1570,6 +1615,11 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } switch eventType { + case "followupPromptEvent": + // Filter out followupPrompt events - these are UI suggestions, not content + log.Debugf("kiro: streamToChannel ignoring followupPrompt event") + continue + case "assistantResponseEvent": var contentDelta string var toolUses []map[string]interface{} @@ -1961,7 +2011,8 @@ func (e *KiroExecutor) buildClaudeFinalEvent() []byte { return []byte("event: message_stop\ndata: " + string(result)) } -// CountTokens is not supported for the Kiro provider. +// CountTokens is not supported for Kiro provider. +// Kiro/Amazon Q backend doesn't expose a token counting API. func (e *KiroExecutor) CountTokens(context.Context, *cliproxyauth.Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { return cliproxyexecutor.Response{}, statusErr{code: http.StatusNotImplemented, msg: "count tokens not supported for kiro"} } @@ -2988,18 +3039,33 @@ func (e *KiroExecutor) processToolUseEvent(event map[string]interface{}, current return toolUses, currentToolUse } -// deduplicateToolUses removes duplicate tool uses based on toolUseId. +// deduplicateToolUses removes duplicate tool uses based on toolUseId and content (name+arguments). +// This prevents both ID-based duplicates and content-based duplicates (same tool call with different IDs). func deduplicateToolUses(toolUses []kiroToolUse) []kiroToolUse { - seen := make(map[string]bool) + seenIDs := make(map[string]bool) + seenContent := make(map[string]bool) // Content-based deduplication (name + arguments) var unique []kiroToolUse for _, tu := range toolUses { - if !seen[tu.ToolUseID] { - seen[tu.ToolUseID] = true - unique = append(unique, tu) - } else { - log.Debugf("kiro: removing duplicate tool use: %s", tu.ToolUseID) + // Skip if we've already seen this ID + if seenIDs[tu.ToolUseID] { + log.Debugf("kiro: removing ID-duplicate tool use: %s (name: %s)", tu.ToolUseID, tu.Name) + continue } + + // Build content key for content-based deduplication + inputJSON, _ := json.Marshal(tu.Input) + contentKey := tu.Name + ":" + string(inputJSON) + + // Skip if we've already seen this content (same name + arguments) + if seenContent[contentKey] { + log.Debugf("kiro: removing content-duplicate tool use: %s (id: %s)", tu.Name, tu.ToolUseID) + continue + } + + seenIDs[tu.ToolUseID] = true + seenContent[contentKey] = true + unique = append(unique, tu) } return unique