feat(kiro): enhance request format, stream handling, and usage tracking

## English Description

### Request Format Fixes
- Fix conversationState field order (chatTriggerType must be first)
- Add conditional profileArn inclusion based on auth method
- builder-id auth (AWS SSO) doesn't require profileArn
- social auth (Google OAuth) requires profileArn

### Stream Processing Enhancements
- Add headersLen boundary validation to prevent slice-out-of-bounds panics
- Handle incomplete tool use at EOF by flushing pending data
- Separate message_delta and message_stop events for proper streaming
- Add error logging for JSON unmarshal failures

### JSON Repair Improvements
- Add escapeNewlinesInStrings() to handle control characters in JSON strings
- Remove incorrect unquotedKeyPattern that broke valid JSON content
- Fix handling of streaming fragments with embedded newlines/tabs

### Debug Info Filtering (Optional)
- Add filterHeliosDebugInfo() to remove [HELIOS_CHK] blocks
- Pattern matches internal state tracking from Kiro/Amazon Q
- Currently disabled pending further testing

### Usage Tracking
- Add usage information extraction in message_delta response
- Include prompt_tokens, completion_tokens, total_tokens in OpenAI format

---

## 中文描述

### 请求格式修复
- 修复 conversationState 字段顺序(chatTriggerType 必须在第一位)
- 根据认证方式条件性包含 profileArn
- builder-id 认证(AWS SSO)不需要 profileArn
- social 认证(Google OAuth)需要 profileArn

### 流处理增强
- 添加 headersLen 边界验证,防止切片越界
- 在 EOF 时处理未完成的工具调用,刷新待处理数据
- 分离 message_delta 和 message_stop 事件以实现正确的流式传输
- 添加 JSON 反序列化失败的错误日志

### JSON 修复改进
- 添加 escapeNewlinesInStrings() 处理 JSON 字符串中的控制字符
- 移除错误的 unquotedKeyPattern,该模式会破坏有效的 JSON 内容
- 修复包含嵌入换行符/制表符的流式片段处理

### 调试信息过滤(可选)
- 添加 filterHeliosDebugInfo() 移除 [HELIOS_CHK] 块
- 模式匹配来自 Kiro/Amazon Q 的内部状态跟踪信息
- 目前已禁用,等待进一步测试

### 使用量跟踪
- 在 message_delta 响应中添加 usage 信息提取
- 以 OpenAI 格式包含 prompt_tokens、completion_tokens、total_tokens
This commit is contained in:
Ravens2121
2025-12-11 05:24:21 +08:00
parent b27a175fef
commit cd4e84a360
2 changed files with 260 additions and 32 deletions

View File

@@ -121,7 +121,12 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
return resp, fmt.Errorf("kiro: access token not found in auth")
}
if profileArn == "" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
// Only warn if not using builder-id auth (which doesn't need profileArn)
if auth == nil || auth.Metadata == nil {
log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)")
} else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
}
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
@@ -161,10 +166,19 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
currentOrigin = "CLI"
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Determine if profileArn should be included based on auth method
// profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO)
effectiveProfileArn := profileArn
if auth != nil && auth.Metadata != nil {
if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" {
effectiveProfileArn = "" // Don't include profileArn for builder-id auth
}
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly)
// Execute with retry on 401/403 and 429 (quota exhausted)
resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
return resp, err
}
@@ -330,7 +344,12 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
return nil, fmt.Errorf("kiro: access token not found in auth")
}
if profileArn == "" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
// Only warn if not using builder-id auth (which doesn't need profileArn)
if auth == nil || auth.Metadata == nil {
log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)")
} else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
}
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
@@ -370,10 +389,19 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
currentOrigin = "CLI"
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Determine if profileArn should be included based on auth method
// profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO)
effectiveProfileArn := profileArn
if auth != nil && auth.Metadata != nil {
if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" {
effectiveProfileArn = "" // Don't include profileArn for builder-id auth
}
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly)
// Execute stream with retry on 401/403 and 429 (quota exhausted)
return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
}
// executeStreamWithRetry performs the streaming HTTP request with automatic retry on auth errors.
@@ -587,10 +615,10 @@ type kiroPayload struct {
}
type kiroConversationState struct {
ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL" - must be first field
ConversationID string `json:"conversationId"`
History []kiroHistoryMessage `json:"history"`
CurrentMessage kiroCurrentMessage `json:"currentMessage"`
ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL"
History []kiroHistoryMessage `json:"history,omitempty"` // Only include when non-empty
}
type kiroCurrentMessage struct {
@@ -805,21 +833,18 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
}}
}
// Build payload with correct field order (matches struct definition)
// Note: history is omitempty, so nil/empty slice won't be serialized
payload := kiroPayload{
ConversationState: kiroConversationState{
ChatTriggerType: "MANUAL", // Required by Kiro API - must be first
ConversationID: uuid.New().String(),
History: history,
CurrentMessage: currentMessage,
ChatTriggerType: "MANUAL", // Required by Kiro API
History: history, // Will be omitted if empty due to omitempty tag
},
ProfileArn: profileArn,
}
// Ensure history is not nil (empty array)
if payload.ConversationState.History == nil {
payload.ConversationState.History = []kiroHistoryMessage{}
}
result, err := json.Marshal(payload)
if err != nil {
log.Debugf("kiro: failed to marshal payload: %v", err)
@@ -1001,6 +1026,12 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse,
return content.String(), toolUses, usageInfo, fmt.Errorf("failed to read message: %w", err)
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
// Extract event type from headers
eventType := e.extractEventType(remaining[:headersLen+4])
@@ -1111,6 +1142,11 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse,
// Deduplicate all tool uses
toolUses = deduplicateToolUses(toolUses)
// OPTIONAL: Filter out [HELIOS_CHK] debug blocks from Kiro/Amazon Q internal systems
// These blocks contain internal state tracking information that should not be exposed to users.
// To enable filtering, uncomment the following line:
// cleanedContent = filterHeliosDebugInfo(cleanedContent)
return cleanedContent, toolUses, usageInfo, nil
}
@@ -1279,6 +1315,51 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
prelude := make([]byte, 8)
_, err := io.ReadFull(reader, prelude)
if err == io.EOF {
// Flush any incomplete tool use before ending stream
if currentToolUse != nil && !processedIDs[currentToolUse.toolUseID] {
log.Warnf("kiro: flushing incomplete tool use at EOF: %s (ID: %s)", currentToolUse.name, currentToolUse.toolUseID)
fullInput := currentToolUse.inputBuffer.String()
repairedJSON := repairJSON(fullInput)
var finalInput map[string]interface{}
if err := json.Unmarshal([]byte(repairedJSON), &finalInput); err != nil {
log.Warnf("kiro: failed to parse incomplete tool input at EOF: %v", err)
finalInput = make(map[string]interface{})
}
processedIDs[currentToolUse.toolUseID] = true
contentBlockIndex++
// Send tool_use content block
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "tool_use", currentToolUse.toolUseID, currentToolUse.name)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Send tool input as delta
inputBytes, _ := json.Marshal(finalInput)
inputDelta := e.buildClaudeInputJsonDeltaEvent(string(inputBytes), contentBlockIndex)
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Close block
blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex)
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
hasToolUses = true
currentToolUse = nil
}
break
}
if err != nil {
@@ -1304,6 +1385,12 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
return
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
eventType := e.extractEventType(remaining[:headersLen+4])
payloadStart := 4 + headersLen
@@ -1317,6 +1404,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
var event map[string]interface{}
if err := json.Unmarshal(payload, &event); err != nil {
log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload))
continue
}
@@ -1553,9 +1641,18 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
stopReason = "tool_use"
}
// Send message_delta and message_stop
msgStop := e.buildClaudeMessageStopEvent(stopReason, totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
// Send message_delta event
msgDelta := e.buildClaudeMessageDeltaEvent(stopReason, totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Send message_stop event separately
msgStop := e.buildClaudeMessageStopOnlyEvent()
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
@@ -1646,8 +1743,8 @@ func (e *KiroExecutor) buildClaudeContentBlockStopEvent(index int) []byte {
return []byte("event: content_block_stop\ndata: " + string(result))
}
func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo usage.Detail) []byte {
// First message_delta
// buildClaudeMessageDeltaEvent creates the message_delta event with stop_reason and usage.
func (e *KiroExecutor) buildClaudeMessageDeltaEvent(stopReason string, usageInfo usage.Detail) []byte {
deltaEvent := map[string]interface{}{
"type": "message_delta",
"delta": map[string]interface{}{
@@ -1660,14 +1757,16 @@ func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo
},
}
deltaResult, _ := json.Marshal(deltaEvent)
return []byte("event: message_delta\ndata: " + string(deltaResult))
}
// Then message_stop
// buildClaudeMessageStopOnlyEvent creates only the message_stop event.
func (e *KiroExecutor) buildClaudeMessageStopOnlyEvent() []byte {
stopEvent := map[string]interface{}{
"type": "message_stop",
}
stopResult, _ := json.Marshal(stopEvent)
return []byte("event: message_delta\ndata: " + string(deltaResult) + "\n\nevent: message_stop\ndata: " + string(stopResult))
return []byte("event: message_stop\ndata: " + string(stopResult))
}
// buildClaudeFinalEvent constructs the final Claude-style event.
@@ -1873,6 +1972,12 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
return fmt.Errorf("failed to read message: %w", err)
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
eventType := e.extractEventType(remaining[:headersLen+4])
payloadStart := 4 + headersLen
@@ -1886,6 +1991,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
var event map[string]interface{}
if err := json.Unmarshal(payload, &event); err != nil {
log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload))
continue
}
@@ -1983,9 +2089,19 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
}
totalUsage.TotalTokens = totalUsage.InputTokens + totalUsage.OutputTokens
// Always use end_turn (no tool_use support)
msgStop := e.buildClaudeMessageStopEvent("end_turn", totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
// Send message_delta event
msgDelta := e.buildClaudeMessageDeltaEvent("end_turn", totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
c.Writer.Write([]byte(chunk + "\n\n"))
}
}
c.Writer.Flush()
// Send message_stop event separately
msgStop := e.buildClaudeMessageStopOnlyEvent()
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
c.Writer.Write([]byte(chunk + "\n\n"))
@@ -2079,8 +2195,12 @@ var (
whitespaceCollapsePattern = regexp.MustCompile(`\s+`)
// trailingCommaPattern matches trailing commas before closing braces/brackets
trailingCommaPattern = regexp.MustCompile(`,\s*([}\]])`)
// unquotedKeyPattern matches unquoted JSON keys that need quoting
unquotedKeyPattern = regexp.MustCompile(`([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:`)
// heliosDebugPattern matches [HELIOS_CHK] debug blocks from Kiro/Amazon Q internal systems
// These blocks contain internal state tracking information that should not be exposed to users
// Format: [HELIOS_CHK]\nPhase: ...\nHelios: ...\nAction: ...\nTask: ...\nContext-Map: ...\nNext: ...
// The pattern matches from [HELIOS_CHK] to the end of the "Next:" line (which may span multiple lines until double newline)
heliosDebugPattern = regexp.MustCompile(`(?s)\[HELIOS_CHK\].*?(?:Next:.*?)(?:\n\n|\z)`)
)
// parseEmbeddedToolCalls extracts [Called tool_name with args: {...}] format from text.
@@ -2249,13 +2369,70 @@ func findMatchingBracket(text string, startPos int) int {
// Based on AIClient-2-API's JSON repair implementation.
// Uses pre-compiled regex patterns for performance.
func repairJSON(raw string) string {
// First, escape unescaped newlines/tabs within JSON string values
repaired := escapeNewlinesInStrings(raw)
// Remove trailing commas before closing braces/brackets
repaired := trailingCommaPattern.ReplaceAllString(raw, "$1")
// Fix unquoted keys (basic attempt - handles simple cases)
repaired = unquotedKeyPattern.ReplaceAllString(repaired, `$1"$2":`)
repaired = trailingCommaPattern.ReplaceAllString(repaired, "$1")
// Note: unquotedKeyPattern removed - it incorrectly matches content inside
// JSON string values (e.g., "classDef fill:#1a1a2e,stroke:#00d4ff" would have
// ",stroke:" incorrectly treated as an unquoted key)
return repaired
}
// escapeNewlinesInStrings escapes literal control characters that appear inside
// JSON string values, so that streaming fragments containing raw newlines, tabs,
// or other bytes below 0x20 can be parsed as JSON afterwards.
//
// The input is scanned byte by byte while tracking whether the cursor is inside
// a double-quoted string:
//   - Outside strings every byte is copied unchanged (structural whitespace
//     such as newlines between members stays as-is).
//   - Inside strings an existing backslash escape (backslash plus the byte that
//     follows it) is copied unchanged, so already-escaped content and escaped
//     quotes do not toggle the string state.
//   - Inside strings a raw control byte (< 0x20) is replaced by its JSON escape:
//     \n, \r, \t, \b, \f where a short form exists, \u00XX otherwise.
//
// Bytes >= 0x20, including multi-byte UTF-8 sequences, are never modified.
// The function does not validate the JSON; an unterminated string simply causes
// the remainder of the input to be treated as string content.
func escapeNewlinesInStrings(raw string) string {
	const hexDigits = "0123456789abcdef"

	var result strings.Builder
	result.Grow(len(raw) + 100) // Pre-allocate with some slack for added escapes

	inString := false
	escaped := false

	for i := 0; i < len(raw); i++ {
		c := raw[i]

		if escaped {
			// Previous byte was a backslash inside a string: this byte belongs
			// to that escape sequence and must be passed through untouched.
			result.WriteByte(c)
			escaped = false
			continue
		}

		if c == '\\' && inString {
			// Start of an escape sequence
			result.WriteByte(c)
			escaped = true
			continue
		}

		if c == '"' {
			// Unescaped quote: toggle string state
			inString = !inString
			result.WriteByte(c)
			continue
		}

		if !inString {
			result.WriteByte(c)
			continue
		}

		// Inside a string: JSON forbids raw control characters (U+0000..U+001F).
		switch {
		case c == '\n':
			result.WriteString("\\n")
		case c == '\r':
			result.WriteString("\\r")
		case c == '\t':
			result.WriteString("\\t")
		case c == '\b':
			result.WriteString("\\b")
		case c == '\f':
			result.WriteString("\\f")
		case c < 0x20:
			// No short escape exists; fall back to the \u00XX form.
			result.WriteString("\\u00")
			result.WriteByte(hexDigits[c>>4])
			result.WriteByte(hexDigits[c&0x0f])
		default:
			result.WriteByte(c)
		}
	}

	return result.String()
}
// processToolUseEvent handles a toolUseEvent from the Kiro stream.
// It accumulates input fragments and emits tool_use blocks when complete.
// Returns events to emit and updated state.
@@ -2330,6 +2507,8 @@ func (e *KiroExecutor) processToolUseEvent(event map[string]interface{}, current
// Accumulate input fragments
if currentToolUse != nil && inputFragment != "" {
// Accumulate fragments directly - they form valid JSON when combined
// The fragments are already decoded from JSON, so we just concatenate them
currentToolUse.inputBuffer.WriteString(inputFragment)
log.Debugf("kiro: accumulated input fragment, total length: %d", currentToolUse.inputBuffer.Len())
}
@@ -2389,3 +2568,39 @@ func deduplicateToolUses(toolUses []kiroToolUse) []kiroToolUse {
return unique
}
// filterHeliosDebugInfo strips [HELIOS_CHK] debug blocks out of a Kiro/Amazon Q
// response so that this internal state-tracking text is not shown to end users.
//
// A block typically looks like:
//
//	[HELIOS_CHK]
//	Phase: E (Evaluate & Evolve)
//	Helios: [P1_Intent: OK] [P2_Research: OK] [P3_Strategy: OK]
//	Action: [TEXT_RESPONSE]
//	Task: #T001 - Some task description
//	Context-Map: [SYNCED]
//	Next: Some next action description...
//
// Behavior:
//   - If content does not contain the literal marker "[HELIOS_CHK]", it is
//     returned exactly as given (no trimming).
//   - Otherwise every match of heliosDebugPattern is removed and the result has
//     its leading and trailing whitespace trimmed. Interior blank lines left
//     behind by a removed block are not collapsed.
//   - A debug log line is emitted whenever the returned text differs from the input.
//
// NOTE: this helper is currently DISABLED — its call sites are commented out
// pending further testing (the original note names parseEventStream() and
// streamToChannel() as the places to uncomment).
func filterHeliosDebugInfo(content string) string {
	const marker = "[HELIOS_CHK]"

	// Fast path: nothing to strip, hand the input back untouched.
	if !strings.Contains(content, marker) {
		return content
	}

	// Drop every debug block, then tidy the outer whitespace.
	cleaned := strings.TrimSpace(heliosDebugPattern.ReplaceAllString(content, ""))

	// Record that filtering actually changed something, for troubleshooting.
	if before, after := len(content), len(cleaned); cleaned != content {
		log.Debugf("kiro: filtered HELIOS debug info from response (original len: %d, filtered len: %d)",
			before, after)
	}

	return cleaned
}

View File

@@ -171,7 +171,7 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string {
return results
case "message_delta":
// Final message delta with stop_reason
// Final message delta with stop_reason and usage
stopReason := root.Get("delta.stop_reason").String()
if stopReason != "" {
finishReason := "stop"
@@ -196,6 +196,19 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string {
},
},
}
// Extract and include usage information from message_delta event
usage := root.Get("usage")
if usage.Exists() {
inputTokens := usage.Get("input_tokens").Int()
outputTokens := usage.Get("output_tokens").Int()
response["usage"] = map[string]interface{}{
"prompt_tokens": inputTokens,
"completion_tokens": outputTokens,
"total_tokens": inputTokens + outputTokens,
}
}
result, _ := json.Marshal(response)
results = append(results, string(result))
}