From 8d5f89ccfd02f4a3228a431f6d3cbbaf8a8887a1 Mon Sep 17 00:00:00 2001 From: Ravens Date: Thu, 11 Dec 2025 01:15:00 +0800 Subject: [PATCH] fix(kiro): fix translator format mismatch for OpenAI protocol Amp-Thread-ID: https://ampcode.com/threads/T-019b092b-f2de-72a1-b428-72511c0de628 Co-authored-by: Amp --- internal/runtime/executor/kiro_executor.go | 51 ++++++++--------- .../translator/kiro/claude/kiro_claude.go | 55 ++----------------- .../chat-completions/kiro_openai_response.go | 48 +++++++++++++++- 3 files changed, 77 insertions(+), 77 deletions(-) diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index b965c9ca..b69fd8be 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -1323,7 +1323,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Send message_start on first event if !messageStartSent { msgStart := e.buildClaudeMessageStartEvent(model, totalUsage.InputTokens) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, msgStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1372,7 +1372,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out contentBlockIndex++ isTextBlockOpen = true blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1381,7 +1381,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } claudeEvent := e.buildClaudeStreamEvent(contentDelta, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1404,7 +1404,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Close text block if open before starting tool_use block if isTextBlockOpen && contentBlockIndex >= 0 { blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1418,7 +1418,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out toolName := getString(tu, "name") blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "tool_use", toolUseID, toolName) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1433,7 +1433,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Don't continue - still need to close the block } else { inputDelta := e.buildClaudeInputJsonDeltaEvent(string(inputJSON), contentBlockIndex) - sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1444,7 +1444,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Close tool_use block (always close even if input marshal failed) blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1464,7 +1464,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Close text block if open if isTextBlockOpen && contentBlockIndex >= 0 { blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1476,7 +1476,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out contentBlockIndex++ blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "tool_use", tu.ToolUseID, tu.Name) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1489,7 +1489,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out log.Debugf("kiro: failed to marshal tool input in toolUseEvent: %v", err) } else { inputDelta := e.buildClaudeInputJsonDeltaEvent(string(inputJSON), contentBlockIndex) - sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, inputDelta, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1499,7 +1499,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData = sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1530,7 +1530,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Close content block if open if isTextBlockOpen && contentBlockIndex >= 0 { blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1555,7 +1555,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Send message_delta and message_stop msgStop := e.buildClaudeMessageStopEvent(stopReason, totalUsage) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} @@ -1566,6 +1566,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // Claude SSE event builders +// All builders return complete SSE format with "event:" line for Claude client compatibility. func (e *KiroExecutor) buildClaudeMessageStartEvent(model string, inputTokens int64) []byte { event := map[string]interface{}{ "type": "message_start", @@ -1581,7 +1582,7 @@ func (e *KiroExecutor) buildClaudeMessageStartEvent(model string, inputTokens in }, } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: message_start\ndata: " + string(result)) } func (e *KiroExecutor) buildClaudeContentBlockStartEvent(index int, blockType, toolUseID, toolName string) []byte { @@ -1606,7 +1607,7 @@ func (e *KiroExecutor) buildClaudeContentBlockStartEvent(index int, blockType, t "content_block": contentBlock, } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: content_block_start\ndata: " + string(result)) } func (e *KiroExecutor) buildClaudeStreamEvent(contentDelta string, index int) []byte { @@ -1619,7 +1620,7 @@ func (e *KiroExecutor) buildClaudeStreamEvent(contentDelta string, index int) [] }, } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: content_block_delta\ndata: " + string(result)) } // buildClaudeInputJsonDeltaEvent creates an input_json_delta event for tool use streaming @@ -1633,7 +1634,7 @@ func (e *KiroExecutor) buildClaudeInputJsonDeltaEvent(partialJSON string, index }, } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: content_block_delta\ndata: " + string(result)) } func (e *KiroExecutor) buildClaudeContentBlockStopEvent(index int) []byte { @@ -1642,7 +1643,7 @@ func (e *KiroExecutor) buildClaudeContentBlockStopEvent(index int) []byte { "index": index, } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: content_block_stop\ndata: " + string(result)) } func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo usage.Detail) []byte { @@ -1666,7 +1667,7 @@ func (e *KiroExecutor) buildClaudeMessageStopEvent(stopReason string, usageInfo } stopResult, _ := json.Marshal(stopEvent) - return []byte("data: " + string(deltaResult) + "\n\ndata: " + string(stopResult)) + return []byte("event: message_delta\ndata: " + string(deltaResult) + "\n\nevent: message_stop\ndata: " + string(stopResult)) } // buildClaudeFinalEvent constructs the final Claude-style event. @@ -1675,7 +1676,7 @@ func (e *KiroExecutor) buildClaudeFinalEvent() []byte { "type": "message_stop", } result, _ := json.Marshal(event) - return []byte("data: " + string(result)) + return []byte("event: message_stop\ndata: " + string(result)) } // CountTokens is not supported for the Kiro provider. @@ -1890,7 +1891,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c if !messageStartSent { msgStart := e.buildClaudeMessageStartEvent(model, totalUsage.InputTokens) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, msgStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) @@ -1921,7 +1922,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c contentBlockIndex++ isBlockOpen = true blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) @@ -1931,7 +1932,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c } claudeEvent := e.buildClaudeStreamEvent(contentDelta, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) @@ -1964,7 +1965,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c // Close content block if open if isBlockOpen && contentBlockIndex >= 0 { blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) @@ -1984,7 +1985,7 @@ func (e *KiroExecutor) streamEventStream(ctx context.Context, body io.Reader, c // Always use end_turn (no tool_use support) msgStop := e.buildClaudeMessageStopEvent("end_turn", totalUsage) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("claude"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, msgStop, &translatorParam) for _, chunk := range sseData { if chunk != "" { c.Writer.Write([]byte(chunk + "\n\n")) diff --git a/internal/translator/kiro/claude/kiro_claude.go b/internal/translator/kiro/claude/kiro_claude.go index 335873a7..554dbf21 100644 --- a/internal/translator/kiro/claude/kiro_claude.go +++ b/internal/translator/kiro/claude/kiro_claude.go @@ -1,14 +1,11 @@ // Package claude provides translation between Kiro and Claude formats. -// Since Kiro uses Claude-compatible format internally, translations are mostly pass-through. -// However, SSE events require proper "event: " prefix for Claude clients. +// Since Kiro executor generates Claude-compatible SSE format internally (with event: prefix), +// translations are pass-through. package claude import ( "bytes" "context" - "strings" - - "github.com/tidwall/gjson" ) // ConvertClaudeRequestToKiro converts Claude request to Kiro format. @@ -18,52 +15,10 @@ func ConvertClaudeRequestToKiro(modelName string, inputRawJSON []byte, stream bo } // ConvertKiroResponseToClaude converts Kiro streaming response to Claude format. -// It adds the required "event: " prefix for SSE compliance with Claude clients. -// Input format: "data: {\"type\":\"message_start\",...}" -// Output format: "event: message_start\ndata: {\"type\":\"message_start\",...}" +// Kiro executor already generates complete SSE format with "event:" prefix, +// so this is a simple pass-through. func ConvertKiroResponseToClaude(ctx context.Context, model string, originalRequest, request, rawResponse []byte, param *any) []string { - raw := string(rawResponse) - - // Handle multiple data blocks (e.g., message_delta + message_stop) - lines := strings.Split(raw, "\n\n") - var results []string - - for _, line := range lines { - line = strings.TrimSpace(line) - if line == "" { - continue - } - - // Extract event type from JSON and add "event:" prefix - formatted := addEventPrefix(line) - if formatted != "" { - results = append(results, formatted) - } - } - - if len(results) == 0 { - return []string{raw} - } - - return results -} - -// addEventPrefix extracts the event type from the data line and adds the event: prefix. -// Input: "data: {\"type\":\"message_start\",...}" -// Output: "event: message_start\ndata: {\"type\":\"message_start\",...}" -func addEventPrefix(dataLine string) string { - if !strings.HasPrefix(dataLine, "data: ") { - return dataLine - } - - jsonPart := strings.TrimPrefix(dataLine, "data: ") - eventType := gjson.Get(jsonPart, "type").String() - - if eventType == "" { - return dataLine - } - - return "event: " + eventType + "\n" + dataLine + return []string{string(rawResponse)} } // ConvertKiroResponseToClaudeNonStream converts Kiro non-streaming response to Claude format. diff --git a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go index 6a0ad250..df75cc07 100644 --- a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go +++ b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go @@ -4,6 +4,7 @@ package chat_completions import ( "context" "encoding/json" + "strings" "time" "github.com/google/uuid" @@ -13,15 +14,58 @@ import ( // ConvertKiroResponseToOpenAI converts Kiro streaming response to OpenAI SSE format. // Handles Claude SSE events: content_block_start, content_block_delta, input_json_delta, // content_block_stop, message_delta, and message_stop. +// Input may be in SSE format: "event: xxx\ndata: {...}" or raw JSON. func ConvertKiroResponseToOpenAI(ctx context.Context, model string, originalRequest, request, rawResponse []byte, param *any) []string { - root := gjson.ParseBytes(rawResponse) + raw := string(rawResponse) + var results []string + + // Handle SSE format: extract JSON from "data: " lines + // Input format: "event: message_start\ndata: {...}" + lines := strings.Split(raw, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "data: ") { + jsonPart := strings.TrimPrefix(line, "data: ") + chunks := convertClaudeEventToOpenAI(jsonPart, model) + results = append(results, chunks...) + } else if strings.HasPrefix(line, "{") { + // Raw JSON (backward compatibility) + chunks := convertClaudeEventToOpenAI(line, model) + results = append(results, chunks...) + } + } + + return results +} + +// convertClaudeEventToOpenAI converts a single Claude JSON event to OpenAI format +func convertClaudeEventToOpenAI(jsonStr string, model string) []string { + root := gjson.Parse(jsonStr) var results []string eventType := root.Get("type").String() switch eventType { case "message_start": - // Initial message event - could emit initial chunk if needed + // Initial message event - emit initial chunk with role + response := map[string]interface{}{ + "id": "chatcmpl-" + uuid.New().String()[:24], + "object": "chat.completion.chunk", + "created": time.Now().Unix(), + "model": model, + "choices": []map[string]interface{}{ + { + "index": 0, + "delta": map[string]interface{}{ + "role": "assistant", + "content": "", + }, + "finish_reason": nil, + }, + }, + } + result, _ := json.Marshal(response) + results = append(results, string(result)) return results case "content_block_start":