diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index b69fd8be..534e0c58 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -121,7 +121,12 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return resp, fmt.Errorf("kiro: access token not found in auth") } if profileArn == "" { - log.Warnf("kiro: profile ARN not found in auth, API calls may fail") + // Only warn if not using builder-id auth (which doesn't need profileArn) + if auth == nil || auth.Metadata == nil { + log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)") + } else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" { + log.Warnf("kiro: profile ARN not found in auth, API calls may fail") + } } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) @@ -161,10 +166,19 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req currentOrigin = "CLI" } - kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + // Determine if profileArn should be included based on auth method + // profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO) + effectiveProfileArn := profileArn + if auth != nil && auth.Metadata != nil { + if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" { + effectiveProfileArn = "" // Don't include profileArn for builder-id auth + } + } + + kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly) // Execute with retry on 401/403 and 429 (quota exhausted) - resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly) + resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly) return resp, err } @@ -330,7 +344,12 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut return nil, fmt.Errorf("kiro: access token not found in auth") } if profileArn == "" { - log.Warnf("kiro: profile ARN not found in auth, API calls may fail") + // Only warn if not using builder-id auth (which doesn't need profileArn) + if auth == nil || auth.Metadata == nil { + log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)") + } else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" { + log.Warnf("kiro: profile ARN not found in auth, API calls may fail") + } } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) @@ -370,10 +389,19 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut currentOrigin = "CLI" } - kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + // Determine if profileArn should be included based on auth method + // profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO) + effectiveProfileArn := profileArn + if auth != nil && auth.Metadata != nil { + if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" { + effectiveProfileArn = "" // Don't include profileArn for builder-id auth + } + } + + kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly) // Execute stream with retry on 401/403 and 429 (quota exhausted) - return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly) + return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly) } // executeStreamWithRetry performs the streaming HTTP request with automatic retry on auth errors. @@ -587,10 +615,10 @@ type kiroPayload struct { } type kiroConversationState struct { + ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL" - must be first field ConversationID string `json:"conversationId"` - History []kiroHistoryMessage `json:"history"` CurrentMessage kiroCurrentMessage `json:"currentMessage"` - ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL" + History []kiroHistoryMessage `json:"history,omitempty"` // Only include when non-empty } type kiroCurrentMessage struct { @@ -805,21 +833,18 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, }} } + // Build payload with correct field order (matches struct definition) + // Note: history is omitempty, so nil/empty slice won't be serialized payload := kiroPayload{ ConversationState: kiroConversationState{ + ChatTriggerType: "MANUAL", // Required by Kiro API - must be first ConversationID: uuid.New().String(), - History: history, CurrentMessage: currentMessage, - ChatTriggerType: "MANUAL", // Required by Kiro API + History: history, // Will be omitted if empty due to omitempty tag }, ProfileArn: profileArn, } - // Ensure history is not nil (empty array) - if payload.ConversationState.History == nil { - payload.ConversationState.History = []kiroHistoryMessage{} - } - result, err := json.Marshal(payload) if err != nil { log.Debugf("kiro: failed to marshal payload: %v", err) @@ -1001,6 +1026,12 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse, return content.String(), toolUses, usageInfo, fmt.Errorf("failed to read message: %w", err) } + // Validate headersLen to prevent slice out of bounds + if headersLen+4 > uint32(len(remaining)) { + log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining)) + continue + } + // Extract event type from headers eventType := e.extractEventType(remaining[:headersLen+4]) @@ -1111,6 +1142,11 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse, // Deduplicate all tool uses toolUses = deduplicateToolUses(toolUses) + // OPTIONAL: Filter out [HELIOS_CHK] debug blocks from Kiro/Amazon Q internal systems + // These blocks contain internal state tracking information that should not be exposed to users. + // To enable filtering, uncomment the following line: + // cleanedContent = filterHeliosDebugInfo(cleanedContent) + return cleanedContent, toolUses, usageInfo, nil } @@ -1279,6 +1315,51 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out prelude := make([]byte, 8) _, err := io.ReadFull(reader, prelude) if err == io.EOF { + // Flush any incomplete tool use before ending stream + if currentToolUse != nil && !processedIDs[currentToolUse.toolUseID] { + log.Warnf("kiro: flushing incomplete tool use at EOF: %s (ID: %s)", currentToolUse.name, currentToolUse.toolUseID) + fullInput := currentToolUse.inputBuffer.String() + repairedJSON := repairJSON(fullInput) + var finalInput map[string]interface{} + if err := json.Unmarshal([]byte(repairedJSON), &finalInput); err != nil { + log.Warnf("kiro: failed to parse incomplete tool input at EOF: %v", err) + finalInput = make(map[string]interface{}) + } + + processedIDs[currentToolUse.toolUseID] = true + contentBlockIndex++ + + // Send tool_use content block + blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "tool_use", currentToolUse.toolUseID, currentToolUse.name) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + + // Send tool input as delta + inputBytes, _ := json.Marshal(finalInput) + inputDelta := e.buildClaudeInputJsonDeltaEvent(string(inputBytes), contentBlockIndex) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + + // Close block + blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + + hasToolUses = true + currentToolUse = nil + } break } if err != nil { @@ -1304,6 +1385,12 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out return } + // Validate headersLen to prevent slice out of bounds + if headersLen+4 > uint32(len(remaining)) { + log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining)) + continue + } + eventType := e.extractEventType(remaining[:headersLen+4]) payloadStart := 4 + headersLen @@ -1317,6 +1404,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out var event map[string]interface{} if err := json.Unmarshal(payload, &event); err != nil { + log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload)) continue } @@ -1553,9 +1641,18 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out stopReason = "tool_use" } - // Send message_delta and message_stop - msgStop := e.buildClaudeMessageStopEvent(stopReason, totalUsage) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) + // Send message_delta event + msgDelta := e.buildClaudeMessageDeltaEvent(stopReason, totalUsage) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + + // Send message_stop event separately + msgStop := e.buildClaudeMessageStopOnlyEvent() + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1646,8 +1743,8 @@ func (e *KiroExecutor) buildClaudeContentBlockStopEvent(index int) []byte { return []byte("event: content_block_stop\ndata: " + string(result)) } -func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo usage.Detail) []byte { - // First message_delta +// buildClaudeMessageDeltaEvent creates the message_delta event with stop_reason and usage. +func (e *KiroExecutor) buildClaudeMessageDeltaEvent(stopReason string, usageInfo usage.Detail) []byte { deltaEvent := map[string]interface{}{ "type": "message_delta", "delta": map[string]interface{}{ @@ -1660,14 +1757,16 @@ func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo }, } deltaResult, _ := json.Marshal(deltaEvent) + return []byte("event: message_delta\ndata: " + string(deltaResult)) +} - // Then message_stop +// buildClaudeMessageStopOnlyEvent creates only the message_stop event. +func (e *KiroExecutor) buildClaudeMessageStopOnlyEvent() []byte { stopEvent := map[string]interface{}{ "type": "message_stop", } stopResult, _ := json.Marshal(stopEvent) - - return []byte("event: message_delta\ndata: " + string(deltaResult) + "\n\nevent: message_stop\ndata: " + string(stopResult)) + return []byte("event: message_stop\ndata: " + string(stopResult)) } // buildClaudeFinalEvent constructs the final Claude-style event. @@ -1873,6 +1972,12 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c return fmt.Errorf("failed to read message: %w", err) } + // Validate headersLen to prevent slice out of bounds + if headersLen+4 > uint32(len(remaining)) { + log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining)) + continue + } + eventType := e.extractEventType(remaining[:headersLen+4]) payloadStart := 4 + headersLen @@ -1886,6 +1991,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c var event map[string]interface{} if err := json.Unmarshal(payload, &event); err != nil { + log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload)) continue } @@ -1983,9 +2089,19 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c } totalUsage.TotalTokens = totalUsage.InputTokens + totalUsage.OutputTokens - // Always use end_turn (no tool_use support) - msgStop := e.buildClaudeMessageStopEvent("end_turn", totalUsage) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) + // Send message_delta event + msgDelta := e.buildClaudeMessageDeltaEvent("end_turn", totalUsage) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + c.Writer.Write([]byte(chunk + "\n\n")) + } + } + c.Writer.Flush() + + // Send message_stop event separately + msgStop := e.buildClaudeMessageStopOnlyEvent() + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) @@ -2079,8 +2195,12 @@ var ( whitespaceCollapsePattern = regexp.MustCompile(`\s+`) // trailingCommaPattern matches trailing commas before closing braces/brackets trailingCommaPattern = regexp.MustCompile(`,\s*([}\]])`) - // unquotedKeyPattern matches unquoted JSON keys that need quoting - unquotedKeyPattern = regexp.MustCompile(`([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:`) + + // heliosDebugPattern matches [HELIOS_CHK] debug blocks from Kiro/Amazon Q internal systems + // These blocks contain internal state tracking information that should not be exposed to users + // Format: [HELIOS_CHK]\nPhase: ...\nHelios: ...\nAction: ...\nTask: ...\nContext-Map: ...\nNext: ... + // The pattern matches from [HELIOS_CHK] to the end of the "Next:" line (which may span multiple lines until double newline) + heliosDebugPattern = regexp.MustCompile(`(?s)\[HELIOS_CHK\].*?(?:Next:.*?)(?:\n\n|\z)`) ) // parseEmbeddedToolCalls extracts [Called tool_name with args: {...}] format from text. @@ -2249,13 +2369,70 @@ func findMatchingBracket(text string, startPos int) int { // Based on AIClient-2-API's JSON repair implementation. // Uses pre-compiled regex patterns for performance. func repairJSON(raw string) string { + // First, escape unescaped newlines/tabs within JSON string values + repaired := escapeNewlinesInStrings(raw) // Remove trailing commas before closing braces/brackets - repaired := trailingCommaPattern.ReplaceAllString(raw, "$1") - // Fix unquoted keys (basic attempt - handles simple cases) - repaired = unquotedKeyPattern.ReplaceAllString(repaired, `$1"$2":`) + repaired = trailingCommaPattern.ReplaceAllString(repaired, "$1") + // Note: unquotedKeyPattern removed - it incorrectly matches content inside + // JSON string values (e.g., "classDef fill:#1a1a2e,stroke:#00d4ff" would have + // ",stroke:" incorrectly treated as an unquoted key) return repaired } +// escapeNewlinesInStrings escapes literal newlines, tabs, and other control characters +// that appear inside JSON string values. This handles cases where streaming fragments +// contain unescaped control characters within string content. +func escapeNewlinesInStrings(raw string) string { + var result strings.Builder + result.Grow(len(raw) + 100) // Pre-allocate with some extra space + + inString := false + escaped := false + + for i := 0; i < len(raw); i++ { + c := raw[i] + + if escaped { + // Previous character was backslash, this is an escape sequence + result.WriteByte(c) + escaped = false + continue + } + + if c == '\\' && inString { + // Start of escape sequence + result.WriteByte(c) + escaped = true + continue + } + + if c == '"' { + // Toggle string state + inString = !inString + result.WriteByte(c) + continue + } + + if inString { + // Inside a string, escape control characters + switch c { + case '\n': + result.WriteString("\\n") + case '\r': + result.WriteString("\\r") + case '\t': + result.WriteString("\\t") + default: + result.WriteByte(c) + } + } else { + result.WriteByte(c) + } + } + + return result.String() +} + // processToolUseEvent handles a toolUseEvent from the Kiro stream. // It accumulates input fragments and emits tool_use blocks when complete. // Returns events to emit and updated state. @@ -2330,6 +2507,8 @@ func (e *KiroExecutor) processToolUseEvent(event map[string]interface{}, current // Accumulate input fragments if currentToolUse != nil && inputFragment != "" { + // Accumulate fragments directly - they form valid JSON when combined + // The fragments are already decoded from JSON, so we just concatenate them currentToolUse.inputBuffer.WriteString(inputFragment) log.Debugf("kiro: accumulated input fragment, total length: %d", currentToolUse.inputBuffer.Len()) } @@ -2389,3 +2568,39 @@ func deduplicateToolUses(toolUses []kiroToolUse) []kiroToolUse { return unique } + +// filterHeliosDebugInfo removes [HELIOS_CHK] debug blocks from Kiro/Amazon Q responses. +// These blocks contain internal state tracking information from the Helios system +// that should not be exposed to end users. +// +// The [HELIOS_CHK] block format typically looks like: +// +// [HELIOS_CHK] +// Phase: E (Evaluate & Evolve) +// Helios: [P1_Intent: OK] [P2_Research: OK] [P3_Strategy: OK] +// Action: [TEXT_RESPONSE] +// Task: #T001 - Some task description +// Context-Map: [SYNCED] +// Next: Some next action description... +// +// This function is currently DISABLED (commented out in callers) pending further testing. +// To enable, uncomment the filterHeliosDebugInfo calls in parseEventStream() and streamToChannel(). +func filterHeliosDebugInfo(content string) string { + if !strings.Contains(content, "[HELIOS_CHK]") { + return content + } + + // Remove [HELIOS_CHK] blocks + filtered := heliosDebugPattern.ReplaceAllString(content, "") + + // Clean up any resulting double newlines or leading/trailing whitespace + filtered = strings.TrimSpace(filtered) + + // Log when filtering occurs for debugging purposes + if filtered != content { + log.Debugf("kiro: filtered HELIOS debug info from response (original len: %d, filtered len: %d)", + len(content), len(filtered)) + } + + return filtered +} diff --git a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go index df75cc07..d56c94ac 100644 --- a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go +++ b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go @@ -171,7 +171,7 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string { return results case "message_delta": - // Final message delta with stop_reason + // Final message delta with stop_reason and usage stopReason := root.Get("delta.stop_reason").String() if stopReason != "" { finishReason := "stop" @@ -196,6 +196,19 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string { }, }, } + + // Extract and include usage information from message_delta event + usage := root.Get("usage") + if usage.Exists() { + inputTokens := usage.Get("input_tokens").Int() + outputTokens := usage.Get("output_tokens").Int() + response["usage"] = map[string]interface{}{ + "prompt_tokens": inputTokens, + "completion_tokens": outputTokens, + "total_tokens": inputTokens + outputTokens, + } + } + result, _ := json.Marshal(response) results = append(results, string(result)) }