feat(kiro): enhance Kiro executor with retry, deduplication and event filtering

This commit is contained in:
Ravens2121
2025-12-12 01:59:06 +08:00
parent ef0edbfe69
commit 40e7f066e4
2 changed files with 80 additions and 8 deletions

View File

@@ -349,6 +349,12 @@ func (s *Server) setupRoutes() {
},
})
})
// Event logging endpoint - handles Claude Code telemetry requests
// Returns 200 OK to prevent 404 errors in logs
s.engine.POST("/api/event_logging/batch", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
s.engine.POST("/v1internal:method", geminiCLIHandlers.CLIHandler)
// OAuth callback endpoints (reuse main server port)

View File

@@ -255,6 +255,26 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 5xx server errors with exponential backoff retry
if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
if attempt < maxRetries {
// Exponential backoff: 1s, 2s, 4s... (max 30s)
backoff := time.Duration(1<<attempt) * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
log.Warnf("kiro: server error %d, retrying in %v (attempt %d/%d)", httpResp.StatusCode, backoff, attempt+1, maxRetries)
time.Sleep(backoff)
continue
}
log.Errorf("kiro: server error %d after %d retries", httpResp.StatusCode, maxRetries)
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 401/403 errors with token refresh and retry
if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 {
respBody, _ := io.ReadAll(httpResp.Body)
@@ -485,6 +505,26 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 5xx server errors with exponential backoff retry
if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
if attempt < maxRetries {
// Exponential backoff: 1s, 2s, 4s... (max 30s)
backoff := time.Duration(1<<attempt) * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
log.Warnf("kiro: stream server error %d, retrying in %v (attempt %d/%d)", httpResp.StatusCode, backoff, attempt+1, maxRetries)
time.Sleep(backoff)
continue
}
log.Errorf("kiro: stream server error %d after %d retries", httpResp.StatusCode, maxRetries)
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 401/403 errors with token refresh and retry
if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 {
respBody, _ := io.ReadAll(httpResp.Body)
@@ -1162,6 +1202,11 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse,
// Handle different event types
switch eventType {
case "followupPromptEvent":
// Filter out followupPrompt events - these are UI suggestions, not content
log.Debugf("kiro: parseEventStream ignoring followupPrompt event")
continue
case "assistantResponseEvent":
if assistantResp, ok := event["assistantResponseEvent"].(map[string]interface{}); ok {
if contentText, ok := assistantResp["content"].(string); ok {
@@ -1570,6 +1615,11 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
}
switch eventType {
case "followupPromptEvent":
// Filter out followupPrompt events - these are UI suggestions, not content
log.Debugf("kiro: streamToChannel ignoring followupPrompt event")
continue
case "assistantResponseEvent":
var contentDelta string
var toolUses []map[string]interface{}
@@ -1961,7 +2011,8 @@ func (e *KiroExecutor) buildClaudeFinalEvent() []byte {
return []byte("event: message_stop\ndata: " + string(result))
}
// CountTokens is not supported for the Kiro provider.
// CountTokens is not supported for Kiro provider.
// Kiro/Amazon Q backend doesn't expose a token counting API.
func (e *KiroExecutor) CountTokens(context.Context, *cliproxyauth.Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
return cliproxyexecutor.Response{}, statusErr{code: http.StatusNotImplemented, msg: "count tokens not supported for kiro"}
}
@@ -2988,18 +3039,33 @@ func (e *KiroExecutor) processToolUseEvent(event map[string]interface{}, current
return toolUses, currentToolUse
}
// deduplicateToolUses removes duplicate tool uses based on toolUseId.
// deduplicateToolUses removes duplicate tool uses based on toolUseId and content (name+arguments).
// This prevents both ID-based duplicates and content-based duplicates (same tool call with different IDs).
func deduplicateToolUses(toolUses []kiroToolUse) []kiroToolUse {
seen := make(map[string]bool)
seenIDs := make(map[string]bool)
seenContent := make(map[string]bool) // Content-based deduplication (name + arguments)
var unique []kiroToolUse
for _, tu := range toolUses {
if !seen[tu.ToolUseID] {
seen[tu.ToolUseID] = true
unique = append(unique, tu)
} else {
log.Debugf("kiro: removing duplicate tool use: %s", tu.ToolUseID)
// Skip if we've already seen this ID
if seenIDs[tu.ToolUseID] {
log.Debugf("kiro: removing ID-duplicate tool use: %s (name: %s)", tu.ToolUseID, tu.Name)
continue
}
// Build content key for content-based deduplication
inputJSON, _ := json.Marshal(tu.Input)
contentKey := tu.Name + ":" + string(inputJSON)
// Skip if we've already seen this content (same name + arguments)
if seenContent[contentKey] {
log.Debugf("kiro: removing content-duplicate tool use: %s (id: %s)", tu.Name, tu.ToolUseID)
continue
}
seenIDs[tu.ToolUseID] = true
seenContent[contentKey] = true
unique = append(unique, tu)
}
return unique