feat(kiro): add multi-endpoint fallback & thinking mode support

This commit is contained in:
Ravens2121
2025-12-12 13:43:36 +08:00
parent 204bba9dea
commit 84920cb670
7 changed files with 787 additions and 90 deletions

View File

@@ -31,18 +31,23 @@ import (
)
const (
// kiroEndpoint is the CodeWhisperer streaming endpoint for chat API (GenerateAssistantResponse).
// Based on AIClient-2-API reference implementation.
// Note: Amazon Q uses a different endpoint (q.us-east-1.amazonaws.com) with different request format.
kiroEndpoint = "https://codewhisperer.us-east-1.amazonaws.com/generateAssistantResponse"
kiroContentType = "application/json"
kiroAcceptStream = "application/json"
// Kiro API common constants
kiroContentType = "application/x-amz-json-1.0"
kiroAcceptStream = "*/*"
kiroMaxMessageSize = 10 * 1024 * 1024 // 10MB max message size for event stream
kiroMaxToolDescLen = 10237 // Kiro API limit is 10240 bytes, leave room for "..."
// kiroUserAgent matches AIClient-2-API format for x-amz-user-agent header
kiroUserAgent = "aws-sdk-js/1.0.7 KiroIDE-0.1.25"
// kiroFullUserAgent is the complete user-agent header matching AIClient-2-API
kiroFullUserAgent = "aws-sdk-js/1.0.7 ua/2.1 os/linux lang/go api/codewhispererstreaming#1.0.7 m/E KiroIDE-0.1.25"
// kiroUserAgent matches amq2api format for User-Agent header
kiroUserAgent = "aws-sdk-rust/1.3.9 os/macos lang/rust/1.87.0"
// kiroFullUserAgent is the complete x-amz-user-agent header matching amq2api
kiroFullUserAgent = "aws-sdk-rust/1.3.9 ua/2.1 api/ssooidc/1.88.0 os/macos lang/rust/1.87.0 m/E app/AmazonQ-For-CLI"
// Thinking mode support - based on amq2api implementation
// These tags wrap reasoning content in the response stream
thinkingStartTag = "<thinking>"
thinkingEndTag = "</thinking>"
// thinkingHint is injected into the request to enable interleaved thinking mode
// This tells the model to use thinking tags and sets the max thinking length
thinkingHint = "<thinking_mode>interleaved</thinking_mode><max_thinking_length>16000</max_thinking_length>"
// kiroAgenticSystemPrompt is injected only for -agentic models to prevent timeouts on large writes.
// AWS Kiro API has a 2-3 minute timeout for large file write operations.
@@ -97,6 +102,106 @@ You MUST follow these rules for ALL file operations. Violation causes server tim
REMEMBER: When in doubt, write LESS per operation. Multiple small operations > one large operation.`
)
// kiroEndpointConfig bundles endpoint URL with its compatible Origin and AmzTarget values.
// This solves the "triple mismatch" problem where different endpoints require matching
// Origin and X-Amz-Target header values.
//
// Based on reference implementations:
// - amq2api-main: Uses Amazon Q endpoint with CLI origin and AmazonQDeveloperStreamingService target
// - AIClient-2-API: Uses CodeWhisperer endpoint with AI_EDITOR origin and AmazonCodeWhispererStreamingService target
type kiroEndpointConfig struct {
URL string // Endpoint URL
Origin string // Request Origin: "CLI" for Amazon Q quota, "AI_EDITOR" for Kiro IDE quota
AmzTarget string // X-Amz-Target header value
Name string // Endpoint name for logging
}
// kiroEndpointConfigs defines the available Kiro API endpoints with their compatible configurations.
// The order determines fallback priority: primary endpoint first, then fallbacks.
//
// CRITICAL: Each endpoint MUST use its compatible Origin and AmzTarget values:
// - CodeWhisperer endpoint (codewhisperer.us-east-1.amazonaws.com): Uses AI_EDITOR origin and AmazonCodeWhispererStreamingService target
// - Amazon Q endpoint (q.us-east-1.amazonaws.com): Uses CLI origin and AmazonQDeveloperStreamingService target
//
// Mismatched combinations will result in 403 Forbidden errors.
//
// NOTE: CodeWhisperer is set as the default endpoint because:
// 1. Most tokens come from Kiro IDE / VSCode extensions (AWS Builder ID auth)
// 2. These tokens use AI_EDITOR origin which is only compatible with CodeWhisperer endpoint
// 3. Amazon Q endpoint requires CLI origin which is for Amazon Q CLI tokens
// This matches the AIClient-2-API-main project's configuration.
var kiroEndpointConfigs = []kiroEndpointConfig{
{
URL: "https://codewhisperer.us-east-1.amazonaws.com/generateAssistantResponse",
Origin: "AI_EDITOR",
AmzTarget: "AmazonCodeWhispererStreamingService.GenerateAssistantResponse",
Name: "CodeWhisperer",
},
{
URL: "https://q.us-east-1.amazonaws.com/",
Origin: "CLI",
AmzTarget: "AmazonQDeveloperStreamingService.SendMessage",
Name: "AmazonQ",
},
}
// getKiroEndpointConfigs returns the list of Kiro API endpoint configurations to try in order.
// Supports reordering based on "preferred_endpoint" in auth metadata/attributes.
func getKiroEndpointConfigs(auth *cliproxyauth.Auth) []kiroEndpointConfig {
if auth == nil {
return kiroEndpointConfigs
}
// Check for preference
var preference string
if auth.Metadata != nil {
if p, ok := auth.Metadata["preferred_endpoint"].(string); ok {
preference = p
}
}
// Check attributes as fallback (e.g. from HTTP headers)
if preference == "" && auth.Attributes != nil {
preference = auth.Attributes["preferred_endpoint"]
}
if preference == "" {
return kiroEndpointConfigs
}
preference = strings.ToLower(strings.TrimSpace(preference))
// Create new slice to avoid modifying global state
var sorted []kiroEndpointConfig
var remaining []kiroEndpointConfig
for _, cfg := range kiroEndpointConfigs {
name := strings.ToLower(cfg.Name)
// Check for matches
// CodeWhisperer aliases: codewhisperer, ide
// AmazonQ aliases: amazonq, q, cli
isMatch := false
if (preference == "codewhisperer" || preference == "ide") && name == "codewhisperer" {
isMatch = true
} else if (preference == "amazonq" || preference == "q" || preference == "cli") && name == "amazonq" {
isMatch = true
}
if isMatch {
sorted = append(sorted, cfg)
} else {
remaining = append(remaining, cfg)
}
}
// If preference didn't match anything, return default
if len(sorted) == 0 {
return kiroEndpointConfigs
}
// Combine: preferred first, then others
return append(sorted, remaining...)
}
// KiroExecutor handles requests to AWS CodeWhisperer (Kiro) API.
type KiroExecutor struct {
cfg *config.Config
@@ -181,13 +286,29 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
}
// executeWithRetry performs the actual HTTP request with automatic retry on auth errors.
// Supports automatic fallback from CLI (Amazon Q) quota to AI_EDITOR (Kiro IDE) quota on 429.
// Supports automatic fallback between endpoints with different quotas:
// - Amazon Q endpoint (CLI origin) uses Amazon Q Developer quota
// - CodeWhisperer endpoint (AI_EDITOR origin) uses Kiro IDE quota
// Also supports multi-endpoint fallback similar to Antigravity implementation.
func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, accessToken, profileArn string, kiroPayload, body []byte, from, to sdktranslator.Format, reporter *usageReporter, currentOrigin, kiroModelID string, isAgentic, isChatOnly bool) (cliproxyexecutor.Response, error) {
var resp cliproxyexecutor.Response
maxRetries := 2 // Allow retries for token refresh + origin fallback
maxRetries := 2 // Allow retries for token refresh + endpoint fallback
endpointConfigs := getKiroEndpointConfigs(auth)
for endpointIdx := 0; endpointIdx < len(endpointConfigs); endpointIdx++ {
endpointConfig := endpointConfigs[endpointIdx]
url := endpointConfig.URL
// Use this endpoint's compatible Origin (critical for avoiding 403 errors)
currentOrigin = endpointConfig.Origin
// Rebuild payload with the correct origin for this endpoint
// Each endpoint requires its matching Origin value in the request body
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
log.Debugf("kiro: trying endpoint %d/%d: %s (Name: %s, Origin: %s)",
endpointIdx+1, len(endpointConfigs), url, endpointConfig.Name, currentOrigin)
for attempt := 0; attempt <= maxRetries; attempt++ {
url := kiroEndpoint
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(kiroPayload))
if err != nil {
return resp, err
@@ -196,11 +317,12 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
httpReq.Header.Set("Content-Type", kiroContentType)
httpReq.Header.Set("Authorization", "Bearer "+accessToken)
httpReq.Header.Set("Accept", kiroAcceptStream)
httpReq.Header.Set("x-amz-user-agent", kiroUserAgent)
httpReq.Header.Set("User-Agent", kiroFullUserAgent)
httpReq.Header.Set("amz-sdk-request", "attempt=1; max=1")
httpReq.Header.Set("x-amzn-kiro-agent-mode", "vibe")
httpReq.Header.Set("amz-sdk-invocation-id", uuid.New().String())
// Use endpoint-specific X-Amz-Target (critical for avoiding 403 errors)
httpReq.Header.Set("X-Amz-Target", endpointConfig.AmzTarget)
httpReq.Header.Set("User-Agent", kiroUserAgent)
httpReq.Header.Set("X-Amz-User-Agent", kiroFullUserAgent)
httpReq.Header.Set("Amz-Sdk-Request", "attempt=1; max=3")
httpReq.Header.Set("Amz-Sdk-Invocation-Id", uuid.New().String())
var attrs map[string]string
if auth != nil {
@@ -234,27 +356,17 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
// Handle 429 errors (quota exhausted) with origin fallback
// Handle 429 errors (quota exhausted) - try next endpoint
// Each endpoint has its own quota pool, so we can try different endpoints
if httpResp.StatusCode == 429 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
// If currently using CLI quota and it's exhausted, switch to AI_EDITOR (Kiro IDE) quota
if currentOrigin == "CLI" {
log.Warnf("kiro: Amazon Q (CLI) quota exhausted (429), switching to Kiro (AI_EDITOR) fallback")
currentOrigin = "AI_EDITOR"
// Rebuild payload with new origin
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Retry with new origin
continue
}
// Already on AI_EDITOR or other origin, return the error
log.Debugf("kiro request error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody))
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
log.Warnf("kiro: %s endpoint quota exhausted (429), will try next endpoint", endpointConfig.Name)
// Break inner retry loop to try next endpoint (which has different quota)
break
}
// Handle 5xx server errors with exponential backoff retry
@@ -277,14 +389,15 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 401/403 errors with token refresh and retry
if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 {
// Handle 401 errors with token refresh and retry
// 401 = Unauthorized (token expired/invalid) - refresh token
if httpResp.StatusCode == 401 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
if attempt < maxRetries {
log.Warnf("kiro: received %d error, attempting token refresh and retry (attempt %d/%d)", httpResp.StatusCode, attempt+1, maxRetries+1)
log.Warnf("kiro: received 401 error, attempting token refresh and retry (attempt %d/%d)", attempt+1, maxRetries+1)
refreshedAuth, refreshErr := e.Refresh(ctx, auth)
if refreshErr != nil {
@@ -302,7 +415,66 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
}
}
log.Debugf("kiro request error, status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody))
log.Warnf("kiro request error, status: 401, body: %s", summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody))
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 402 errors - Monthly Limit Reached
if httpResp.StatusCode == 402 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
log.Warnf("kiro: received 402 (monthly limit). Upstream body: %s", string(respBody))
// Return upstream error body directly
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 403 errors - Access Denied / Token Expired
// Do NOT switch endpoints for 403 errors
if httpResp.StatusCode == 403 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
// Log the 403 error details for debugging
log.Warnf("kiro: received 403 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody))
respBodyStr := string(respBody)
// Check for SUSPENDED status - return immediately without retry
if strings.Contains(respBodyStr, "SUSPENDED") || strings.Contains(respBodyStr, "TEMPORARILY_SUSPENDED") {
log.Errorf("kiro: account is suspended, cannot proceed")
return resp, statusErr{code: httpResp.StatusCode, msg: "account suspended: " + string(respBody)}
}
// Check if this looks like a token-related 403 (some APIs return 403 for expired tokens)
isTokenRelated := strings.Contains(respBodyStr, "token") ||
strings.Contains(respBodyStr, "expired") ||
strings.Contains(respBodyStr, "invalid") ||
strings.Contains(respBodyStr, "unauthorized")
if isTokenRelated && attempt < maxRetries {
log.Warnf("kiro: 403 appears token-related, attempting token refresh")
refreshedAuth, refreshErr := e.Refresh(ctx, auth)
if refreshErr != nil {
log.Errorf("kiro: token refresh failed: %v", refreshErr)
// Token refresh failed - return error immediately
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
if refreshedAuth != nil {
auth = refreshedAuth
accessToken, profileArn = kiroCredentials(auth)
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
log.Infof("kiro: token refreshed for 403, retrying request")
continue
}
}
// For non-token 403 or after max retries, return error immediately
// Do NOT switch endpoints for 403 errors
log.Warnf("kiro: 403 error, returning immediately (no endpoint switch)")
return resp, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
@@ -362,9 +534,14 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth.
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, kiroResponse, nil)
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
// Inner retry loop exhausted for this endpoint, try next endpoint
// Note: This code is unreachable because all paths in the inner loop
// either return or continue. Kept as comment for documentation.
}
return resp, fmt.Errorf("kiro: max retries exceeded")
// All endpoints exhausted
return resp, fmt.Errorf("kiro: all endpoints exhausted")
}
// ExecuteStream handles streaming requests to Kiro API.
@@ -431,12 +608,28 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
}
// executeStreamWithRetry performs the streaming HTTP request with automatic retry on auth errors.
// Supports automatic fallback from CLI (Amazon Q) quota to AI_EDITOR (Kiro IDE) quota on 429.
// Supports automatic fallback between endpoints with different quotas:
// - Amazon Q endpoint (CLI origin) uses Amazon Q Developer quota
// - CodeWhisperer endpoint (AI_EDITOR origin) uses Kiro IDE quota
// Also supports multi-endpoint fallback similar to Antigravity implementation.
func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, accessToken, profileArn string, kiroPayload, body []byte, from sdktranslator.Format, reporter *usageReporter, currentOrigin, kiroModelID string, isAgentic, isChatOnly bool) (<-chan cliproxyexecutor.StreamChunk, error) {
maxRetries := 2 // Allow retries for token refresh + origin fallback
maxRetries := 2 // Allow retries for token refresh + endpoint fallback
endpointConfigs := getKiroEndpointConfigs(auth)
for endpointIdx := 0; endpointIdx < len(endpointConfigs); endpointIdx++ {
endpointConfig := endpointConfigs[endpointIdx]
url := endpointConfig.URL
// Use this endpoint's compatible Origin (critical for avoiding 403 errors)
currentOrigin = endpointConfig.Origin
// Rebuild payload with the correct origin for this endpoint
// Each endpoint requires its matching Origin value in the request body
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
log.Debugf("kiro: stream trying endpoint %d/%d: %s (Name: %s, Origin: %s)",
endpointIdx+1, len(endpointConfigs), url, endpointConfig.Name, currentOrigin)
for attempt := 0; attempt <= maxRetries; attempt++ {
url := kiroEndpoint
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(kiroPayload))
if err != nil {
return nil, err
@@ -445,11 +638,12 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
httpReq.Header.Set("Content-Type", kiroContentType)
httpReq.Header.Set("Authorization", "Bearer "+accessToken)
httpReq.Header.Set("Accept", kiroAcceptStream)
httpReq.Header.Set("x-amz-user-agent", kiroUserAgent)
httpReq.Header.Set("User-Agent", kiroFullUserAgent)
httpReq.Header.Set("amz-sdk-request", "attempt=1; max=1")
httpReq.Header.Set("x-amzn-kiro-agent-mode", "vibe")
httpReq.Header.Set("amz-sdk-invocation-id", uuid.New().String())
// Use endpoint-specific X-Amz-Target (critical for avoiding 403 errors)
httpReq.Header.Set("X-Amz-Target", endpointConfig.AmzTarget)
httpReq.Header.Set("User-Agent", kiroUserAgent)
httpReq.Header.Set("X-Amz-User-Agent", kiroFullUserAgent)
httpReq.Header.Set("Amz-Sdk-Request", "attempt=1; max=3")
httpReq.Header.Set("Amz-Sdk-Invocation-Id", uuid.New().String())
var attrs map[string]string
if auth != nil {
@@ -483,27 +677,17 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
// Handle 429 errors (quota exhausted) with origin fallback
// Handle 429 errors (quota exhausted) - try next endpoint
// Each endpoint has its own quota pool, so we can try different endpoints
if httpResp.StatusCode == 429 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
// If currently using CLI quota and it's exhausted, switch to AI_EDITOR (Kiro IDE) quota
if currentOrigin == "CLI" {
log.Warnf("kiro: stream Amazon Q (CLI) quota exhausted (429), switching to Kiro (AI_EDITOR) fallback")
currentOrigin = "AI_EDITOR"
// Rebuild payload with new origin
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
// Retry with new origin
continue
}
// Already on AI_EDITOR or other origin, return the error
log.Debugf("kiro stream error, status: %d, body: %s", httpResp.StatusCode, string(respBody))
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
log.Warnf("kiro: stream %s endpoint quota exhausted (429), will try next endpoint", endpointConfig.Name)
// Break inner retry loop to try next endpoint (which has different quota)
break
}
// Handle 5xx server errors with exponential backoff retry
@@ -526,14 +710,28 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 401/403 errors with token refresh and retry
if httpResp.StatusCode == 401 || httpResp.StatusCode == 403 {
// Handle 400 errors - Credential/Validation issues
// Do NOT switch endpoints - return error immediately
if httpResp.StatusCode == 400 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
log.Warnf("kiro: received 400 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, summarizeErrorBody(httpResp.Header.Get("Content-Type"), respBody))
// 400 errors indicate request validation issues - return immediately without retry
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 401 errors with token refresh and retry
// 401 = Unauthorized (token expired/invalid) - refresh token
if httpResp.StatusCode == 401 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
if attempt < maxRetries {
log.Warnf("kiro: stream received %d error, attempting token refresh and retry (attempt %d/%d)", httpResp.StatusCode, attempt+1, maxRetries+1)
log.Warnf("kiro: stream received 401 error, attempting token refresh and retry (attempt %d/%d)", attempt+1, maxRetries+1)
refreshedAuth, refreshErr := e.Refresh(ctx, auth)
if refreshErr != nil {
@@ -551,7 +749,66 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
}
}
log.Debugf("kiro stream error, status: %d, body: %s", httpResp.StatusCode, string(respBody))
log.Warnf("kiro stream error, status: 401, body: %s", string(respBody))
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 402 errors - Monthly Limit Reached
if httpResp.StatusCode == 402 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
log.Warnf("kiro: stream received 402 (monthly limit). Upstream body: %s", string(respBody))
// Return upstream error body directly
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
// Handle 403 errors - Access Denied / Token Expired
// Do NOT switch endpoints for 403 errors
if httpResp.StatusCode == 403 {
respBody, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
appendAPIResponseChunk(ctx, e.cfg, respBody)
// Log the 403 error details for debugging
log.Warnf("kiro: stream received 403 error (attempt %d/%d), body: %s", attempt+1, maxRetries+1, string(respBody))
respBodyStr := string(respBody)
// Check for SUSPENDED status - return immediately without retry
if strings.Contains(respBodyStr, "SUSPENDED") || strings.Contains(respBodyStr, "TEMPORARILY_SUSPENDED") {
log.Errorf("kiro: account is suspended, cannot proceed")
return nil, statusErr{code: httpResp.StatusCode, msg: "account suspended: " + string(respBody)}
}
// Check if this looks like a token-related 403 (some APIs return 403 for expired tokens)
isTokenRelated := strings.Contains(respBodyStr, "token") ||
strings.Contains(respBodyStr, "expired") ||
strings.Contains(respBodyStr, "invalid") ||
strings.Contains(respBodyStr, "unauthorized")
if isTokenRelated && attempt < maxRetries {
log.Warnf("kiro: 403 appears token-related, attempting token refresh")
refreshedAuth, refreshErr := e.Refresh(ctx, auth)
if refreshErr != nil {
log.Errorf("kiro: token refresh failed: %v", refreshErr)
// Token refresh failed - return error immediately
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
if refreshedAuth != nil {
auth = refreshedAuth
accessToken, profileArn = kiroCredentials(auth)
kiroPayload = e.buildKiroPayload(body, kiroModelID, profileArn, currentOrigin, isAgentic, isChatOnly)
log.Infof("kiro: token refreshed for 403, retrying stream request")
continue
}
}
// For non-token 403 or after max retries, return error immediately
// Do NOT switch endpoints for 403 errors
log.Warnf("kiro: 403 error, returning immediately (no endpoint switch)")
return nil, statusErr{code: httpResp.StatusCode, msg: string(respBody)}
}
@@ -585,9 +842,14 @@ func (e *KiroExecutor) executeStreamWithRetry(ctx context.Context, auth *cliprox
}(httpResp)
return out, nil
}
// Inner retry loop exhausted for this endpoint, try next endpoint
// Note: This code is unreachable because all paths in the inner loop
// either return or continue. Kept as comment for documentation.
}
return nil, fmt.Errorf("kiro: max retries exceeded for stream")
// All endpoints exhausted
return nil, fmt.Errorf("kiro: stream all endpoints exhausted")
}
@@ -795,6 +1057,7 @@ type kiroToolUse struct {
// origin parameter determines which quota to use: "CLI" for Amazon Q, "AI_EDITOR" for Kiro IDE.
// isAgentic parameter enables chunked write optimization prompt for -agentic model variants.
// isChatOnly parameter disables tool calling for -chat model variants (pure conversation mode).
// Supports thinking mode - when Claude API thinking parameter is present, injects thinkingHint.
func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn, origin string, isAgentic, isChatOnly bool) []byte {
// Normalize origin value for Kiro API compatibility
// Kiro API only accepts "CLI" or "AI_EDITOR" as valid origin values
@@ -840,6 +1103,39 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
systemPrompt = systemField.String()
}
// Check for thinking parameter in Claude API request
// Claude API format: {"thinking": {"type": "enabled", "budget_tokens": 16000}}
// When thinking is enabled, inject dynamic thinkingHint based on budget_tokens
// This allows reasoning_effort (low/medium/high) to control actual thinking length
thinkingEnabled := false
var budgetTokens int64 = 16000 // Default value (same as OpenAI reasoning_effort "medium")
thinkingField := gjson.GetBytes(claudeBody, "thinking")
if thinkingField.Exists() {
// Check if thinking.type is "enabled"
thinkingType := thinkingField.Get("type").String()
if thinkingType == "enabled" {
thinkingEnabled = true
// Read budget_tokens if specified - this value comes from:
// - Claude API: thinking.budget_tokens directly
// - OpenAI API: reasoning_effort -> budget_tokens (low:4000, medium:16000, high:32000)
if bt := thinkingField.Get("budget_tokens"); bt.Exists() && bt.Int() > 0 {
budgetTokens = bt.Int()
}
log.Debugf("kiro: thinking mode enabled via Claude API parameter, budget_tokens: %d", budgetTokens)
}
}
// Inject timestamp context for better temporal awareness
// Based on amq2api implementation - helps model understand current time context
timestamp := time.Now().Format("2006-01-02 15:04:05 MST")
timestampContext := fmt.Sprintf("[Context: Current time is %s]", timestamp)
if systemPrompt != "" {
systemPrompt = timestampContext + "\n\n" + systemPrompt
} else {
systemPrompt = timestampContext
}
log.Debugf("kiro: injected timestamp context: %s", timestamp)
// Inject agentic optimization prompt for -agentic model variants
// This prevents AWS Kiro API timeouts during large file write operations
if isAgentic {
@@ -849,6 +1145,20 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
systemPrompt += kiroAgenticSystemPrompt
}
// Inject thinking hint when thinking mode is enabled
// This tells the model to use <thinking> tags in its response
// DYNAMICALLY set max_thinking_length based on budget_tokens from request
// This respects the reasoning_effort setting: low(4000), medium(16000), high(32000)
if thinkingEnabled {
if systemPrompt != "" {
systemPrompt += "\n"
}
// Build dynamic thinking hint with the actual budget_tokens value
dynamicThinkingHint := fmt.Sprintf("<thinking_mode>interleaved</thinking_mode><max_thinking_length>%d</max_thinking_length>", budgetTokens)
systemPrompt += dynamicThinkingHint
log.Debugf("kiro: injected dynamic thinking hint into system prompt, max_thinking_length: %d", budgetTokens)
}
// Convert Claude tools to Kiro format
var kiroTools []kiroToolWrapper
if tools.IsArray() {
@@ -859,13 +1169,15 @@ func (e *KiroExecutor) buildKiroPayload(claudeBody []byte, modelID, profileArn,
// Truncate long descriptions (Kiro API limit is in bytes)
// Truncate at valid UTF-8 boundary to avoid breaking multi-byte chars
// Add truncation notice to help model understand the description is incomplete
if len(description) > kiroMaxToolDescLen {
// Find a valid UTF-8 boundary before the limit
truncLen := kiroMaxToolDescLen
// Reserve space for truncation notice (about 30 bytes)
truncLen := kiroMaxToolDescLen - 30
for truncLen > 0 && !utf8.RuneStart(description[truncLen]) {
truncLen--
}
description = description[:truncLen] + "..."
description = description[:truncLen] + "... (description truncated)"
}
kiroTools = append(kiroTools, kiroToolWrapper{
@@ -1505,6 +1817,14 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
// IMPORTANT: This must persist across all TranslateStream calls
var translatorParam any
// Thinking mode state tracking - based on amq2api implementation
// Tracks whether we're inside a <thinking> block and handles partial tags
inThinkBlock := false
pendingStartTagChars := 0 // Number of chars that might be start of <thinking>
pendingEndTagChars := 0 // Number of chars that might be start of </thinking>
isThinkingBlockOpen := false // Track if thinking content block is open
thinkingBlockIndex := -1 // Index of the thinking content block
// Pre-calculate input tokens from request if possible
if enc, err := tokenizerForModel(model); err == nil {
// Try OpenAI format first, then fall back to raw byte count estimation
@@ -1715,7 +2035,7 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
}
}
// Handle text content with duplicate detection
// Handle text content with duplicate detection and thinking mode support
if contentDelta != "" {
// Check for duplicate content - skip if identical to last content event
// Based on AIClient-2-API implementation for Kiro
@@ -1728,24 +2048,218 @@ func (e *KiroExecutor) streamToChannel(ctx context.Context, body io.Reader, out
outputLen += len(contentDelta)
// Accumulate content for streaming token calculation
accumulatedContent.WriteString(contentDelta)
// Start text content block if needed
if !isTextBlockOpen {
contentBlockIndex++
isTextBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
// Process content with thinking tag detection - based on amq2api implementation
// This handles <thinking> and </thinking> tags that may span across chunks
remaining := contentDelta
// If we have pending start tag chars from previous chunk, prepend them
if pendingStartTagChars > 0 {
remaining = thinkingStartTag[:pendingStartTagChars] + remaining
pendingStartTagChars = 0
}
// If we have pending end tag chars from previous chunk, prepend them
if pendingEndTagChars > 0 {
remaining = thinkingEndTag[:pendingEndTagChars] + remaining
pendingEndTagChars = 0
}
claudeEvent := e.buildClaudeStreamEvent(contentDelta, contentBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
for len(remaining) > 0 {
if inThinkBlock {
// Inside thinking block - look for </thinking> end tag
endIdx := strings.Index(remaining, thinkingEndTag)
if endIdx >= 0 {
// Found end tag - emit any content before end tag, then close block
thinkContent := remaining[:endIdx]
if thinkContent != "" {
// TRUE STREAMING: Emit thinking content immediately
// Start thinking block if not open
if !isThinkingBlockOpen {
contentBlockIndex++
thinkingBlockIndex = contentBlockIndex
isThinkingBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
// Send thinking delta immediately
thinkingEvent := e.buildClaudeThinkingDeltaEvent(thinkContent, thinkingBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
// Note: Partial tag handling is done via pendingEndTagChars
// When the next chunk arrives, the partial tag will be reconstructed
// Close thinking block
if isThinkingBlockOpen {
blockStop := e.buildClaudeContentBlockStopEvent(thinkingBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
isThinkingBlockOpen = false
}
inThinkBlock = false
remaining = remaining[endIdx+len(thinkingEndTag):]
log.Debugf("kiro: exited thinking block")
} else {
// No end tag found - TRUE STREAMING: emit content immediately
// Only save potential partial tag length for next iteration
pendingEnd := pendingTagSuffix(remaining, thinkingEndTag)
// Calculate content to emit immediately (excluding potential partial tag)
var contentToEmit string
if pendingEnd > 0 {
contentToEmit = remaining[:len(remaining)-pendingEnd]
// Save partial tag length for next iteration (will be reconstructed from thinkingEndTag)
pendingEndTagChars = pendingEnd
} else {
contentToEmit = remaining
}
// TRUE STREAMING: Emit thinking content immediately
if contentToEmit != "" {
// Start thinking block if not open
if !isThinkingBlockOpen {
contentBlockIndex++
thinkingBlockIndex = contentBlockIndex
isThinkingBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(thinkingBlockIndex, "thinking", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
// Send thinking delta immediately - TRUE STREAMING!
thinkingEvent := e.buildClaudeThinkingDeltaEvent(contentToEmit, thinkingBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, thinkingEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
remaining = ""
}
} else {
// Outside thinking block - look for <thinking> start tag
startIdx := strings.Index(remaining, thinkingStartTag)
if startIdx >= 0 {
// Found start tag - emit text before it and switch to thinking mode
textBefore := remaining[:startIdx]
if textBefore != "" {
// Start text content block if needed
if !isTextBlockOpen {
contentBlockIndex++
isTextBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
claudeEvent := e.buildClaudeStreamEvent(textBefore, contentBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
// Close text block before starting thinking block
if isTextBlockOpen {
blockStop := e.buildClaudeContentBlockStopEvent(contentBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStop, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
isTextBlockOpen = false
}
inThinkBlock = true
remaining = remaining[startIdx+len(thinkingStartTag):]
log.Debugf("kiro: entered thinking block")
} else {
// No start tag found - check for partial start tag at buffer end
pendingStart := pendingTagSuffix(remaining, thinkingStartTag)
if pendingStart > 0 {
// Emit text except potential partial tag
textToEmit := remaining[:len(remaining)-pendingStart]
if textToEmit != "" {
// Start text content block if needed
if !isTextBlockOpen {
contentBlockIndex++
isTextBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
claudeEvent := e.buildClaudeStreamEvent(textToEmit, contentBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
pendingStartTagChars = pendingStart
remaining = ""
} else {
// No partial tag - emit all as text
if remaining != "" {
// Start text content block if needed
if !isTextBlockOpen {
contentBlockIndex++
isTextBlockOpen = true
blockStart := e.buildClaudeContentBlockStartEvent(contentBlockIndex, "text", "", "")
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, blockStart, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
claudeEvent := e.buildClaudeStreamEvent(remaining, contentBlockIndex)
sseData := sdktranslator.TranslateStream(ctx, sdktranslator.FromString("kiro"), targetFormat, model, originalReq, claudeBody, claudeEvent, &translatorParam)
for _, chunk := range sseData {
if chunk != "" {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunk + "\n\n")}
}
}
}
remaining = ""
}
}
}
}
}
@@ -1981,14 +2495,20 @@ func (e *KiroExecutor) buildClaudeMessageStartEvent(model string, inputTokens in
func (e *KiroExecutor) buildClaudeContentBlockStartEvent(index int, blockType, toolUseID, toolName string) []byte {
var contentBlock map[string]interface{}
if blockType == "tool_use" {
switch blockType {
case "tool_use":
contentBlock = map[string]interface{}{
"type": "tool_use",
"id": toolUseID,
"name": toolName,
"input": map[string]interface{}{},
}
} else {
case "thinking":
contentBlock = map[string]interface{}{
"type": "thinking",
"thinking": "",
}
default:
contentBlock = map[string]interface{}{
"type": "text",
"text": "",
@@ -2075,6 +2595,40 @@ func (e *KiroExecutor) buildClaudeFinalEvent() []byte {
return []byte("event: message_stop\ndata: " + string(result))
}
// buildClaudeThinkingDeltaEvent creates a thinking_delta event for Claude API compatibility.
// This is used when streaming thinking content wrapped in <thinking> tags.
func (e *KiroExecutor) buildClaudeThinkingDeltaEvent(thinkingDelta string, index int) []byte {
event := map[string]interface{}{
"type": "content_block_delta",
"index": index,
"delta": map[string]interface{}{
"type": "thinking_delta",
"thinking": thinkingDelta,
},
}
result, _ := json.Marshal(event)
return []byte("event: content_block_delta\ndata: " + string(result))
}
// pendingTagSuffix detects if the buffer ends with a partial prefix of the given tag.
// Returns the length of the partial match (0 if no match).
// Based on amq2api implementation for handling cross-chunk tag boundaries.
func pendingTagSuffix(buffer, tag string) int {
if buffer == "" || tag == "" {
return 0
}
maxLen := len(buffer)
if maxLen > len(tag)-1 {
maxLen = len(tag) - 1
}
for length := maxLen; length > 0; length-- {
if len(buffer) >= length && buffer[len(buffer)-length:] == tag[:length] {
return length
}
}
return 0
}
// CountTokens is not supported for Kiro provider.
// Kiro/Amazon Q backend doesn't expose a token counting API.
func (e *KiroExecutor) CountTokens(context.Context, *cliproxyauth.Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {