mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-29 16:54:41 +00:00
fix(kiro): implement web search MCP integration for streaming and non-streaming paths
Add complete web search functionality that routes pure web_search requests to the Kiro MCP endpoint instead of the normal GAR API. Executor changes (kiro_executor.go): - Add web_search detection in Execute() and ExecuteStream() entry points using HasWebSearchTool() to intercept pure web_search requests before normal processing - Add 'kiro' format passthrough in buildKiroPayloadForFormat() for pre-built payloads used by callKiroRawAndBuffer() - Implement handleWebSearchStream(): streaming search loop with MCP search -> InjectToolResultsClaude -> callKiroAndBuffer, supporting up to 5 search iterations with model-driven re-search - Implement handleWebSearch(): non-streaming path that performs single MCP search, injects tool results, calls normal Execute path, and appends server_tool_use indicators to response - Add helper methods: callKiroAndBuffer(), callKiroRawAndBuffer(), callKiroDirectStream(), sendFallbackText(), executeNonStreamFallback() Web search core logic (kiro_websearch.go) [NEW]: - Define MCP JSON-RPC 2.0 types (McpRequest, McpResponse, McpResult, McpContent, McpError) - Define WebSearchResults/WebSearchResult structs for parsing MCP search results - HasWebSearchTool(): detect pure web_search requests (single-tool array only) - ContainsWebSearchTool(): detect web_search in mixed-tool arrays - ExtractSearchQuery(): parse search query from Claude Code's tool_use message format - CreateMcpRequest(): build MCP tools/call request with Kiro-compatible ID format - InjectToolResultsClaude(): append assistant tool_use + user tool_result messages to Claude-format payload for GAR translation pipeline - InjectToolResults(): modify Kiro-format payload directly with toolResults in currentMessage context - InjectSearchIndicatorsInResponse(): prepend server_tool_use + web_search_tool_result content blocks to non-streaming response for Claude Code search count display - ReplaceWebSearchToolDescription(): swap restrictive Kiro tool description with minimal re-search-friendly version - StripWebSearchTool(): remove web_search from tools array - FormatSearchContextPrompt() / FormatToolResultText(): format search results for injection - SSE event generation: SseEvent type, GenerateWebSearchEvents() (11-event sequence), GenerateSearchIndicatorEvents() (server_tool_use + web_search_tool_result pairs) - Stream analysis: AnalyzeBufferedStream() to detect stop_reason and web_search tool_use in buffered chunks, FilterChunksForClient() to strip tool_use blocks and adjust indices, AdjustSSEChunk() / AdjustStreamIndices() for content block index offset management MCP API handler (kiro_websearch_handler.go) [NEW]: - WebSearchHandler struct with MCP endpoint, HTTP client, auth token, fingerprint, and custom auth attributes - FetchToolDescription(): sync.Once-guarded MCP tools/list call to cache web_search tool description - GetWebSearchDescription(): thread-safe cached description retrieval - CallMcpAPI(): MCP API caller with retry logic (exponential backoff, retryable on 502/503/504), AWS-aligned headers via setMcpHeaders() - ParseSearchResults(): extract WebSearchResults from MCP JSON-RPC response - setMcpHeaders(): set Content-Type, Kiro agent headers, dynamic fingerprint User-Agent, AWS SDK identifiers, Bearer auth, and custom auth attributes Claude request translation (kiro_claude_request.go): - Rename web_search -> remote_web_search in convertClaudeToolsToKiro() with dynamic description from GetWebSearchDescription() or hardcoded fallback - Rename web_search -> remote_web_search in BuildAssistantMessageStruct() for tool_use content blocks - Add remoteWebSearchDescription constant as fallback when MCP tools/list hasn't been fetched
This commit is contained in:
@@ -519,8 +519,12 @@ func buildKiroPayloadForFormat(body []byte, modelID, profileArn, origin string,
|
||||
case "openai":
|
||||
log.Debugf("kiro: using OpenAI payload builder for source format: %s", sourceFormat.String())
|
||||
return kiroopenai.BuildKiroPayloadFromOpenAI(body, modelID, profileArn, origin, isAgentic, isChatOnly, headers, nil)
|
||||
case "kiro":
|
||||
// Body is already in Kiro format — pass through directly (used by callKiroRawAndBuffer)
|
||||
log.Debugf("kiro: body already in Kiro format, passing through directly")
|
||||
return body, false
|
||||
default:
|
||||
// Default to Claude format (also handles "claude", "kiro", etc.)
|
||||
// Default to Claude format
|
||||
log.Debugf("kiro: using Claude payload builder for source format: %s", sourceFormat.String())
|
||||
return kiroclaude.BuildKiroPayload(body, modelID, profileArn, origin, isAgentic, isChatOnly, headers, nil)
|
||||
}
|
||||
@@ -636,6 +640,13 @@ func (e *KiroExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
rateLimiter.WaitForToken(tokenKey)
|
||||
log.Debugf("kiro: rate limiter cleared for token %s", tokenKey)
|
||||
|
||||
// Check for pure web_search request
|
||||
// Route to MCP endpoint instead of normal Kiro API
|
||||
if kiroclaude.HasWebSearchTool(req.Payload) {
|
||||
log.Infof("kiro: detected pure web_search request (non-stream), routing to MCP endpoint")
|
||||
return e.handleWebSearch(ctx, auth, req, opts, accessToken, profileArn)
|
||||
}
|
||||
|
||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||
defer reporter.trackFailure(ctx, &err)
|
||||
|
||||
@@ -1057,6 +1068,13 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
rateLimiter.WaitForToken(tokenKey)
|
||||
log.Debugf("kiro: stream rate limiter cleared for token %s", tokenKey)
|
||||
|
||||
// Check for pure web_search request
|
||||
// Route to MCP endpoint instead of normal Kiro API
|
||||
if kiroclaude.HasWebSearchTool(req.Payload) {
|
||||
log.Infof("kiro: detected pure web_search request, routing to MCP endpoint")
|
||||
return e.handleWebSearchStream(ctx, auth, req, opts, accessToken, profileArn)
|
||||
}
|
||||
|
||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||
defer reporter.trackFailure(ctx, &err)
|
||||
|
||||
@@ -4096,6 +4114,539 @@ func (e *KiroExecutor) isTokenExpired(accessToken string) bool {
|
||||
return isExpired
|
||||
}
|
||||
|
||||
// NOTE: Message merging functions moved to internal/translator/kiro/common/message_merge.go
|
||||
// NOTE: Tool calling support functions moved to internal/translator/kiro/claude/kiro_claude_tools.go
|
||||
// The executor now uses kiroclaude.* and kirocommon.* functions instead
|
||||
const maxWebSearchIterations = 5
|
||||
|
||||
// handleWebSearchStream handles web_search requests:
|
||||
// Step 1: tools/list (sync) → fetch/cache tool description
|
||||
// Step 2+: MCP search → InjectToolResultsClaude → callKiroAndBuffer loop
|
||||
// Note: We skip the "model decides to search" step because Claude Code already
|
||||
// decided to use web_search. The Kiro tool description restricts non-coding
|
||||
// topics, so asking the model again would cause it to refuse valid searches.
|
||||
func (e *KiroExecutor) handleWebSearchStream(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
) (<-chan cliproxyexecutor.StreamChunk, error) {
|
||||
// Extract search query from Claude Code's web_search tool_use
|
||||
query := kiroclaude.ExtractSearchQuery(req.Payload)
|
||||
if query == "" {
|
||||
log.Warnf("kiro/websearch: failed to extract search query, falling back to normal flow")
|
||||
return e.callKiroDirectStream(ctx, auth, req, opts, accessToken, profileArn)
|
||||
}
|
||||
|
||||
// Build MCP endpoint based on region
|
||||
region := kiroDefaultRegion
|
||||
if auth != nil && auth.Metadata != nil {
|
||||
if r, ok := auth.Metadata["api_region"].(string); ok && r != "" {
|
||||
region = r
|
||||
}
|
||||
}
|
||||
mcpEndpoint := fmt.Sprintf("https://q.%s.amazonaws.com/mcp", region)
|
||||
|
||||
// ── Step 1: tools/list (SYNC) — cache tool description ──
|
||||
{
|
||||
tokenKey := getTokenKey(auth)
|
||||
fp := getGlobalFingerprintManager().GetFingerprint(tokenKey)
|
||||
var authAttrs map[string]string
|
||||
if auth != nil {
|
||||
authAttrs = auth.Attributes
|
||||
}
|
||||
kiroclaude.FetchToolDescription(mcpEndpoint, accessToken, newKiroHTTPClientWithPooling(ctx, e.cfg, auth, 30*time.Second), fp, authAttrs)
|
||||
}
|
||||
|
||||
// Create output channel
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
|
||||
// Send message_start event to client
|
||||
messageStartEvent := kiroclaude.SseEvent{
|
||||
Event: "message_start",
|
||||
Data: map[string]interface{}{
|
||||
"type": "message_start",
|
||||
"message": map[string]interface{}{
|
||||
"id": kiroclaude.GenerateMessageID(),
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"model": req.Model,
|
||||
"content": []interface{}{},
|
||||
"stop_reason": nil,
|
||||
"stop_sequence": nil,
|
||||
"usage": map[string]interface{}{
|
||||
"input_tokens": len(req.Payload) / 4,
|
||||
"output_tokens": 0,
|
||||
"cache_creation_input_tokens": 0,
|
||||
"cache_read_input_tokens": 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: []byte(messageStartEvent.ToSSEString())}:
|
||||
}
|
||||
|
||||
// ── Step 2+: MCP search → InjectToolResultsClaude → callKiroAndBuffer loop ──
|
||||
contentBlockIndex := 0
|
||||
currentQuery := query
|
||||
|
||||
// Replace web_search tool description with a minimal one that allows re-search.
|
||||
// The original tools/list description from Kiro restricts non-coding topics,
|
||||
// but we've already decided to search. We keep the tool so the model can
|
||||
// request additional searches when results are insufficient.
|
||||
simplifiedPayload, simplifyErr := kiroclaude.ReplaceWebSearchToolDescription(bytes.Clone(req.Payload))
|
||||
if simplifyErr != nil {
|
||||
log.Warnf("kiro/websearch: failed to simplify web_search tool: %v, using original payload", simplifyErr)
|
||||
simplifiedPayload = bytes.Clone(req.Payload)
|
||||
}
|
||||
|
||||
currentClaudePayload := simplifiedPayload
|
||||
totalSearches := 0
|
||||
|
||||
// Generate toolUseId for the first iteration (Claude Code already decided to search)
|
||||
currentToolUseId := fmt.Sprintf("srvtoolu_%s", kiroclaude.GenerateToolUseID())
|
||||
|
||||
for iteration := 0; iteration < maxWebSearchIterations; iteration++ {
|
||||
log.Infof("kiro/websearch: search iteration %d/%d — query: %s",
|
||||
iteration+1, maxWebSearchIterations, currentQuery)
|
||||
|
||||
// MCP search
|
||||
_, mcpRequest := kiroclaude.CreateMcpRequest(currentQuery)
|
||||
tokenKey := getTokenKey(auth)
|
||||
fp := getGlobalFingerprintManager().GetFingerprint(tokenKey)
|
||||
var authAttrs map[string]string
|
||||
if auth != nil {
|
||||
authAttrs = auth.Attributes
|
||||
}
|
||||
handler := kiroclaude.NewWebSearchHandler(mcpEndpoint, accessToken, newKiroHTTPClientWithPooling(ctx, e.cfg, auth, 30*time.Second), fp, authAttrs)
|
||||
mcpResponse, mcpErr := handler.CallMcpAPI(mcpRequest)
|
||||
|
||||
var searchResults *kiroclaude.WebSearchResults
|
||||
if mcpErr != nil {
|
||||
log.Warnf("kiro/websearch: MCP API call failed: %v, continuing with empty results", mcpErr)
|
||||
} else {
|
||||
searchResults = kiroclaude.ParseSearchResults(mcpResponse)
|
||||
}
|
||||
|
||||
resultCount := 0
|
||||
if searchResults != nil {
|
||||
resultCount = len(searchResults.Results)
|
||||
}
|
||||
totalSearches++
|
||||
log.Infof("kiro/websearch: iteration %d — got %d search results", iteration+1, resultCount)
|
||||
|
||||
// Send search indicator events to client
|
||||
searchEvents := kiroclaude.GenerateSearchIndicatorEvents(currentQuery, currentToolUseId, searchResults, contentBlockIndex)
|
||||
for _, event := range searchEvents {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: []byte(event.ToSSEString())}:
|
||||
}
|
||||
}
|
||||
contentBlockIndex += 2
|
||||
|
||||
// Inject tool_use + tool_result into Claude payload, then call GAR
|
||||
var err error
|
||||
currentClaudePayload, err = kiroclaude.InjectToolResultsClaude(currentClaudePayload, currentToolUseId, currentQuery, searchResults)
|
||||
if err != nil {
|
||||
log.Warnf("kiro/websearch: failed to inject tool results: %v", err)
|
||||
e.sendFallbackText(ctx, out, contentBlockIndex, currentQuery, searchResults)
|
||||
break
|
||||
}
|
||||
|
||||
// Call GAR with modified Claude payload (full translation pipeline)
|
||||
modifiedReq := req
|
||||
modifiedReq.Payload = currentClaudePayload
|
||||
kiroChunks, kiroErr := e.callKiroAndBuffer(ctx, auth, modifiedReq, opts, accessToken, profileArn)
|
||||
if kiroErr != nil {
|
||||
log.Warnf("kiro/websearch: Kiro API failed at iteration %d: %v", iteration+1, kiroErr)
|
||||
e.sendFallbackText(ctx, out, contentBlockIndex, currentQuery, searchResults)
|
||||
break
|
||||
}
|
||||
|
||||
// Analyze response
|
||||
analysis := kiroclaude.AnalyzeBufferedStream(kiroChunks)
|
||||
log.Infof("kiro/websearch: iteration %d — stop_reason: %s, has_tool_use: %v, query: %s, toolUseId: %s",
|
||||
iteration+1, analysis.StopReason, analysis.HasWebSearchToolUse, analysis.WebSearchQuery, analysis.WebSearchToolUseId)
|
||||
|
||||
if analysis.HasWebSearchToolUse && analysis.WebSearchQuery != "" && iteration+1 < maxWebSearchIterations {
|
||||
// Model wants another search
|
||||
filteredChunks := kiroclaude.FilterChunksForClient(kiroChunks, analysis.WebSearchToolUseIndex, contentBlockIndex)
|
||||
for _, chunk := range filteredChunks {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: chunk}:
|
||||
}
|
||||
}
|
||||
|
||||
currentQuery = analysis.WebSearchQuery
|
||||
currentToolUseId = analysis.WebSearchToolUseId
|
||||
continue
|
||||
}
|
||||
|
||||
// Model returned final response — stream to client
|
||||
for _, chunk := range kiroChunks {
|
||||
if contentBlockIndex > 0 && len(chunk) > 0 {
|
||||
adjusted, shouldForward := kiroclaude.AdjustSSEChunk(chunk, contentBlockIndex)
|
||||
if !shouldForward {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: adjusted}:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: chunk}:
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Infof("kiro/websearch: completed after %d search iteration(s), total searches: %d", iteration+1, totalSearches)
|
||||
return
|
||||
}
|
||||
|
||||
log.Warnf("kiro/websearch: reached max iterations (%d), stopping search loop", maxWebSearchIterations)
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// callKiroAndBuffer calls the Kiro API and buffers all response chunks.
|
||||
// Returns the buffered chunks for analysis before forwarding to client.
|
||||
func (e *KiroExecutor) callKiroAndBuffer(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
) ([][]byte, error) {
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("kiro")
|
||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||
log.Debugf("kiro/websearch GAR request: %d bytes", len(body))
|
||||
|
||||
kiroModelID := e.mapModelToKiro(req.Model)
|
||||
isAgentic, isChatOnly := determineAgenticMode(req.Model)
|
||||
effectiveProfileArn := getEffectiveProfileArnWithWarning(auth, profileArn)
|
||||
|
||||
tokenKey := ""
|
||||
if auth != nil {
|
||||
tokenKey = auth.ID
|
||||
}
|
||||
|
||||
kiroStream, err := e.executeStreamWithRetry(
|
||||
ctx, auth, req, opts, accessToken, effectiveProfileArn,
|
||||
nil, body, from, nil, "", kiroModelID, isAgentic, isChatOnly, tokenKey,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Buffer all chunks
|
||||
var chunks [][]byte
|
||||
for chunk := range kiroStream {
|
||||
if chunk.Err != nil {
|
||||
return chunks, chunk.Err
|
||||
}
|
||||
if len(chunk.Payload) > 0 {
|
||||
chunks = append(chunks, bytes.Clone(chunk.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("kiro/websearch GAR response: %d chunks buffered", len(chunks))
|
||||
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
// callKiroRawAndBuffer calls the Kiro API with a pre-built Kiro payload (no translation).
|
||||
// Used in the web search loop where the payload is modified directly in Kiro format.
|
||||
func (e *KiroExecutor) callKiroRawAndBuffer(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
kiroBody []byte,
|
||||
) ([][]byte, error) {
|
||||
kiroModelID := e.mapModelToKiro(req.Model)
|
||||
isAgentic, isChatOnly := determineAgenticMode(req.Model)
|
||||
effectiveProfileArn := getEffectiveProfileArnWithWarning(auth, profileArn)
|
||||
|
||||
tokenKey := ""
|
||||
if auth != nil {
|
||||
tokenKey = auth.ID
|
||||
}
|
||||
log.Debugf("kiro/websearch GAR raw request: %d bytes", len(kiroBody))
|
||||
|
||||
kiroFormat := sdktranslator.FromString("kiro")
|
||||
kiroStream, err := e.executeStreamWithRetry(
|
||||
ctx, auth, req, opts, accessToken, effectiveProfileArn,
|
||||
nil, kiroBody, kiroFormat, nil, "", kiroModelID, isAgentic, isChatOnly, tokenKey,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Buffer all chunks
|
||||
var chunks [][]byte
|
||||
for chunk := range kiroStream {
|
||||
if chunk.Err != nil {
|
||||
return chunks, chunk.Err
|
||||
}
|
||||
if len(chunk.Payload) > 0 {
|
||||
chunks = append(chunks, bytes.Clone(chunk.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("kiro/websearch GAR raw response: %d chunks buffered", len(chunks))
|
||||
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
// callKiroDirectStream creates a direct streaming channel to Kiro API without search.
|
||||
func (e *KiroExecutor) callKiroDirectStream(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
) (<-chan cliproxyexecutor.StreamChunk, error) {
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("kiro")
|
||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||
|
||||
kiroModelID := e.mapModelToKiro(req.Model)
|
||||
isAgentic, isChatOnly := determineAgenticMode(req.Model)
|
||||
effectiveProfileArn := getEffectiveProfileArnWithWarning(auth, profileArn)
|
||||
|
||||
tokenKey := ""
|
||||
if auth != nil {
|
||||
tokenKey = auth.ID
|
||||
}
|
||||
|
||||
return e.executeStreamWithRetry(
|
||||
ctx, auth, req, opts, accessToken, effectiveProfileArn,
|
||||
nil, body, from, nil, "", kiroModelID, isAgentic, isChatOnly, tokenKey,
|
||||
)
|
||||
}
|
||||
|
||||
// sendFallbackText sends a simple text response when the Kiro API fails during the search loop.
|
||||
func (e *KiroExecutor) sendFallbackText(
|
||||
ctx context.Context,
|
||||
out chan<- cliproxyexecutor.StreamChunk,
|
||||
contentBlockIndex int,
|
||||
query string,
|
||||
searchResults *kiroclaude.WebSearchResults,
|
||||
) {
|
||||
// Generate a simple text summary from search results
|
||||
summary := kiroclaude.FormatSearchContextPrompt(query, searchResults)
|
||||
|
||||
events := []kiroclaude.SseEvent{
|
||||
{
|
||||
Event: "content_block_start",
|
||||
Data: map[string]interface{}{
|
||||
"type": "content_block_start",
|
||||
"index": contentBlockIndex,
|
||||
"content_block": map[string]interface{}{
|
||||
"type": "text",
|
||||
"text": "",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Event: "content_block_delta",
|
||||
Data: map[string]interface{}{
|
||||
"type": "content_block_delta",
|
||||
"index": contentBlockIndex,
|
||||
"delta": map[string]interface{}{
|
||||
"type": "text_delta",
|
||||
"text": summary,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Event: "content_block_stop",
|
||||
Data: map[string]interface{}{
|
||||
"type": "content_block_stop",
|
||||
"index": contentBlockIndex,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: []byte(event.ToSSEString())}:
|
||||
}
|
||||
}
|
||||
|
||||
// Send message_delta with end_turn and message_stop
|
||||
msgDelta := kiroclaude.SseEvent{
|
||||
Event: "message_delta",
|
||||
Data: map[string]interface{}{
|
||||
"type": "message_delta",
|
||||
"delta": map[string]interface{}{
|
||||
"stop_reason": "end_turn",
|
||||
"stop_sequence": nil,
|
||||
},
|
||||
"usage": map[string]interface{}{
|
||||
"output_tokens": len(summary) / 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: []byte(msgDelta.ToSSEString())}:
|
||||
}
|
||||
|
||||
msgStop := kiroclaude.SseEvent{
|
||||
Event: "message_stop",
|
||||
Data: map[string]interface{}{
|
||||
"type": "message_stop",
|
||||
},
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- cliproxyexecutor.StreamChunk{Payload: []byte(msgStop.ToSSEString())}:
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// handleWebSearch handles web_search requests for non-streaming Execute path.
|
||||
// Performs MCP search synchronously, injects results into the request payload,
|
||||
// then calls the normal non-streaming Kiro API path which returns a proper
|
||||
// Claude JSON response (not SSE chunks).
|
||||
func (e *KiroExecutor) handleWebSearch(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
) (cliproxyexecutor.Response, error) {
|
||||
// Extract search query from Claude Code's web_search tool_use
|
||||
query := kiroclaude.ExtractSearchQuery(req.Payload)
|
||||
if query == "" {
|
||||
log.Warnf("kiro/websearch: non-stream: failed to extract search query, falling back to normal Execute")
|
||||
// Fall through to normal non-streaming path
|
||||
return e.executeNonStreamFallback(ctx, auth, req, opts, accessToken, profileArn)
|
||||
}
|
||||
|
||||
// Build MCP endpoint based on region
|
||||
region := kiroDefaultRegion
|
||||
if auth != nil && auth.Metadata != nil {
|
||||
if r, ok := auth.Metadata["api_region"].(string); ok && r != "" {
|
||||
region = r
|
||||
}
|
||||
}
|
||||
mcpEndpoint := fmt.Sprintf("https://q.%s.amazonaws.com/mcp", region)
|
||||
|
||||
// Step 1: Fetch/cache tool description (sync)
|
||||
{
|
||||
tokenKey := getTokenKey(auth)
|
||||
fp := getGlobalFingerprintManager().GetFingerprint(tokenKey)
|
||||
var authAttrs map[string]string
|
||||
if auth != nil {
|
||||
authAttrs = auth.Attributes
|
||||
}
|
||||
kiroclaude.FetchToolDescription(mcpEndpoint, accessToken, newKiroHTTPClientWithPooling(ctx, e.cfg, auth, 30*time.Second), fp, authAttrs)
|
||||
}
|
||||
|
||||
// Step 2: Perform MCP search
|
||||
_, mcpRequest := kiroclaude.CreateMcpRequest(query)
|
||||
tokenKey := getTokenKey(auth)
|
||||
fp := getGlobalFingerprintManager().GetFingerprint(tokenKey)
|
||||
var authAttrs map[string]string
|
||||
if auth != nil {
|
||||
authAttrs = auth.Attributes
|
||||
}
|
||||
handler := kiroclaude.NewWebSearchHandler(mcpEndpoint, accessToken, newKiroHTTPClientWithPooling(ctx, e.cfg, auth, 30*time.Second), fp, authAttrs)
|
||||
mcpResponse, mcpErr := handler.CallMcpAPI(mcpRequest)
|
||||
|
||||
var searchResults *kiroclaude.WebSearchResults
|
||||
if mcpErr != nil {
|
||||
log.Warnf("kiro/websearch: non-stream: MCP API call failed: %v, continuing with empty results", mcpErr)
|
||||
} else {
|
||||
searchResults = kiroclaude.ParseSearchResults(mcpResponse)
|
||||
}
|
||||
|
||||
resultCount := 0
|
||||
if searchResults != nil {
|
||||
resultCount = len(searchResults.Results)
|
||||
}
|
||||
log.Infof("kiro/websearch: non-stream: got %d search results for query: %s", resultCount, query)
|
||||
|
||||
// Step 3: Inject search tool_use + tool_result into Claude payload
|
||||
currentToolUseId := fmt.Sprintf("srvtoolu_%s", kiroclaude.GenerateToolUseID())
|
||||
modifiedPayload, err := kiroclaude.InjectToolResultsClaude(bytes.Clone(req.Payload), currentToolUseId, query, searchResults)
|
||||
if err != nil {
|
||||
log.Warnf("kiro/websearch: non-stream: failed to inject tool results: %v, falling back", err)
|
||||
return e.executeNonStreamFallback(ctx, auth, req, opts, accessToken, profileArn)
|
||||
}
|
||||
|
||||
// Step 4: Call Kiro API via the normal non-streaming path (executeWithRetry)
|
||||
// This path uses parseEventStream → BuildClaudeResponse → TranslateNonStream
|
||||
// to produce a proper Claude JSON response
|
||||
modifiedReq := req
|
||||
modifiedReq.Payload = modifiedPayload
|
||||
|
||||
resp, err := e.executeNonStreamFallback(ctx, auth, modifiedReq, opts, accessToken, profileArn)
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Step 5: Inject server_tool_use + web_search_tool_result into response
|
||||
// so Claude Code can display "Did X searches in Ys"
|
||||
indicators := []kiroclaude.SearchIndicator{
|
||||
{
|
||||
ToolUseID: currentToolUseId,
|
||||
Query: query,
|
||||
Results: searchResults,
|
||||
},
|
||||
}
|
||||
injectedPayload, injErr := kiroclaude.InjectSearchIndicatorsInResponse(resp.Payload, indicators)
|
||||
if injErr != nil {
|
||||
log.Warnf("kiro/websearch: non-stream: failed to inject search indicators: %v", injErr)
|
||||
} else {
|
||||
resp.Payload = injectedPayload
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// executeNonStreamFallback runs the standard non-streaming Execute path for a request.
|
||||
// Used by handleWebSearch after injecting search results, or as a fallback.
|
||||
func (e *KiroExecutor) executeNonStreamFallback(
|
||||
ctx context.Context,
|
||||
auth *cliproxyauth.Auth,
|
||||
req cliproxyexecutor.Request,
|
||||
opts cliproxyexecutor.Options,
|
||||
accessToken, profileArn string,
|
||||
) (cliproxyexecutor.Response, error) {
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("kiro")
|
||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||
|
||||
kiroModelID := e.mapModelToKiro(req.Model)
|
||||
isAgentic, isChatOnly := determineAgenticMode(req.Model)
|
||||
effectiveProfileArn := getEffectiveProfileArnWithWarning(auth, profileArn)
|
||||
tokenKey := getTokenKey(auth)
|
||||
|
||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||
var err error
|
||||
defer reporter.trackFailure(ctx, &err)
|
||||
|
||||
resp, err := e.executeWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, nil, body, from, to, reporter, "", kiroModelID, isAgentic, isChatOnly, tokenKey)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user