From 84920cb6709a475f7054d067e5f9d9588c057d62 Mon Sep 17 00:00:00 2001 From: Ravens2121 Date: Fri, 12 Dec 2025 13:43:36 +0800 Subject: [PATCH] feat(kiro): add multi-endpoint fallback & thinking mode support --- PR_DOCUMENTATION.md | 49 ++ internal/api/modules/amp/proxy.go | 10 +- internal/config/config.go | 9 + internal/runtime/executor/kiro_executor.go | 732 +++++++++++++++--- .../chat-completions/kiro_openai_request.go | 29 + .../chat-completions/kiro_openai_response.go | 31 + internal/watcher/watcher.go | 17 + 7 files changed, 787 insertions(+), 90 deletions(-) create mode 100644 PR_DOCUMENTATION.md diff --git a/PR_DOCUMENTATION.md b/PR_DOCUMENTATION.md new file mode 100644 index 00000000..6b830af6 --- /dev/null +++ b/PR_DOCUMENTATION.md @@ -0,0 +1,49 @@ +# PR Title / 拉取请求标题 + +`feat(kiro): Add Thinking Mode support & enhance reliability with multi-quota failover` +`feat(kiro): 支持思考模型 (Thinking Mode) 并通过多配额故障转移增强稳定性` + +--- + +# PR Description / 拉取请求描述 + +## 📝 Summary / 摘要 + +This PR introduces significant upgrades to the Kiro (AWS CodeWhisperer/Amazon Q) module. It adds native support for **Thinking/Reasoning models** (similar to OpenAI o1/Claude 3.7), implements a robust **Multi-Endpoint Failover** system to handle rate limits (429), and optimizes configuration flexibility. + +本次 PR 对 Kiro (AWS CodeWhisperer/Amazon Q) 模块进行了重大升级。它增加了对 **思考/推理模型 (Thinking/Reasoning models)** 的原生支持(类似 OpenAI o1/Claude 3.7),实现了一套健壮的 **多端点故障转移 (Multi-Endpoint Failover)** 系统以应对速率限制 (429),并优化了配置灵活性。 + +## ✨ Key Changes / 主要变更 + +### 1. 🧠 Thinking Mode Support / 思考模式支持 +- **OpenAI Compatibility**: Automatically maps OpenAI's `reasoning_effort` parameter (low/medium/high) to Claude's `budget_tokens` (4k/16k/32k). + - **OpenAI 兼容性**:自动将 OpenAI 的 `reasoning_effort` 参数(low/medium/high)映射为 Claude 的 `budget_tokens`(4k/16k/32k)。 +- **Stream Parsing**: Implemented advanced stream parsing logic to detect and extract content within `...` tags, even across chunk boundaries. + - **流式解析**:实现了高级流式解析逻辑,能够检测并提取 `...` 标签内的内容,即使标签跨越了数据块边界。 +- **Protocol Translation**: Converts Kiro's internal thinking content into OpenAI-compatible `reasoning_content` fields (for non-stream) or `thinking_delta` events (for stream). + - **协议转换**:将 Kiro 内部的思考内容转换为兼容 OpenAI 的 `reasoning_content` 字段(非流式)或 `thinking_delta` 事件(流式)。 + +### 2. 🛡️ Robustness & Failover / 稳健性与故障转移 +- **Dual Quota System**: Explicitly defined `kiroEndpointConfig` to distinguish between **IDE (CodeWhisperer)** and **CLI (Amazon Q)** quotas. + - **双配额系统**:显式定义了 `kiroEndpointConfig` 结构,明确区分 **IDE (CodeWhisperer)** 和 **CLI (Amazon Q)** 的配额来源。 +- **Auto Failover**: Implemented automatic failover logic. If one endpoint returns `429 Too Many Requests`, the request seamlessly retries on the next available endpoint/quota. + - **自动故障转移**:实现了自动故障转移逻辑。如果一个端点返回 `429 Too Many Requests`,请求将无缝在下一个可用端点/配额上重试。 +- **Strict Protocol Compliance**: Enforced strict matching of `Origin` and `X-Amz-Target` headers for each endpoint to prevent `403 Forbidden` errors due to protocol mismatches. + - **严格协议合规**:强制每个端点严格匹配 `Origin` 和 `X-Amz-Target` 头信息,防止因协议不匹配导致的 `403 Forbidden` 错误。 + +### 3. ⚙️ Configuration & Models / 配置与模型 +- **New Config Options**: Added `KiroPreferredEndpoint` (global) and `PreferredEndpoint` (per-key) settings to allow users to prioritize specific quotas (e.g., "ide" or "cli"). + - **新配置项**:添加了 `KiroPreferredEndpoint`(全局)和 `PreferredEndpoint`(单 Key)设置,允许用户优先选择特定的配额(如 "ide" 或 "cli")。 +- **Model Registry**: Normalized model IDs (replaced dots with hyphens) and added `-agentic` variants optimized for large code generation tasks. + - **模型注册表**:规范化了模型 ID(将点号替换为连字符),并添加了针对大型代码生成任务优化的 `-agentic` 变体。 + +### 4. 🔧 Fixes / 修复 +- **AMP Proxy**: Downgraded client-side context cancellation logs from `Error` to `Debug` to reduce log noise. + - **AMP 代理**:将客户端上下文取消的日志级别从 `Error` 降级为 `Debug`,减少日志噪音。 + +## ⚠️ Impact / 影响 + +- **Authentication**: **No changes** to the login/OAuth process. Existing tokens work as is. +- **认证**:登录/OAuth 流程 **无变更**。现有 Token 可直接使用。 +- **Compatibility**: Fully backward compatible. The new failover logic is transparent to the user. +- **兼容性**:完全向后兼容。新的故障转移逻辑对用户是透明的。 \ No newline at end of file diff --git a/internal/api/modules/amp/proxy.go b/internal/api/modules/amp/proxy.go index 33f32c28..6a6b1b54 100644 --- a/internal/api/modules/amp/proxy.go +++ b/internal/api/modules/amp/proxy.go @@ -3,6 +3,8 @@ package amp import ( "bytes" "compress/gzip" + "context" + "errors" "fmt" "io" "net/http" @@ -148,7 +150,13 @@ func createReverseProxy(upstreamURL string, secretSource SecretSource) (*httputi // Error handler for proxy failures proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) { - log.Errorf("amp upstream proxy error for %s %s: %v", req.Method, req.URL.Path, err) + // Check if this is a client-side cancellation (normal behavior) + // Don't log as error for context canceled - it's usually client closing connection + if errors.Is(err, context.Canceled) { + log.Debugf("amp upstream proxy: client canceled request for %s %s", req.Method, req.URL.Path) + } else { + log.Errorf("amp upstream proxy error for %s %s: %v", req.Method, req.URL.Path, err) + } rw.Header().Set("Content-Type", "application/json") rw.WriteHeader(http.StatusBadGateway) _, _ = rw.Write([]byte(`{"error":"amp_upstream_proxy_error","message":"Failed to reach Amp upstream"}`)) diff --git a/internal/config/config.go b/internal/config/config.go index f9da2c29..86b79ad2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -64,6 +64,10 @@ type Config struct { // KiroKey defines a list of Kiro (AWS CodeWhisperer) configurations. KiroKey []KiroKey `yaml:"kiro" json:"kiro"` + // KiroPreferredEndpoint sets the global default preferred endpoint for all Kiro providers. + // Values: "ide" (default, CodeWhisperer) or "cli" (Amazon Q). + KiroPreferredEndpoint string `yaml:"kiro-preferred-endpoint" json:"kiro-preferred-endpoint"` + // Codex defines a list of Codex API key configurations as specified in the YAML configuration file. CodexKey []CodexKey `yaml:"codex-api-key" json:"codex-api-key"` @@ -278,6 +282,10 @@ type KiroKey struct { // AgentTaskType sets the Kiro API task type. Known values: "vibe", "dev", "chat". // Leave empty to let API use defaults. Different values may inject different system prompts. AgentTaskType string `yaml:"agent-task-type,omitempty" json:"agent-task-type,omitempty"` + + // PreferredEndpoint sets the preferred Kiro API endpoint/quota. + // Values: "codewhisperer" (default, IDE quota) or "amazonq" (CLI quota). + PreferredEndpoint string `yaml:"preferred-endpoint,omitempty" json:"preferred-endpoint,omitempty"` } // OpenAICompatibility represents the configuration for OpenAI API compatibility @@ -504,6 +512,7 @@ func (cfg *Config) SanitizeKiroKeys() { entry.ProfileArn = strings.TrimSpace(entry.ProfileArn) entry.Region = strings.TrimSpace(entry.Region) entry.ProxyURL = strings.TrimSpace(entry.ProxyURL) + entry.PreferredEndpoint = strings.TrimSpace(entry.PreferredEndpoint) } } diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index 2987e2d3..bff3fb57 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -31,18 +31,23 @@ import ( ) const ( - // kiroEndpoint is the CodeWhisperer streaming endpoint for chat API (GenerateAssistantResponse). - // Based on AIClient-2-API reference implementation. - // Note: Amazon Q uses a different endpoint (q.us-east-1.amazonaws.com) with different request format. - kiroEndpoint = "https://codewhisperer.us-east-1.amazonaws.com/generateAssistantResponse" - kiroContentType = "application/json" - kiroAcceptStream = "application/json" + // Kiro API common constants + kiroContentType = "application/x-amz-json-1.0" + kiroAcceptStream = "*/*" kiroMaxMessageSize = 10 * 1024 * 1024 // 10MB max message size for event stream kiroMaxToolDescLen = 10237 // Kiro API limit is 10240 bytes, leave room for "..." - // kiroUserAgent matches AIClient-2-API format for x-amz-user-agent header - kiroUserAgent = "aws-sdk-js/1.0.7 KiroIDE-0.1.25" - // kiroFullUserAgent is the complete user-agent header matching AIClient-2-API - kiroFullUserAgent = "aws-sdk-js/1.0.7 ua/2.1 os/linux lang/go api/codewhispererstreaming#1.0.7 m/E KiroIDE-0.1.25" + // kiroUserAgent matches amq2api format for User-Agent header + kiroUserAgent = "aws-sdk-rust/1.3.9 os/macos lang/rust/1.87.0" + // kiroFullUserAgent is the complete x-amz-user-agent header matching amq2api + kiroFullUserAgent = "aws-sdk-rust/1.3.9 ua/2.1 api/ssooidc/1.88.0 os/macos lang/rust/1.87.0 m/E app/AmazonQ-For-CLI" + + // Thinking mode support - based on amq2api implementation + // These tags wrap reasoning content in the response stream + thinkingStartTag = "" + thinkingEndTag = "" + // thinkingHint is injected into the request to enable interleaved thinking mode + // This tells the model to use thinking tags and sets the max thinking length + thinkingHint = "interleaved16000" // kiroAgenticSystemPrompt is injected only for -agentic models to prevent timeouts on large writes. // AWS Kiro API has a 2-3 minute timeout for large file write operations. @@ -97,6 +102,106 @@ You MUST follow these rules for ALL file operations. Violation causes server tim REMEMBER: When in doubt, write LESS per operation. Multiple small operations > one large operation.` ) +// kiroEndpointConfig bundles endpoint URL with its compatible Origin and AmzTarget values. +// This solves the "triple mismatch" problem where different endpoints require matching +// Origin and X-Amz-Target header values. +// +// Based on reference implementations: +// - amq2api-main: Uses Amazon Q endpoint with CLI origin and AmazonQDeveloperStreamingService target +// - AIClient-2-API: Uses CodeWhisperer endpoint with AI_EDITOR origin and AmazonCodeWhispererStreamingService target +type kiroEndpointConfig struct { + URL string // Endpoint URL + Origin string // Request Origin: "CLI" for Amazon Q quota, "AI_EDITOR" for Kiro IDE quota + AmzTarget string // X-Amz-Target header value + Name string // Endpoint name for logging +} + +// kiroEndpointConfigs defines the available Kiro API endpoints with their compatible configurations. +// The order determines fallback priority: primary endpoint first, then fallbacks. +// +// CRITICAL: Each endpoint MUST use its compatible Origin and AmzTarget values: +// - CodeWhisperer endpoint (codewhisperer.us-east-1.amazonaws.com): Uses AI_EDITOR origin and AmazonCodeWhispererStreamingService target +// - Amazon Q endpoint (q.us-east-1.amazonaws.com): Uses CLI origin and AmazonQDeveloperStreamingService target +// +// Mismatched combinations will result in 403 Forbidden errors. +// +// NOTE: CodeWhisperer is set as the default endpoint because: +// 1. Most tokens come from Kiro IDE / VSCode extensions (AWS Builder ID auth) +// 2. These tokens use AI_EDITOR origin which is only compatible with CodeWhisperer endpoint +// 3. Amazon Q endpoint requires CLI origin which is for Amazon Q CLI tokens +// This matches the AIClient-2-API-main project's configuration. +var kiroEndpointConfigs = []kiroEndpointConfig{ + { + URL: "https://codewhisperer.us-east-1.amazonaws.com/generateAssistantResponse", + Origin: "AI_EDITOR", + AmzTarget: "AmazonCodeWhispererStreamingService.GenerateAssistantResponse", + Name: "CodeWhisperer", + }, + { + URL: "https://q.us-east-1.amazonaws.com/", + Origin: "CLI", + AmzTarget: "AmazonQDeveloperStreamingService.SendMessage", + Name: "AmazonQ", + }, +} + +// getKiroEndpointConfigs returns the list of Kiro API endpoint configurations to try in order. +// Supports reordering based on "preferred_endpoint" in auth metadata/attributes. +func getKiroEndpointConfigs(auth *cliproxyauth.Auth) []kiroEndpointConfig { + if auth == nil { + return kiroEndpointConfigs + } + + // Check for preference + var preference string + if auth.Metadata != nil { + if p, ok := auth.Metadata["preferred_endpoint"].(string); ok { + preference = p + } + } + // Check attributes as fallback (e.g. from HTTP headers) + if preference == "" && auth.Attributes != nil { + preference = auth.Attributes["preferred_endpoint"] + } + + if preference == "" { + return kiroEndpointConfigs + } + + preference = strings.ToLower(strings.TrimSpace(preference)) + + // Create new slice to avoid modifying global state + var sorted []kiroEndpointConfig + var remaining []kiroEndpointConfig + + for _, cfg := range kiroEndpointConfigs { + name := strings.ToLower(cfg.Name) + // Check for matches + // CodeWhisperer aliases: codewhisperer, ide + // AmazonQ aliases: amazonq, q, cli + isMatch := false + if (preference == "codewhisperer" || preference == "ide") && name == "codewhisperer" { + isMatch = true + } else if (preference == "amazonq" || preference == "q" || preference == "cli") && name == "amazonq" { + isMatch = true + } + + if isMatch { + sorted = append(sorted, cfg) + } else { + remaining = append(remaining, cfg) + } + } + + // If preference didn't match anything, return default + if len(sorted) == 0 { + return kiroEndpointConfigs + } + + // Combine: preferred first, then others + return append(sorted, remaining...) +} + // KiroExecutor handles requests to AWS CodeWhisperer (Kiro) API. type KiroExecutor struct { cfg *config.Config @@ -181,13 +286,29 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req } // executeWithRetry performs the actual HTTP request with automatic retry on auth errors. -// Supports automatic fallback from CLI (Amazon Q) quota to AI_EDITOR (Kiro IDE) quota on 429. +// Supports automatic fallback between endpoints with different quotas: +// - Amazon Q endpoint (CLI origin) uses Amazon Q Developer quota +// - CodeWhisperer endpoint (AI_EDITOR origin) uses Kiro IDE quota +// Also supports multi-endpoint fallback similar to Antigravity implementation. func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, accessToken, profileArn string, kiroPayload, body []byte, from, to sdktranslator.Format, reporter *usageReporter, currentOrigin, kiroModelID string, isAgentic, isChatOnly bool) (cliproxyexecutor.Response, error) { var resp cliproxyexecutor.Response - maxRetries := 2 // Allow retries for token refresh + origin fallback + maxRetries := 2 // Allow retries for token refresh + endpoint fallback + endpointConfigs := getKiroEndpointConfigs(auth) + + for endpointIdx := 0; endpointIdx < len(endpointConfigs); endpointIdx++ { + endpointConfig := endpointConfigs[endpointIdx] + url := endpointConfig.URL + // Use this endpoint's compatible Origin (critical for avoiding 403 errors) + currentOrigin = endpointConfig.Origin + + // Rebuild payload with the correct origin for this endpoint + // Each endpoint requires its matching Origin value in the request body + kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + + log.Debugf("kiro: trying endpoint %d/%d: %s (Name: %s, Origin: %s)", + endpointIdx+1, len(endpointConfigs), url, endpointConfig.Name, currentOrigin) for attempt := 0; attempt <= maxRetries; attempt++ { - url := kiroEndpoint httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(kiroPayload)) if err != nil { return resp, err @@ -196,11 +317,12 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. httpReq.Header.Set("Content-Type", kiroContentType) httpReq.Header.Set("Authorization", "Bearer "+accessToken) httpReq.Header.Set("Accept", kiroAcceptStream) - httpReq.Header.Set("x-amz-user-agent", kiroUserAgent) - httpReq.Header.Set("User-Agent", kiroFullUserAgent) - httpReq.Header.Set("amz-sdk-request", "attempt=1; max=1") - httpReq.Header.Set("x-amzn-kiro-agent-mode", "vibe") - httpReq.Header.Set("amz-sdk-invocation-id", uuid.New().String()) + // Use endpoint-specific X-Amz-Target (critical for avoiding 403 errors) + httpReq.Header.Set("X-Amz-Target", endpointConfig.AmzTarget) + httpReq.Header.Set("User-Agent", kiroUserAgent) + httpReq.Header.Set("X-Amz-User-Agent", kiroFullUserAgent) + httpReq.Header.Set("Amz-Sdk-Request", "attempt=1; max=3") + httpReq.Header.Set("Amz-Sdk-Invocation-Id", uuid.New().String()) var attrs map[string]string if auth != nil { @@ -234,27 +356,17 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - // Handle 429 errors (quota exhausted) with origin fallback + // Handle 429 errors (quota exhausted) - try next endpoint + // Each endpoint has its own quota pool, so we can try different endpoints if httpResp.StatusCode == 429 { respBody, _ := io.ReadAll(httpResp.Body) _ = httpResp.Body.Close() appendAPIResponseChunk(ctx, e.cfg, respBody) - // If currently using CLI quota and it's exhausted, switch to AI_EDITOR (Kiro IDE) quota - if currentOrigin == "CLI" { - log.Warnf("kiro: Amazon Q (CLI) quota exhausted (429), switching to Kiro (AI_EDITOR) fallback") - currentOrigin = "AI_EDITOR" - - // Rebuild payload with new origin - kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) - - // Retry with new origin - continue - } - - // Already on AI_EDITOR or other origin, return the error - log.Debugf("kiro request error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody)) - return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + log.Warnf("kiro: %s endpoint quota exhausted (429), will try next endpoint", endpointConfig.Name) + + // Break inner retry loop to try next endpoint (which has different quota) + break } // Handle 5xx server errors with exponential backoff retry @@ -277,14 +389,15 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } - // Handle 401/403 errors with token refresh and retry - if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 { + // Handle 401 errors with token refresh and retry + // 401 = Unauthorized (token expired/invalid) - refresh token + if httpResp.StatusCode == 401 { respBody, _ := io.ReadAll(httpResp.Body) _ = httpResp.Body.Close() appendAPIResponseChunk(ctx, e.cfg, respBody) if attempt < maxRetries { - log.Warnf("kiro: received %d error, attempting token refresh and retry (attempt %d/%d)", httpResp.StatusCode, attempt+1, maxRetries+1) + log.Warnf("kiro: received 401 error, attempting token refresh and retry (attempt %d/%d)", attempt+1, maxRetries+1) refreshedAuth, refreshErr := e.Refresh(ctx, auth) if refreshErr != nil { @@ -302,7 +415,66 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. } } - log.Debugf("kiro request error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody)) + log.Warnf("kiro request error, status: 401, body: %s", summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody)) + return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + + // Handle 402 errors - Monthly Limit Reached + if httpResp.StatusCode == 402 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + log.Warnf("kiro: received 402 (monthly limit). Upstream body: %s", string(respBody)) + + // Return upstream error body directly + return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + + // Handle 403 errors - Access Denied / Token Expired + // Do NOT switch endpoints for 403 errors + if httpResp.StatusCode == 403 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + // Log the 403 error details for debugging + log.Warnf("kiro: received 403 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody)) + + respBodyStr := string(respBody) + + // Check for SUSPENDED status - return immediately without retry + if strings.Contains(respBodyStr, "SUSPENDED") || strings.Contains(respBodyStr, "TEMPORARILY_SUSPENDED") { + log.Errorf("kiro: account is suspended, cannot proceed") + return resp, statusErr{code: httpResp.StatusCode, msg: "account suspended: " + string(respBody)} + } + + // Check if this looks like a token-related 403 (some APIs return 403 for expired tokens) + isTokenRelated := strings.Contains(respBodyStr, "token") || + strings.Contains(respBodyStr, "expired") || + strings.Contains(respBodyStr, "invalid") || + strings.Contains(respBodyStr, "unauthorized") + + if isTokenRelated && attempt < maxRetries { + log.Warnf("kiro: 403 appears token-related, attempting token refresh") + refreshedAuth, refreshErr := e.Refresh(ctx, auth) + if refreshErr != nil { + log.Errorf("kiro: token refresh failed: %v", refreshErr) + // Token refresh failed - return error immediately + return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + if refreshedAuth != nil { + auth = refreshedAuth + accessToken, profileArn = kiroCredentials(auth) + kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + log.Infof("kiro: token refreshed for 403, retrying request") + continue + } + } + + // For non-token 403 or after max retries, return error immediately + // Do NOT switch endpoints for 403 errors + log.Warnf("kiro: 403 error, returning immediately (no endpoint switch)") return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } @@ -362,9 +534,14 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, kiroResponse, nil) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil + } + // Inner retry loop exhausted for this endpoint, try next endpoint + // Note: This code is unreachable because all paths in the inner loop + // either return or continue. Kept as comment for documentation. } - return resp, fmt.Errorf("kiro: max retries exceeded") + // All endpoints exhausted + return resp, fmt.Errorf("kiro: all endpoints exhausted") } // ExecuteStream handles streaming requests to Kiro API. @@ -431,12 +608,28 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } // executeStreamWithRetry performs the streaming HTTP request with automatic retry on auth errors. -// Supports automatic fallback from CLI (Amazon Q) quota to AI_EDITOR (Kiro IDE) quota on 429. +// Supports automatic fallback between endpoints with different quotas: +// - Amazon Q endpoint (CLI origin) uses Amazon Q Developer quota +// - CodeWhisperer endpoint (AI_EDITOR origin) uses Kiro IDE quota +// Also supports multi-endpoint fallback similar to Antigravity implementation. func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, accessToken, profileArn string, kiroPayload, body []byte, from sdktranslator.Format, reporter *usageReporter, currentOrigin, kiroModelID string, isAgentic, isChatOnly bool) (<-chan cliproxyexecutor.StreamChunk, error) { - maxRetries := 2 // Allow retries for token refresh + origin fallback + maxRetries := 2 // Allow retries for token refresh + endpoint fallback + endpointConfigs := getKiroEndpointConfigs(auth) + + for endpointIdx := 0; endpointIdx < len(endpointConfigs); endpointIdx++ { + endpointConfig := endpointConfigs[endpointIdx] + url := endpointConfig.URL + // Use this endpoint's compatible Origin (critical for avoiding 403 errors) + currentOrigin = endpointConfig.Origin + + // Rebuild payload with the correct origin for this endpoint + // Each endpoint requires its matching Origin value in the request body + kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + + log.Debugf("kiro: stream trying endpoint %d/%d: %s (Name: %s, Origin: %s)", + endpointIdx+1, len(endpointConfigs), url, endpointConfig.Name, currentOrigin) for attempt := 0; attempt <= maxRetries; attempt++ { - url := kiroEndpoint httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(kiroPayload)) if err != nil { return nil, err @@ -445,11 +638,12 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox httpReq.Header.Set("Content-Type", kiroContentType) httpReq.Header.Set("Authorization", "Bearer "+accessToken) httpReq.Header.Set("Accept", kiroAcceptStream) - httpReq.Header.Set("x-amz-user-agent", kiroUserAgent) - httpReq.Header.Set("User-Agent", kiroFullUserAgent) - httpReq.Header.Set("amz-sdk-request", "attempt=1; max=1") - httpReq.Header.Set("x-amzn-kiro-agent-mode", "vibe") - httpReq.Header.Set("amz-sdk-invocation-id", uuid.New().String()) + // Use endpoint-specific X-Amz-Target (critical for avoiding 403 errors) + httpReq.Header.Set("X-Amz-Target", endpointConfig.AmzTarget) + httpReq.Header.Set("User-Agent", kiroUserAgent) + httpReq.Header.Set("X-Amz-User-Agent", kiroFullUserAgent) + httpReq.Header.Set("Amz-Sdk-Request", "attempt=1; max=3") + httpReq.Header.Set("Amz-Sdk-Invocation-Id", uuid.New().String()) var attrs map[string]string if auth != nil { @@ -483,27 +677,17 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox } recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - // Handle 429 errors (quota exhausted) with origin fallback + // Handle 429 errors (quota exhausted) - try next endpoint + // Each endpoint has its own quota pool, so we can try different endpoints if httpResp.StatusCode == 429 { respBody, _ := io.ReadAll(httpResp.Body) _ = httpResp.Body.Close() appendAPIResponseChunk(ctx, e.cfg, respBody) - // If currently using CLI quota and it's exhausted, switch to AI_EDITOR (Kiro IDE) quota - if currentOrigin == "CLI" { - log.Warnf("kiro: stream Amazon Q (CLI) quota exhausted (429), switching to Kiro (AI_EDITOR) fallback") - currentOrigin = "AI_EDITOR" - - // Rebuild payload with new origin - kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) - - // Retry with new origin - continue - } - - // Already on AI_EDITOR or other origin, return the error - log.Debugf("kiro stream error, status: %d, body: %s", httpResp.StatusCode, string(respBody)) - return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + log.Warnf("kiro: stream %s endpoint quota exhausted (429), will try next endpoint", endpointConfig.Name) + + // Break inner retry loop to try next endpoint (which has different quota) + break } // Handle 5xx server errors with exponential backoff retry @@ -526,14 +710,28 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } - // Handle 401/403 errors with token refresh and retry - if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 { + // Handle 400 errors - Credential/Validation issues + // Do NOT switch endpoints - return error immediately + if httpResp.StatusCode == 400 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + log.Warnf("kiro: received 400 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody)) + + // 400 errors indicate request validation issues - return immediately without retry + return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + + // Handle 401 errors with token refresh and retry + // 401 = Unauthorized (token expired/invalid) - refresh token + if httpResp.StatusCode == 401 { respBody, _ := io.ReadAll(httpResp.Body) _ = httpResp.Body.Close() appendAPIResponseChunk(ctx, e.cfg, respBody) if attempt < maxRetries { - log.Warnf("kiro: stream received %d error, attempting token refresh and retry (attempt %d/%d)", httpResp.StatusCode, attempt+1, maxRetries+1) + log.Warnf("kiro: stream received 401 error, attempting token refresh and retry (attempt %d/%d)", attempt+1, maxRetries+1) refreshedAuth, refreshErr := e.Refresh(ctx, auth) if refreshErr != nil { @@ -551,7 +749,66 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox } } - log.Debugf("kiro stream error, status: %d, body: %s", httpResp.StatusCode, string(respBody)) + log.Warnf("kiro stream error, status: 401, body: %s", string(respBody)) + return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + + // Handle 402 errors - Monthly Limit Reached + if httpResp.StatusCode == 402 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + log.Warnf("kiro: stream received 402 (monthly limit). Upstream body: %s", string(respBody)) + + // Return upstream error body directly + return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + + // Handle 403 errors - Access Denied / Token Expired + // Do NOT switch endpoints for 403 errors + if httpResp.StatusCode == 403 { + respBody, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + appendAPIResponseChunk(ctx, e.cfg, respBody) + + // Log the 403 error details for debugging + log.Warnf("kiro: stream received 403 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, string(respBody)) + + respBodyStr := string(respBody) + + // Check for SUSPENDED status - return immediately without retry + if strings.Contains(respBodyStr, "SUSPENDED") || strings.Contains(respBodyStr, "TEMPORARILY_SUSPENDED") { + log.Errorf("kiro: account is suspended, cannot proceed") + return nil, statusErr{code: httpResp.StatusCode, msg: "account suspended: " + string(respBody)} + } + + // Check if this looks like a token-related 403 (some APIs return 403 for expired tokens) + isTokenRelated := strings.Contains(respBodyStr, "token") || + strings.Contains(respBodyStr, "expired") || + strings.Contains(respBodyStr, "invalid") || + strings.Contains(respBodyStr, "unauthorized") + + if isTokenRelated && attempt < maxRetries { + log.Warnf("kiro: 403 appears token-related, attempting token refresh") + refreshedAuth, refreshErr := e.Refresh(ctx, auth) + if refreshErr != nil { + log.Errorf("kiro: token refresh failed: %v", refreshErr) + // Token refresh failed - return error immediately + return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} + } + if refreshedAuth != nil { + auth = refreshedAuth + accessToken, profileArn = kiroCredentials(auth) + kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly) + log.Infof("kiro: token refreshed for 403, retrying stream request") + continue + } + } + + // For non-token 403 or after max retries, return error immediately + // Do NOT switch endpoints for 403 errors + log.Warnf("kiro: 403 error, returning immediately (no endpoint switch)") return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)} } @@ -585,9 +842,14 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox }(httpResp) return out, nil + } + // Inner retry loop exhausted for this endpoint, try next endpoint + // Note: This code is unreachable because all paths in the inner loop + // either return or continue. Kept as comment for documentation. } - return nil, fmt.Errorf("kiro: max retries exceeded for stream") + // All endpoints exhausted + return nil, fmt.Errorf("kiro: stream all endpoints exhausted") } @@ -795,6 +1057,7 @@ type kiroToolUse struct { // origin parameter determines which quota to use: "CLI" for Amazon Q, "AI_EDITOR" for Kiro IDE. // isAgentic parameter enables chunked write optimization prompt for -agentic model variants. // isChatOnly parameter disables tool calling for -chat model variants (pure conversation mode). +// Supports thinking mode - when Claude API thinking parameter is present, injects thinkingHint. func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, origin string, isAgentic, isChatOnly bool) []byte { // Normalize origin value for Kiro API compatibility // Kiro API only accepts "CLI" or "AI_EDITOR" as valid origin values @@ -840,6 +1103,39 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, systemPrompt = systemField.String() } + // Check for thinking parameter in Claude API request + // Claude API format: {"thinking": {"type": "enabled", "budget_tokens": 16000}} + // When thinking is enabled, inject dynamic thinkingHint based on budget_tokens + // This allows reasoning_effort (low/medium/high) to control actual thinking length + thinkingEnabled := false + var budgetTokens int64 = 16000 // Default value (same as OpenAI reasoning_effort "medium") + thinkingField := gjson.GetBytes(claudeBody, "thinking") + if thinkingField.Exists() { + // Check if thinking.type is "enabled" + thinkingType := thinkingField.Get("type").String() + if thinkingType == "enabled" { + thinkingEnabled = true + // Read budget_tokens if specified - this value comes from: + // - Claude API: thinking.budget_tokens directly + // - OpenAI API: reasoning_effort -> budget_tokens (low:4000, medium:16000, high:32000) + if bt := thinkingField.Get("budget_tokens"); bt.Exists() && bt.Int() > 0 { + budgetTokens = bt.Int() + } + log.Debugf("kiro: thinking mode enabled via Claude API parameter, budget_tokens: %d", budgetTokens) + } + } + + // Inject timestamp context for better temporal awareness + // Based on amq2api implementation - helps model understand current time context + timestamp := time.Now().Format("2006-01-02 15:04:05 MST") + timestampContext := fmt.Sprintf("[Context: Current time is %s]", timestamp) + if systemPrompt != "" { + systemPrompt = timestampContext + "\n\n" + systemPrompt + } else { + systemPrompt = timestampContext + } + log.Debugf("kiro: injected timestamp context: %s", timestamp) + // Inject agentic optimization prompt for -agentic model variants // This prevents AWS Kiro API timeouts during large file write operations if isAgentic { @@ -849,6 +1145,20 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, systemPrompt += kiroAgenticSystemPrompt } + // Inject thinking hint when thinking mode is enabled + // This tells the model to use tags in its response + // DYNAMICALLY set max_thinking_length based on budget_tokens from request + // This respects the reasoning_effort setting: low(4000), medium(16000), high(32000) + if thinkingEnabled { + if systemPrompt != "" { + systemPrompt += "\n" + } + // Build dynamic thinking hint with the actual budget_tokens value + dynamicThinkingHint := fmt.Sprintf("interleaved%d", budgetTokens) + systemPrompt += dynamicThinkingHint + log.Debugf("kiro: injected dynamic thinking hint into system prompt, max_thinking_length: %d", budgetTokens) + } + // Convert Claude tools to Kiro format var kiroTools []kiroToolWrapper if tools.IsArray() { @@ -859,13 +1169,15 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, // Truncate long descriptions (Kiro API limit is in bytes) // Truncate at valid UTF-8 boundary to avoid breaking multi-byte chars + // Add truncation notice to help model understand the description is incomplete if len(description) > kiroMaxToolDescLen { // Find a valid UTF-8 boundary before the limit - truncLen := kiroMaxToolDescLen + // Reserve space for truncation notice (about 30 bytes) + truncLen := kiroMaxToolDescLen - 30 for truncLen > 0 && !utf8.RuneStart(description[truncLen]) { truncLen-- } - description = description[:truncLen] + "..." + description = description[:truncLen] + "... (description truncated)" } kiroTools = append(kiroTools, kiroToolWrapper{ @@ -1505,6 +1817,14 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out // IMPORTANT: This must persist across all TranslateStream calls var translatorParam any + // Thinking mode state tracking - based on amq2api implementation + // Tracks whether we're inside a block and handles partial tags + inThinkBlock := false + pendingStartTagChars := 0 // Number of chars that might be start of + pendingEndTagChars := 0 // Number of chars that might be start of + isThinkingBlockOpen := false // Track if thinking content block is open + thinkingBlockIndex := -1 // Index of the thinking content block + // Pre-calculate input tokens from request if possible if enc, err := tokenizerForModel(model); err == nil { // Try OpenAI format first, then fall back to raw byte count estimation @@ -1715,7 +2035,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out } } - // Handle text content with duplicate detection + // Handle text content with duplicate detection and thinking mode support if contentDelta != "" { // Check for duplicate content - skip if identical to last content event // Based on AIClient-2-API implementation for Kiro @@ -1728,24 +2048,218 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out outputLen += len(contentDelta) // Accumulate content for streaming token calculation accumulatedContent.WriteString(contentDelta) - // Start text content block if needed - if !isTextBlockOpen { - contentBlockIndex++ - isTextBlockOpen = true - blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} - } - } + + // Process content with thinking tag detection - based on amq2api implementation + // This handles and tags that may span across chunks + remaining := contentDelta + + // If we have pending start tag chars from previous chunk, prepend them + if pendingStartTagChars > 0 { + remaining = thinkingStartTag[:pendingStartTagChars] + remaining + pendingStartTagChars = 0 + } + + // If we have pending end tag chars from previous chunk, prepend them + if pendingEndTagChars > 0 { + remaining = thinkingEndTag[:pendingEndTagChars] + remaining + pendingEndTagChars = 0 } - claudeEvent := e.buildClaudeStreamEvent(contentDelta, contentBlockIndex) - sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) - for _, chunk := range sseData { - if chunk != "" { - out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + for len(remaining) > 0 { + if inThinkBlock { + // Inside thinking block - look for end tag + endIdx := strings.Index(remaining, thinkingEndTag) + if endIdx >= 0 { + // Found end tag - emit any content before end tag, then close block + thinkContent := remaining[:endIdx] + if thinkContent != "" { + // TRUE STREAMING: Emit thinking content immediately + // Start thinking block if not open + if !isThinkingBlockOpen { + contentBlockIndex++ + thinkingBlockIndex = contentBlockIndex + isThinkingBlockOpen = true + blockStart := e.buildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + // Send thinking delta immediately + thinkingEvent := e.buildClaudeThinkingDeltaEvent(thinkContent, thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + // Note: Partial tag handling is done via pendingEndTagChars + // When the next chunk arrives, the partial tag will be reconstructed + + // Close thinking block + if isThinkingBlockOpen { + blockStop := e.buildClaudeContentBlockStopEvent(thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + isThinkingBlockOpen = false + } + + inThinkBlock = false + remaining = remaining[endIdx+len(thinkingEndTag):] + log.Debugf("kiro: exited thinking block") + } else { + // No end tag found - TRUE STREAMING: emit content immediately + // Only save potential partial tag length for next iteration + pendingEnd := pendingTagSuffix(remaining, thinkingEndTag) + + // Calculate content to emit immediately (excluding potential partial tag) + var contentToEmit string + if pendingEnd > 0 { + contentToEmit = remaining[:len(remaining)-pendingEnd] + // Save partial tag length for next iteration (will be reconstructed from thinkingEndTag) + pendingEndTagChars = pendingEnd + } else { + contentToEmit = remaining + } + + // TRUE STREAMING: Emit thinking content immediately + if contentToEmit != "" { + // Start thinking block if not open + if !isThinkingBlockOpen { + contentBlockIndex++ + thinkingBlockIndex = contentBlockIndex + isThinkingBlockOpen = true + blockStart := e.buildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + // Send thinking delta immediately - TRUE STREAMING! + thinkingEvent := e.buildClaudeThinkingDeltaEvent(contentToEmit, thinkingBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + remaining = "" + } + } else { + // Outside thinking block - look for start tag + startIdx := strings.Index(remaining, thinkingStartTag) + if startIdx >= 0 { + // Found start tag - emit text before it and switch to thinking mode + textBefore := remaining[:startIdx] + if textBefore != "" { + // Start text content block if needed + if !isTextBlockOpen { + contentBlockIndex++ + isTextBlockOpen = true + blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + claudeEvent := e.buildClaudeStreamEvent(textBefore, contentBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + // Close text block before starting thinking block + if isTextBlockOpen { + blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + isTextBlockOpen = false + } + + inThinkBlock = true + remaining = remaining[startIdx+len(thinkingStartTag):] + log.Debugf("kiro: entered thinking block") + } else { + // No start tag found - check for partial start tag at buffer end + pendingStart := pendingTagSuffix(remaining, thinkingStartTag) + if pendingStart > 0 { + // Emit text except potential partial tag + textToEmit := remaining[:len(remaining)-pendingStart] + if textToEmit != "" { + // Start text content block if needed + if !isTextBlockOpen { + contentBlockIndex++ + isTextBlockOpen = true + blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + claudeEvent := e.buildClaudeStreamEvent(textToEmit, contentBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + pendingStartTagChars = pendingStart + remaining = "" + } else { + // No partial tag - emit all as text + if remaining != "" { + // Start text content block if needed + if !isTextBlockOpen { + contentBlockIndex++ + isTextBlockOpen = true + blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "") + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + + claudeEvent := e.buildClaudeStreamEvent(remaining, contentBlockIndex) + sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam) + for _, chunk := range sseData { + if chunk != "" { + out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")} + } + } + } + remaining = "" + } + } } } } @@ -1981,14 +2495,20 @@ func (e *KiroExecutor) buildClaudeMessageStartEvent(model string, inputTokens in func (e *KiroExecutor) buildClaudeContentBlockStartEvent(index int, blockType, toolUseID, toolName string) []byte { var contentBlock map[string]interface{} - if blockType == "tool_use" { + switch blockType { + case "tool_use": contentBlock = map[string]interface{}{ "type": "tool_use", "id": toolUseID, "name": toolName, "input": map[string]interface{}{}, } - } else { + case "thinking": + contentBlock = map[string]interface{}{ + "type": "thinking", + "thinking": "", + } + default: contentBlock = map[string]interface{}{ "type": "text", "text": "", @@ -2075,6 +2595,40 @@ func (e *KiroExecutor) buildClaudeFinalEvent() []byte { return []byte("event: message_stop\ndata: " + string(result)) } +// buildClaudeThinkingDeltaEvent creates a thinking_delta event for Claude API compatibility. +// This is used when streaming thinking content wrapped in tags. +func (e *KiroExecutor) buildClaudeThinkingDeltaEvent(thinkingDelta string, index int) []byte { + event := map[string]interface{}{ + "type": "content_block_delta", + "index": index, + "delta": map[string]interface{}{ + "type": "thinking_delta", + "thinking": thinkingDelta, + }, + } + result, _ := json.Marshal(event) + return []byte("event: content_block_delta\ndata: " + string(result)) +} + +// pendingTagSuffix detects if the buffer ends with a partial prefix of the given tag. +// Returns the length of the partial match (0 if no match). +// Based on amq2api implementation for handling cross-chunk tag boundaries. +func pendingTagSuffix(buffer, tag string) int { + if buffer == "" || tag == "" { + return 0 + } + maxLen := len(buffer) + if maxLen > len(tag)-1 { + maxLen = len(tag) - 1 + } + for length := maxLen; length > 0; length-- { + if len(buffer) >= length && buffer[len(buffer)-length:] == tag[:length] { + return length + } + } + return 0 +} + // CountTokens is not supported for Kiro provider. // Kiro/Amazon Q backend doesn't expose a token counting API. func (e *KiroExecutor) CountTokens(context.Context, *cliproxyauth.Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/translator/kiro/openai/chat-completions/kiro_openai_request.go b/internal/translator/kiro/openai/chat-completions/kiro_openai_request.go index 3d339505..d1094c1c 100644 --- a/internal/translator/kiro/openai/chat-completions/kiro_openai_request.go +++ b/internal/translator/kiro/openai/chat-completions/kiro_openai_request.go @@ -10,9 +10,18 @@ import ( "github.com/tidwall/sjson" ) +// reasoningEffortToBudget maps OpenAI reasoning_effort values to Claude thinking budget_tokens. +// OpenAI uses "low", "medium", "high" while Claude uses numeric budget_tokens. +var reasoningEffortToBudget = map[string]int{ + "low": 4000, + "medium": 16000, + "high": 32000, +} + // ConvertOpenAIRequestToKiro transforms an OpenAI Chat Completions API request into Kiro (Claude) format. // Kiro uses Claude-compatible format internally, so we primarily pass through to Claude format. // Supports tool calling: OpenAI tools -> Claude tools, tool_calls -> tool_use, tool messages -> tool_result. +// Supports reasoning/thinking: OpenAI reasoning_effort -> Claude thinking parameter. func ConvertOpenAIRequestToKiro(modelName string, inputRawJSON []byte, stream bool) []byte { rawJSON := bytes.Clone(inputRawJSON) root := gjson.ParseBytes(rawJSON) @@ -38,6 +47,26 @@ func ConvertOpenAIRequestToKiro(modelName string, inputRawJSON []byte, stream bo out, _ = sjson.Set(out, "top_p", v.Float()) } + // Handle OpenAI reasoning_effort parameter -> Claude thinking parameter + // OpenAI format: {"reasoning_effort": "low"|"medium"|"high"} + // Claude format: {"thinking": {"type": "enabled", "budget_tokens": N}} + if v := root.Get("reasoning_effort"); v.Exists() { + effort := v.String() + if budget, ok := reasoningEffortToBudget[effort]; ok { + thinking := map[string]interface{}{ + "type": "enabled", + "budget_tokens": budget, + } + out, _ = sjson.Set(out, "thinking", thinking) + } + } + + // Also support direct thinking parameter passthrough (for Claude API compatibility) + // Claude format: {"thinking": {"type": "enabled", "budget_tokens": N}} + if v := root.Get("thinking"); v.Exists() && v.IsObject() { + out, _ = sjson.Set(out, "thinking", v.Value()) + } + // Convert OpenAI tools to Claude tools format if tools := root.Get("tools"); tools.Exists() && tools.IsArray() { claudeTools := make([]interface{}, 0) diff --git a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go index d56c94ac..2fab2a4d 100644 --- a/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go +++ b/internal/translator/kiro/openai/chat-completions/kiro_openai_response.go @@ -134,6 +134,28 @@ func convertClaudeEventToOpenAI(jsonStr string, model string) []string { result, _ := json.Marshal(response) results = append(results, string(result)) } + } else if deltaType == "thinking_delta" { + // Thinking/reasoning content delta - convert to OpenAI reasoning_content format + thinkingDelta := root.Get("delta.thinking").String() + if thinkingDelta != "" { + response := map[string]interface{}{ + "id": "chatcmpl-" + uuid.New().String()[:24], + "object": "chat.completion.chunk", + "created": time.Now().Unix(), + "model": model, + "choices": []map[string]interface{}{ + { + "index": 0, + "delta": map[string]interface{}{ + "reasoning_content": thinkingDelta, + }, + "finish_reason": nil, + }, + }, + } + result, _ := json.Marshal(response) + results = append(results, string(result)) + } } else if deltaType == "input_json_delta" { // Tool input delta (streaming arguments) partialJSON := root.Get("delta.partial_json").String() @@ -298,6 +320,7 @@ func ConvertKiroResponseToOpenAINonStream(ctx context.Context, model string, ori root := gjson.ParseBytes(rawResponse) var content string + var reasoningContent string var toolCalls []map[string]interface{} contentArray := root.Get("content") @@ -306,6 +329,9 @@ func ConvertKiroResponseToOpenAINonStream(ctx context.Context, model string, ori itemType := item.Get("type").String() if itemType == "text" { content += item.Get("text").String() + } else if itemType == "thinking" { + // Extract thinking/reasoning content + reasoningContent += item.Get("thinking").String() } else if itemType == "tool_use" { // Convert Claude tool_use to OpenAI tool_calls format inputJSON := item.Get("input").String() @@ -339,6 +365,11 @@ func ConvertKiroResponseToOpenAINonStream(ctx context.Context, model string, ori "content": content, } + // Add reasoning_content if present (OpenAI reasoning format) + if reasoningContent != "" { + message["reasoning_content"] = reasoningContent + } + // Add tool_calls if present if len(toolCalls) > 0 { message["tool_calls"] = toolCalls diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 36276de9..428ab814 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -1317,6 +1317,12 @@ func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth { if kk.AgentTaskType != "" { attrs["agent_task_type"] = kk.AgentTaskType } + if kk.PreferredEndpoint != "" { + attrs["preferred_endpoint"] = kk.PreferredEndpoint + } else if cfg.KiroPreferredEndpoint != "" { + // Apply global default if not overridden by specific key + attrs["preferred_endpoint"] = cfg.KiroPreferredEndpoint + } if refreshToken != "" { attrs["refresh_token"] = refreshToken } @@ -1532,6 +1538,17 @@ func (w *Watcher) SnapshotCoreAuths() []*coreauth.Auth { a.NextRefreshAfter = expiresAt.Add(-30 * time.Minute) } } + + // Apply global preferred endpoint setting if not present in metadata + if cfg.KiroPreferredEndpoint != "" { + // Check if already set in metadata (which takes precedence in executor) + if _, hasMeta := metadata["preferred_endpoint"]; !hasMeta { + if a.Attributes == nil { + a.Attributes = make(map[string]string) + } + a.Attributes["preferred_endpoint"] = cfg.KiroPreferredEndpoint + } + } } applyAuthExcludedModelsMeta(a, cfg, nil, "oauth")