Compare commits

..

4 Commits

Author SHA1 Message Date
Luis Pater
69fccf0015 Merge pull request #18 from Ravens2121/master
Kiro Executor Stability and API Compatibility Improvements
2025-12-11 08:35:22 +08:00
Ravens2121
6133bac226 feat(kiro): enhance Kiro executor stability and compatibility
## Changes Overview
This commit includes multiple improvements to the Kiro executor for better
stability, API compatibility, and code quality.

## Detailed Changes

### 1. Output Token Calculation Improvement (lines 317-330)
- Replace simple len(content)/4 estimation with tiktoken-based calculation
- Add fallback to character count estimation if tiktoken fails
- Improves token counting accuracy for usage tracking

### 2. Stream Handler Panic Recovery (lines 528-533)
- Add defer/recover block in streamToChannel goroutine
- Prevents single request crashes from affecting the entire service

### 3. Struct Field Reordering (lines 670-673)
- Reorder kiroToolResult struct fields: Content, Status, ToolUseID
- Ensures consistency with API expectations

### 4. Message Merging Function (lines 778-780, 2356-2483)
- Add mergeAdjacentMessages() to combine consecutive messages with same role
- Add helper functions: mergeMessageContent(), blockToMap(), createMergedMessage()
- Required by Kiro API which doesn't allow adjacent messages from same role

### 5. Empty Content Handling (lines 791-800)
- Add default content for empty history messages
- User messages with tool results: "Tool results provided."
- User messages without tool results: "Continue"

### 6. Assistant Last Message Handling (lines 811-830)
- Detect when last message is from assistant
- Create synthetic "Continue" user message to satisfy Kiro API requirements
- Kiro API requires currentMessage to be userInputMessage type

### 7. Duplicate Content Event Detection (lines 1650-1660)
- Track lastContentEvent to detect duplicate streaming events
- Skip redundant events to prevent duplicate content in responses
- Based on AIClient-2-API implementation for Kiro

### 8. Streaming Token Calculation Enhancement (lines 1785-1817)
- Add accumulatedContent buffer for streaming token calculation
- Use tiktoken for accurate output token counting during streaming
- Add fallback to character count estimation with proper logging

### 9. JSON Repair Enhancement (lines 2665-2818)
- Implement conservative JSON repair strategy
- First try to parse JSON directly - if valid, return unchanged
- Add bracket balancing detection and repair
- Only repair when necessary to avoid corrupting valid JSON
- Validate repaired JSON before returning

### 10. HELIOS_CHK Filtering Removal (lines 2500-2504, 3004-3039)
- Remove filterHeliosDebugInfo function
- Remove heliosDebugPattern regex
- HELIOS_CHK fields now handled by client-side processing

### 11. Comment Translation
- Translate Chinese comments to English for code consistency
- Affected areas: token calculation, buffer handling, message processing
2025-12-11 08:13:33 +08:00
Ravens2121
f302be5ce6 Merge branch 'router-for-me:main' into master 2025-12-11 05:40:10 +08:00
Ravens2121
cd4e84a360 feat(kiro): enhance request format, stream handling, and usage tracking
## English Description

### Request Format Fixes
- Fix conversationState field order (chatTriggerType must be first)
- Add conditional profileArn inclusion based on auth method
- builder-id auth (AWS SSO) doesn't require profileArn
- social auth (Google OAuth) requires profileArn

### Stream Processing Enhancements
- Add headersLen boundary validation to prevent slice out of bounds
- Handle incomplete tool use at EOF by flushing pending data
- Separate message_delta and message_stop events for proper streaming
- Add error logging for JSON unmarshal failures

### JSON Repair Improvements
- Add escapeNewlinesInStrings() to handle control characters in JSON strings
- Remove incorrect unquotedKeyPattern that broke valid JSON content
- Fix handling of streaming fragments with embedded newlines/tabs

### Debug Info Filtering (Optional)
- Add filterHeliosDebugInfo() to remove [HELIOS_CHK] blocks
- Pattern matches internal state tracking from Kiro/Amazon Q
- Currently disabled pending further testing

### Usage Tracking
- Add usage information extraction in message_delta response
- Include prompt_tokens, completion_tokens, total_tokens in OpenAI format

---

## 中文描述

### 请求格式修复
- 修复 conversationState 字段顺序(chatTriggerType 必须在第一位)
- 根据认证方式条件性包含 profileArn
- builder-id 认证(AWS SSO)不需要 profileArn
- social 认证(Google OAuth)需要 profileArn

### 流处理增强
- 添加 headersLen 边界验证,防止切片越界
- 在 EOF 时处理未完成的工具调用,刷新待处理数据
- 分离 message_delta 和 message_stop 事件以实现正确的流式传输
- 添加 JSON 反序列化失败的错误日志

### JSON 修复改进
- 添加 escapeNewlinesInStrings() 处理 JSON 字符串中的控制字符
- 移除错误的 unquotedKeyPattern,该模式会破坏有效的 JSON 内容
- 修复包含嵌入换行符/制表符的流式片段处理

### 调试信息过滤(可选)
- 添加 filterHeliosDebugInfo() 移除 [HELIOS_CHK] 块
- 模式匹配来自 Kiro/Amazon Q 的内部状态跟踪信息
- 目前已禁用,等待进一步测试

### 使用量跟踪
- 在 message_delta 响应中添加 usage 信息提取
- 以 OpenAI 格式包含 prompt_tokens、completion_tokens、total_tokens
2025-12-11 05:37:22 +08:00
2 changed files with 675 additions and 47 deletions

View File

@@ -121,7 +121,12 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
return resp, fmt.Errorf("kiro: access token not found in auth")
}
if profileArn == "" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
// Only warn if not using builder-id auth (which doesn't need profileArn)
if auth == nil || auth.Metadata == nil {
log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)")
} else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
}
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
@@ -161,10 +166,19 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
currentOrigin = "CLI"
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Determine if profileArn should be included based on auth method
// profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO)
effectiveProfileArn := profileArn
if auth != nil && auth.Metadata != nil {
if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" {
effectiveProfileArn = "" // Don't include profileArn for builder-id auth
}
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly)
// Execute with retry on 401/403 and 429 (quota exhausted)
resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
resp, err = e.executeWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, to, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
return resp, err
}
@@ -301,9 +315,18 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
}
}
if len(content) > 0 {
usageInfo.OutputTokens = int64(len(content) / 4)
// Use tiktoken for more accurate output token calculation
if enc, encErr := tokenizerForModel(req.Model); encErr == nil {
if tokenCount, countErr := enc.Count(content); countErr == nil {
usageInfo.OutputTokens = int64(tokenCount)
}
}
// Fallback to character count estimation if tiktoken fails
if usageInfo.OutputTokens == 0 {
usageInfo.OutputTokens = 1
usageInfo.OutputTokens = int64(len(content) / 4)
if usageInfo.OutputTokens == 0 {
usageInfo.OutputTokens = 1
}
}
}
usageInfo.TotalTokens = usageInfo.InputTokens + usageInfo.OutputTokens
@@ -330,7 +353,12 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
return nil, fmt.Errorf("kiro: access token not found in auth")
}
if profileArn == "" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
// Only warn if not using builder-id auth (which doesn't need profileArn)
if auth == nil || auth.Metadata == nil {
log.Debugf("kiro: profile ARN not found in auth (may be normal for builder-id)")
} else if authMethod, ok := auth.Metadata["auth_method"].(string); !ok || authMethod != "builder-id" {
log.Warnf("kiro: profile ARN not found in auth, API calls may fail")
}
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
@@ -370,10 +398,19 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
currentOrigin = "CLI"
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Determine if profileArn should be included based on auth method
// profileArn is only needed for social auth (Google OAuth), not for builder-id (AWS SSO)
effectiveProfileArn := profileArn
if auth != nil && auth.Metadata != nil {
if authMethod, ok := auth.Metadata["auth_method"].(string); ok && authMethod == "builder-id" {
effectiveProfileArn = "" // Don't include profileArn for builder-id auth
}
}
kiroPayload := e.buildKiroPayload(body, kiroModelID, effectiveProfileArn, currentOrigin, isAgentic, isChatOnly)
// Execute stream with retry on 401/403 and 429 (quota exhausted)
return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, profileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, kiroPayload, body, from, reporter, currentOrigin, kiroModelID, isAgentic, isChatOnly)
}
// executeStreamWithRetry performs the streaming HTTP request with automatic retry on auth errors.
@@ -491,6 +528,12 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
go func(resp *http.Response) {
defer close(out)
defer func() {
if r := recover(); r != nil {
log.Errorf("kiro: panic in stream handler: %v", r)
out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("internal error: %v", r)}
}
}()
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
@@ -587,10 +630,10 @@ type kiroPayload struct {
}
type kiroConversationState struct {
ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL" - must be first field
ConversationID string `json:"conversationId"`
History []kiroHistoryMessage `json:"history"`
CurrentMessage kiroCurrentMessage `json:"currentMessage"`
ChatTriggerType string `json:"chatTriggerType"` // Required: "MANUAL"
History []kiroHistoryMessage `json:"history,omitempty"` // Only include when non-empty
}
type kiroCurrentMessage struct {
@@ -627,9 +670,9 @@ type kiroUserInputMessageContext struct {
}
type kiroToolResult struct {
ToolUseID string `json:"toolUseId"`
Content []kiroTextContent `json:"content"`
Status string `json:"status"`
ToolUseID string `json:"toolUseId"`
}
type kiroTextContent struct {
@@ -735,7 +778,9 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
var currentUserMsg *kiroUserInputMessage
var currentToolResults []kiroToolResult
messagesArray := messages.Array()
// Merge adjacent messages with the same role before processing
// This reduces API call complexity and improves compatibility
messagesArray := mergeAdjacentMessages(messages.Array())
for i, msg := range messagesArray {
role := msg.Get("role").String()
isLastMessage := i == len(messagesArray)-1
@@ -746,6 +791,14 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
currentUserMsg = &userMsg
currentToolResults = toolResults
} else {
// CRITICAL: Kiro API requires content to be non-empty for history messages too
if strings.TrimSpace(userMsg.Content) == "" {
if len(toolResults) > 0 {
userMsg.Content = "Tool results provided."
} else {
userMsg.Content = "Continue"
}
}
// For history messages, embed tool results in context
if len(toolResults) > 0 {
userMsg.UserInputMessageContext = &kiroUserInputMessageContext{
@@ -758,9 +811,24 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
}
} else if role == "assistant" {
assistantMsg := e.buildAssistantMessageStruct(msg)
history = append(history, kiroHistoryMessage{
AssistantResponseMessage: &assistantMsg,
})
// If this is the last message and it's an assistant message,
// we need to add it to history and create a "Continue" user message
// because Kiro API requires currentMessage to be userInputMessage type
if isLastMessage {
history = append(history, kiroHistoryMessage{
AssistantResponseMessage: &assistantMsg,
})
// Create a "Continue" user message as currentMessage
currentUserMsg = &kiroUserInputMessage{
Content: "Continue",
ModelID: modelID,
Origin: origin,
}
} else {
history = append(history, kiroHistoryMessage{
AssistantResponseMessage: &assistantMsg,
})
}
}
}
@@ -777,7 +845,35 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
// Add the actual user message
contentBuilder.WriteString(currentUserMsg.Content)
currentUserMsg.Content = contentBuilder.String()
finalContent := contentBuilder.String()
// CRITICAL: Kiro API requires content to be non-empty, even when toolResults are present
// If content is empty or only whitespace, provide a default message
if strings.TrimSpace(finalContent) == "" {
if len(currentToolResults) > 0 {
finalContent = "Tool results provided."
} else {
finalContent = "Continue"
}
log.Debugf("kiro: content was empty, using default: %s", finalContent)
}
currentUserMsg.Content = finalContent
// Deduplicate currentToolResults before adding to context
// Kiro API does not accept duplicate toolUseIds
if len(currentToolResults) > 0 {
seenIDs := make(map[string]bool)
uniqueToolResults := make([]kiroToolResult, 0, len(currentToolResults))
for _, tr := range currentToolResults {
if !seenIDs[tr.ToolUseID] {
seenIDs[tr.ToolUseID] = true
uniqueToolResults = append(uniqueToolResults, tr)
} else {
log.Debugf("kiro: skipping duplicate toolResult in currentMessage: %s", tr.ToolUseID)
}
}
currentToolResults = uniqueToolResults
}
// Build userInputMessageContext with tools and tool results
if len(kiroTools) > 0 || len(currentToolResults) > 0 {
@@ -805,21 +901,18 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
}}
}
// Build payload with correct field order (matches struct definition)
// Note: history is omitempty, so nil/empty slice won't be serialized
payload := kiroPayload{
ConversationState: kiroConversationState{
ChatTriggerType: "MANUAL", // Required by Kiro API - must be first
ConversationID: uuid.New().String(),
History: history,
CurrentMessage: currentMessage,
ChatTriggerType: "MANUAL", // Required by Kiro API
History: history, // Will be omitted if empty due to omitempty tag
},
ProfileArn: profileArn,
}
// Ensure history is not nil (empty array)
if payload.ConversationState.History == nil {
payload.ConversationState.History = []kiroHistoryMessage{}
}
result, err := json.Marshal(payload)
if err != nil {
log.Debugf("kiro: failed to marshal payload: %v", err)
@@ -830,11 +923,15 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
// buildUserMessageStruct builds a user message and extracts tool results
// origin parameter determines which quota to use: "CLI" for Amazon Q, "AI_EDITOR" for Kiro IDE.
// IMPORTANT: Kiro API does not accept duplicate toolUseIds, so we deduplicate here.
func (e *KiroExecutor) buildUserMessageStruct(msg gjson.Result, modelID, origin string) (kiroUserInputMessage, []kiroToolResult) {
content := msg.Get("content")
var contentBuilder strings.Builder
var toolResults []kiroToolResult
var images []kiroImage
// Track seen toolUseIds to deduplicate - Kiro API rejects duplicate toolUseIds
seenToolUseIDs := make(map[string]bool)
if content.IsArray() {
for _, part := range content.Array() {
@@ -864,6 +961,14 @@ func (e *KiroExecutor) buildUserMessageStruct(msg gjson.Result, modelID, origin
case "tool_result":
// Extract tool result for API
toolUseID := part.Get("tool_use_id").String()
// Skip duplicate toolUseIds - Kiro API does not accept duplicates
if seenToolUseIDs[toolUseID] {
log.Debugf("kiro: skipping duplicate tool_result with toolUseId: %s", toolUseID)
continue
}
seenToolUseIDs[toolUseID] = true
isError := part.Get("is_error").Bool()
resultContent := part.Get("content")
@@ -1001,6 +1106,12 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse,
return content.String(), toolUses, usageInfo, fmt.Errorf("failed to read message: %w", err)
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
// Extract event type from headers
eventType := e.extractEventType(remaining[:headersLen+4])
@@ -1018,6 +1129,37 @@ func (e *KiroExecutor) parseEventStream(body io.Reader) (string, []kiroToolUse,
continue
}
// DIAGNOSTIC: Log all received event types for debugging
log.Debugf("kiro: parseEventStream received event type: %s", eventType)
if log.IsLevelEnabled(log.TraceLevel) {
log.Tracef("kiro: parseEventStream event payload: %s", string(payload))
}
// Check for error/exception events in the payload (Kiro API may return errors with HTTP 200)
// These can appear as top-level fields or nested within the event
if errType, hasErrType := event["_type"].(string); hasErrType {
// AWS-style error: {"_type": "com.amazon.aws.codewhisperer#ValidationException", "message": "..."}
errMsg := ""
if msg, ok := event["message"].(string); ok {
errMsg = msg
}
log.Errorf("kiro: received AWS error in event stream: type=%s, message=%s", errType, errMsg)
return "", nil, usageInfo, fmt.Errorf("kiro API error: %s - %s", errType, errMsg)
}
if errType, hasErrType := event["type"].(string); hasErrType && (errType == "error" || errType == "exception") {
// Generic error event
errMsg := ""
if msg, ok := event["message"].(string); ok {
errMsg = msg
} else if errObj, ok := event["error"].(map[string]interface{}); ok {
if msg, ok := errObj["message"].(string); ok {
errMsg = msg
}
}
log.Errorf("kiro: received error event in stream: type=%s, message=%s", errType, errMsg)
return "", nil, usageInfo, fmt.Errorf("kiro API error: %s", errMsg)
}
// Handle different event types
switch eventType {
case "assistantResponseEvent":
@@ -1231,8 +1373,9 @@ func (e *KiroExecutor) buildClaudeResponse(content string, toolUses []kiroToolUs
// streamToChannel converts AWS Event Stream to channel-based streaming.
// Supports tool calling - emits tool_use content blocks when tools are used.
// Includes embedded [Called ...] tool call parsing and input buffering for toolUseEvent.
// Implements duplicate content filtering using lastContentEvent detection (based on AIClient-2-API).
func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out chan<- cliproxyexecutor.StreamChunk, targetFormat sdktranslator.Format, model string, originalReq, claudeBody []byte, reporter *usageReporter) {
reader := bufio.NewReader(body)
reader := bufio.NewReaderSize(body, 20*1024*1024) // 20MB buffer to match other providers
var totalUsage usage.Detail
var hasToolUses bool // Track if any tool uses were emitted
@@ -1240,6 +1383,15 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
processedIDs := make(map[string]bool)
var currentToolUse *toolUseState
// Duplicate content detection - tracks last content event to filter duplicates
// Based on AIClient-2-API implementation for Kiro
var lastContentEvent string
// Streaming token calculation - accumulate content for real-time token counting
// Based on AIClient-2-API implementation
var accumulatedContent strings.Builder
accumulatedContent.Grow(4096) // Pre-allocate 4KB capacity to reduce reallocations
// Translator param for maintaining tool call state across streaming events
// IMPORTANT: This must persist across all TranslateStream calls
var translatorParam any
@@ -1279,6 +1431,51 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
prelude := make([]byte, 8)
_, err := io.ReadFull(reader, prelude)
if err == io.EOF {
// Flush any incomplete tool use before ending stream
if currentToolUse != nil && !processedIDs[currentToolUse.toolUseID] {
log.Warnf("kiro: flushing incomplete tool use at EOF: %s (ID: %s)", currentToolUse.name, currentToolUse.toolUseID)
fullInput := currentToolUse.inputBuffer.String()
repairedJSON := repairJSON(fullInput)
var finalInput map[string]interface{}
if err := json.Unmarshal([]byte(repairedJSON), &finalInput); err != nil {
log.Warnf("kiro: failed to parse incomplete tool input at EOF: %v", err)
finalInput = make(map[string]interface{})
}
processedIDs[currentToolUse.toolUseID] = true
contentBlockIndex++
// Send tool_use content block
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "tool_use", currentToolUse.toolUseID, currentToolUse.name)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Send tool input as delta
inputBytes, _ := json.Marshal(finalInput)
inputDelta := e.buildClaudeInputJsonDeltaEvent(string(inputBytes), contentBlockIndex)
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Close block
blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex)
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
hasToolUses = true
currentToolUse = nil
}
break
}
if err != nil {
@@ -1304,6 +1501,12 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
return
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
eventType := e.extractEventType(remaining[:headersLen+4])
payloadStart := 4 + headersLen
@@ -1317,9 +1520,43 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
var event map[string]interface{}
if err := json.Unmarshal(payload, &event); err != nil {
log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload))
continue
}
// DIAGNOSTIC: Log all received event types for debugging
log.Debugf("kiro: streamToChannel received event type: %s", eventType)
if log.IsLevelEnabled(log.TraceLevel) {
log.Tracef("kiro: streamToChannel event payload: %s", string(payload))
}
// Check for error/exception events in the payload (Kiro API may return errors with HTTP 200)
// These can appear as top-level fields or nested within the event
if errType, hasErrType := event["_type"].(string); hasErrType {
// AWS-style error: {"_type": "com.amazon.aws.codewhisperer#ValidationException", "message": "..."}
errMsg := ""
if msg, ok := event["message"].(string); ok {
errMsg = msg
}
log.Errorf("kiro: received AWS error in stream: type=%s, message=%s", errType, errMsg)
out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("kiro API error: %s - %s", errType, errMsg)}
return
}
if errType, hasErrType := event["type"].(string); hasErrType && (errType == "error" || errType == "exception") {
// Generic error event
errMsg := ""
if msg, ok := event["message"].(string); ok {
errMsg = msg
} else if errObj, ok := event["error"].(map[string]interface{}); ok {
if msg, ok := errObj["message"].(string); ok {
errMsg = msg
}
}
log.Errorf("kiro: received error event in stream: type=%s, message=%s", errType, errMsg)
out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("kiro API error: %s", errMsg)}
return
}
// Send message_start on first event
if !messageStartSent {
msgStart := e.buildClaudeMessageStartEvent(model, totalUsage.InputTokens)
@@ -1364,9 +1601,19 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
}
}
// Handle text content
// Handle text content with duplicate detection
if contentDelta != "" {
// Check for duplicate content - skip if identical to last content event
// Based on AIClient-2-API implementation for Kiro
if contentDelta == lastContentEvent {
log.Debugf("kiro: skipping duplicate content event (len: %d)", len(contentDelta))
continue
}
lastContentEvent = contentDelta
outputLen += len(contentDelta)
// Accumulate content for streaming token calculation
accumulatedContent.WriteString(contentDelta)
// Start text content block if needed
if !isTextBlockOpen {
contentBlockIndex++
@@ -1538,8 +1785,32 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
}
}
// Fallback for output tokens if not received from upstream
if totalUsage.OutputTokens == 0 && outputLen > 0 {
// Streaming token calculation - calculate output tokens from accumulated content
// This provides more accurate token counting than simple character division
if totalUsage.OutputTokens == 0 && accumulatedContent.Len() > 0 {
// Try to use tiktoken for accurate counting
if enc, err := tokenizerForModel(model); err == nil {
if tokenCount, countErr := enc.Count(accumulatedContent.String()); countErr == nil {
totalUsage.OutputTokens = int64(tokenCount)
log.Debugf("kiro: streamToChannel calculated output tokens using tiktoken: %d", totalUsage.OutputTokens)
} else {
// Fallback on count error: estimate from character count
totalUsage.OutputTokens = int64(accumulatedContent.Len() / 4)
if totalUsage.OutputTokens == 0 {
totalUsage.OutputTokens = 1
}
log.Debugf("kiro: streamToChannel tiktoken count failed, estimated from chars: %d", totalUsage.OutputTokens)
}
} else {
// Fallback: estimate from character count (roughly 4 chars per token)
totalUsage.OutputTokens = int64(accumulatedContent.Len() / 4)
if totalUsage.OutputTokens == 0 {
totalUsage.OutputTokens = 1
}
log.Debugf("kiro: streamToChannel estimated output tokens from chars: %d (content len: %d)", totalUsage.OutputTokens, accumulatedContent.Len())
}
} else if totalUsage.OutputTokens == 0 && outputLen > 0 {
// Legacy fallback using outputLen
totalUsage.OutputTokens = int64(outputLen / 4)
if totalUsage.OutputTokens == 0 {
totalUsage.OutputTokens = 1
@@ -1553,9 +1824,18 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
stopReason = "tool_use"
}
// Send message_delta and message_stop
msgStop := e.buildClaudeMessageStopEvent(stopReason, totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
// Send message_delta event
msgDelta := e.buildClaudeMessageDeltaEvent(stopReason, totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Send message_stop event separately
msgStop := e.buildClaudeMessageStopOnlyEvent()
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
@@ -1646,8 +1926,8 @@ func (e *KiroExecutor) buildClaudeContentBlockStopEvent(index int) []byte {
return []byte("event: content_block_stop\ndata: " + string(result))
}
func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo usage.Detail) []byte {
// First message_delta
// buildClaudeMessageDeltaEvent creates the message_delta event with stop_reason and usage.
func (e *KiroExecutor) buildClaudeMessageDeltaEvent(stopReason string, usageInfo usage.Detail) []byte {
deltaEvent := map[string]interface{}{
"type": "message_delta",
"delta": map[string]interface{}{
@@ -1660,14 +1940,16 @@ func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo
},
}
deltaResult, _ := json.Marshal(deltaEvent)
return []byte("event: message_delta\ndata: " + string(deltaResult))
}
// Then message_stop
// buildClaudeMessageStopOnlyEvent creates only the message_stop event.
func (e *KiroExecutor) buildClaudeMessageStopOnlyEvent() []byte {
stopEvent := map[string]interface{}{
"type": "message_stop",
}
stopResult, _ := json.Marshal(stopEvent)
return []byte("event: message_delta\ndata: " + string(deltaResult) + "\n\nevent: message_stop\ndata: " + string(stopResult))
return []byte("event: message_stop\ndata: " + string(stopResult))
}
// buildClaudeFinalEvent constructs the final Claude-style event.
@@ -1873,6 +2155,12 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
return fmt.Errorf("failed to read message: %w", err)
}
// Validate headersLen to prevent slice out of bounds
if headersLen+4 > uint32(len(remaining)) {
log.Warnf("kiro: invalid headersLen %d exceeds remaining buffer %d", headersLen, len(remaining))
continue
}
eventType := e.extractEventType(remaining[:headersLen+4])
payloadStart := 4 + headersLen
@@ -1886,6 +2174,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
var event map[string]interface{}
if err := json.Unmarshal(payload, &event); err != nil {
log.Warnf("kiro: failed to unmarshal event payload: %v, raw: %s", err, string(payload))
continue
}
@@ -1983,9 +2272,19 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c
}
totalUsage.TotalTokens = totalUsage.InputTokens + totalUsage.OutputTokens
// Always use end_turn (no tool_use support)
msgStop := e.buildClaudeMessageStopEvent("end_turn", totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
// Send message_delta event
msgDelta := e.buildClaudeMessageDeltaEvent("end_turn", totalUsage)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgDelta, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
c.Writer.Write([]byte(chunk + "\n\n"))
}
}
c.Writer.Flush()
// Send message_stop event separately
msgStop := e.buildClaudeMessageStopOnlyEvent()
sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
c.Writer.Write([]byte(chunk + "\n\n"))
@@ -2057,6 +2356,128 @@ func (e *KiroExecutor) isTokenExpired(accessToken string) bool {
return isExpired
}
// ============================================================================
// Message Merging Support - Merge adjacent messages with the same role
// Based on AIClient-2-API implementation
// ============================================================================
// mergeAdjacentMessages merges adjacent messages with the same role.
// This reduces API call complexity and improves compatibility.
// Based on AIClient-2-API implementation.
func mergeAdjacentMessages(messages []gjson.Result) []gjson.Result {
if len(messages) <= 1 {
return messages
}
var merged []gjson.Result
for _, msg := range messages {
if len(merged) == 0 {
merged = append(merged, msg)
continue
}
lastMsg := merged[len(merged)-1]
currentRole := msg.Get("role").String()
lastRole := lastMsg.Get("role").String()
if currentRole == lastRole {
// Merge content from current message into last message
mergedContent := mergeMessageContent(lastMsg, msg)
// Create a new merged message JSON
mergedMsg := createMergedMessage(lastRole, mergedContent)
merged[len(merged)-1] = gjson.Parse(mergedMsg)
} else {
merged = append(merged, msg)
}
}
return merged
}
// mergeMessageContent merges the content of two messages with the same role.
// Handles both string content and array content (with text, tool_use, tool_result blocks).
func mergeMessageContent(msg1, msg2 gjson.Result) string {
content1 := msg1.Get("content")
content2 := msg2.Get("content")
// Extract content blocks from both messages
var blocks1, blocks2 []map[string]interface{}
if content1.IsArray() {
for _, block := range content1.Array() {
blocks1 = append(blocks1, blockToMap(block))
}
} else if content1.Type == gjson.String {
blocks1 = append(blocks1, map[string]interface{}{
"type": "text",
"text": content1.String(),
})
}
if content2.IsArray() {
for _, block := range content2.Array() {
blocks2 = append(blocks2, blockToMap(block))
}
} else if content2.Type == gjson.String {
blocks2 = append(blocks2, map[string]interface{}{
"type": "text",
"text": content2.String(),
})
}
// Merge text blocks if both end/start with text
if len(blocks1) > 0 && len(blocks2) > 0 {
if blocks1[len(blocks1)-1]["type"] == "text" && blocks2[0]["type"] == "text" {
// Merge the last text block of msg1 with the first text block of msg2
text1 := blocks1[len(blocks1)-1]["text"].(string)
text2 := blocks2[0]["text"].(string)
blocks1[len(blocks1)-1]["text"] = text1 + "\n" + text2
blocks2 = blocks2[1:] // Remove the merged block from blocks2
}
}
// Combine all blocks
allBlocks := append(blocks1, blocks2...)
// Convert to JSON
result, _ := json.Marshal(allBlocks)
return string(result)
}
// blockToMap converts a gjson.Result block to a map[string]interface{}
func blockToMap(block gjson.Result) map[string]interface{} {
result := make(map[string]interface{})
block.ForEach(func(key, value gjson.Result) bool {
if value.IsObject() {
result[key.String()] = blockToMap(value)
} else if value.IsArray() {
var arr []interface{}
for _, item := range value.Array() {
if item.IsObject() {
arr = append(arr, blockToMap(item))
} else {
arr = append(arr, item.Value())
}
}
result[key.String()] = arr
} else {
result[key.String()] = value.Value()
}
return true
})
return result
}
// createMergedMessage creates a JSON string for a merged message
func createMergedMessage(role string, content string) string {
msg := map[string]interface{}{
"role": role,
"content": json.RawMessage(content),
}
result, _ := json.Marshal(msg)
return string(result)
}
// ============================================================================
// Tool Calling Support - Embedded tool call parsing and input buffering
// Based on amq2api and AIClient-2-API implementations
@@ -2079,8 +2500,6 @@ var (
whitespaceCollapsePattern = regexp.MustCompile(`\s+`)
// trailingCommaPattern matches trailing commas before closing braces/brackets
trailingCommaPattern = regexp.MustCompile(`,\s*([}\]])`)
// unquotedKeyPattern matches unquoted JSON keys that need quoting
unquotedKeyPattern = regexp.MustCompile(`([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:`)
)
// parseEmbeddedToolCalls extracts [Called tool_name with args: {...}] format from text.
@@ -2246,14 +2665,208 @@ func findMatchingBracket(text string, startPos int) int {
}
// repairJSON attempts to fix common JSON issues that may occur in tool call arguments.
// Based on AIClient-2-API's JSON repair implementation.
// Based on AIClient-2-API's JSON repair implementation with a more conservative strategy.
//
// Conservative repair strategy:
// 1. First try to parse JSON directly - if valid, return as-is
// 2. Only attempt repair if parsing fails
// 3. After repair, validate the result - if still invalid, return original
//
// Handles incomplete JSON by balancing brackets and removing trailing incomplete content.
// Uses pre-compiled regex patterns for performance.
func repairJSON(raw string) string {
func repairJSON(jsonString string) string {
// Handle empty or invalid input
if jsonString == "" {
return "{}"
}
str := strings.TrimSpace(jsonString)
if str == "" {
return "{}"
}
// CONSERVATIVE STRATEGY: First try to parse directly
// If the JSON is already valid, return it unchanged
var testParse interface{}
if err := json.Unmarshal([]byte(str), &testParse); err == nil {
log.Debugf("kiro: repairJSON - JSON is already valid, returning unchanged")
return str
}
log.Debugf("kiro: repairJSON - JSON parse failed, attempting repair")
originalStr := str // Keep original for fallback
// First, escape unescaped newlines/tabs within JSON string values
str = escapeNewlinesInStrings(str)
// Remove trailing commas before closing braces/brackets
repaired := trailingCommaPattern.ReplaceAllString(raw, "$1")
// Fix unquoted keys (basic attempt - handles simple cases)
repaired = unquotedKeyPattern.ReplaceAllString(repaired, `$1"$2":`)
return repaired
str = trailingCommaPattern.ReplaceAllString(str, "$1")
// Calculate bracket balance to detect incomplete JSON
braceCount := 0 // {} balance
bracketCount := 0 // [] balance
inString := false
escape := false
lastValidIndex := -1
for i := 0; i < len(str); i++ {
char := str[i]
// Handle escape sequences
if escape {
escape = false
continue
}
if char == '\\' {
escape = true
continue
}
// Handle string boundaries
if char == '"' {
inString = !inString
continue
}
// Skip characters inside strings (they don't affect bracket balance)
if inString {
continue
}
// Track bracket balance
switch char {
case '{':
braceCount++
case '}':
braceCount--
case '[':
bracketCount++
case ']':
bracketCount--
}
// Record last valid position (where brackets are balanced or positive)
if braceCount >= 0 && bracketCount >= 0 {
lastValidIndex = i
}
}
// If brackets are unbalanced, try to repair
if braceCount > 0 || bracketCount > 0 {
// Truncate to last valid position if we have incomplete content
if lastValidIndex > 0 && lastValidIndex < len(str)-1 {
// Check if truncation would help (only truncate if there's trailing garbage)
truncated := str[:lastValidIndex+1]
// Recount brackets after truncation
braceCount = 0
bracketCount = 0
inString = false
escape = false
for i := 0; i < len(truncated); i++ {
char := truncated[i]
if escape {
escape = false
continue
}
if char == '\\' {
escape = true
continue
}
if char == '"' {
inString = !inString
continue
}
if inString {
continue
}
switch char {
case '{':
braceCount++
case '}':
braceCount--
case '[':
bracketCount++
case ']':
bracketCount--
}
}
str = truncated
}
// Add missing closing brackets
for braceCount > 0 {
str += "}"
braceCount--
}
for bracketCount > 0 {
str += "]"
bracketCount--
}
}
// CONSERVATIVE STRATEGY: Validate repaired JSON
// If repair didn't produce valid JSON, return original string
if err := json.Unmarshal([]byte(str), &testParse); err != nil {
log.Warnf("kiro: repairJSON - repair failed to produce valid JSON, returning original")
return originalStr
}
log.Debugf("kiro: repairJSON - successfully repaired JSON")
return str
}
// escapeNewlinesInStrings escapes literal newlines, tabs, and other control characters
// that appear inside JSON string values. This handles cases where streaming fragments
// contain unescaped control characters within string content.
func escapeNewlinesInStrings(raw string) string {
var result strings.Builder
result.Grow(len(raw) + 100) // Pre-allocate with some extra space
inString := false
escaped := false
for i := 0; i < len(raw); i++ {
c := raw[i]
if escaped {
// Previous character was backslash, this is an escape sequence
result.WriteByte(c)
escaped = false
continue
}
if c == '\\' && inString {
// Start of escape sequence
result.WriteByte(c)
escaped = true
continue
}
if c == '"' {
// Toggle string state
inString = !inString
result.WriteByte(c)
continue
}
if inString {
// Inside a string, escape control characters
switch c {
case '\n':
result.WriteString("\\n")
case '\r':
result.WriteString("\\r")
case '\t':
result.WriteString("\\t")
default:
result.WriteByte(c)
}
} else {
result.WriteByte(c)
}
}
return result.String()
}
// processToolUseEvent handles a toolUseEvent from the Kiro stream.
@@ -2330,6 +2943,8 @@ func (e *KiroExecutor) processToolUseEvent(event map[string]interface{}, current
// Accumulate input fragments
if currentToolUse != nil && inputFragment != "" {
// Accumulate fragments directly - they form valid JSON when combined
// The fragments are already decoded from JSON, so we just concatenate them
currentToolUse.inputBuffer.WriteString(inputFragment)
log.Debugf("kiro: accumulated input fragment, total length: %d", currentToolUse.inputBuffer.Len())
}

View File

@@ -171,7 +171,7 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string {
return results
case "message_delta":
// Final message delta with stop_reason
// Final message delta with stop_reason and usage
stopReason := root.Get("delta.stop_reason").String()
if stopReason != "" {
finishReason := "stop"
@@ -196,6 +196,19 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string {
},
},
}
// Extract and include usage information from message_delta event
usage := root.Get("usage")
if usage.Exists() {
inputTokens := usage.Get("input_tokens").Int()
outputTokens := usage.Get("output_tokens").Int()
response["usage"] = map[string]interface{}{
"prompt_tokens": inputTokens,
"completion_tokens": outputTokens,
"total_tokens": inputTokens + outputTokens,
}
}
result, _ := json.Marshal(response)
results = append(results, string(result))
}