From 2b8c466e885cd10da89a303c473d966ba026b286 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Thu, 19 Feb 2026 22:07:14 +0800 Subject: [PATCH] refactor(executor, handlers): replace channel-based streams with `StreamResult` for consistency - Updated `ExecuteStream` functions in executors to use `StreamResult` instead of channels. - Enhanced upstream header handling in OpenAI handlers. - Improved maintainability and alignment across executors and handlers. --- .../executor/github_copilot_executor.go | 20 ++++++++++--------- internal/runtime/executor/kilo_executor.go | 9 +++++---- internal/runtime/executor/kiro_executor.go | 14 ++++++++++--- sdk/api/handlers/openai/openai_handlers.go | 7 +++++-- .../openai/openai_responses_handlers.go | 7 +++++-- 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/internal/runtime/executor/github_copilot_executor.go b/internal/runtime/executor/github_copilot_executor.go index 0189ffc8..af4b7e6a 100644 --- a/internal/runtime/executor/github_copilot_executor.go +++ b/internal/runtime/executor/github_copilot_executor.go @@ -35,12 +35,12 @@ const ( maxScannerBufferSize = 20_971_520 // Copilot API header values. - copilotUserAgent = "GitHubCopilotChat/0.35.0" - copilotEditorVersion = "vscode/1.107.0" - copilotPluginVersion = "copilot-chat/0.35.0" - copilotIntegrationID = "vscode-chat" - copilotOpenAIIntent = "conversation-panel" - copilotGitHubAPIVer = "2025-04-01" + copilotUserAgent = "GitHubCopilotChat/0.35.0" + copilotEditorVersion = "vscode/1.107.0" + copilotPluginVersion = "copilot-chat/0.35.0" + copilotIntegrationID = "vscode-chat" + copilotOpenAIIntent = "conversation-panel" + copilotGitHubAPIVer = "2025-04-01" ) // GitHubCopilotExecutor handles requests to the GitHub Copilot API. @@ -232,7 +232,7 @@ func (e *GitHubCopilotExecutor) Execute(ctx context.Context, auth *cliproxyauth. } // ExecuteStream handles streaming requests to GitHub Copilot. 
-func (e *GitHubCopilotExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { +func (e *GitHubCopilotExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) { apiToken, baseURL, errToken := e.ensureAPIToken(ctx, auth) if errToken != nil { return nil, errToken @@ -341,7 +341,6 @@ func (e *GitHubCopilotExecutor) ExecuteStream(ctx context.Context, auth *cliprox } out := make(chan cliproxyexecutor.StreamChunk) - stream = out go func() { defer close(out) @@ -394,7 +393,10 @@ func (e *GitHubCopilotExecutor) ExecuteStream(ctx context.Context, auth *cliprox } }() - return stream, nil + return &cliproxyexecutor.StreamResult{ + Headers: httpResp.Header.Clone(), + Chunks: out, + }, nil } // CountTokens is not supported for GitHub Copilot. diff --git a/internal/runtime/executor/kilo_executor.go b/internal/runtime/executor/kilo_executor.go index b2359319..34f62023 100644 --- a/internal/runtime/executor/kilo_executor.go +++ b/internal/runtime/executor/kilo_executor.go @@ -168,7 +168,7 @@ func (e *KiloExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req } // ExecuteStream performs a streaming request. 
-func (e *KiloExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { +func (e *KiloExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) { baseModel := thinking.ParseSuffix(req.Model).ModelName reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth) @@ -253,7 +253,6 @@ func (e *KiloExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } out := make(chan cliproxyexecutor.StreamChunk) - stream = out go func() { defer close(out) defer httpResp.Body.Close() @@ -286,7 +285,10 @@ func (e *KiloExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut reporter.ensurePublished(ctx) }() - return stream, nil + return &cliproxyexecutor.StreamResult{ + Headers: httpResp.Header.Clone(), + Chunks: out, + }, nil } // Refresh validates the Kilo token. @@ -456,4 +458,3 @@ func FetchKiloModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *config.C return allModels } - diff --git a/internal/runtime/executor/kiro_executor.go b/internal/runtime/executor/kiro_executor.go index e1a280b9..a8237cd7 100644 --- a/internal/runtime/executor/kiro_executor.go +++ b/internal/runtime/executor/kiro_executor.go @@ -1053,7 +1053,7 @@ func (e *KiroExecutor) executeWithRetry(ctx context.Context, auth *cliproxyauth. // ExecuteStream handles streaming requests to Kiro API. // Supports automatic token refresh on 401/403 errors and quota fallback on 429. 
-func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) { +func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) { accessToken, profileArn := kiroCredentials(auth) if accessToken == "" { return nil, fmt.Errorf("kiro: access token not found in auth") @@ -1110,7 +1110,11 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut // Route to MCP endpoint instead of normal Kiro API if kiroclaude.HasWebSearchTool(req.Payload) { log.Infof("kiro: detected pure web_search request, routing to MCP endpoint") - return e.handleWebSearchStream(ctx, auth, req, opts, accessToken, profileArn) + streamWebSearch, errWebSearch := e.handleWebSearchStream(ctx, auth, req, opts, accessToken, profileArn) + if errWebSearch != nil { + return nil, errWebSearch + } + return &cliproxyexecutor.StreamResult{Chunks: streamWebSearch}, nil } reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) @@ -1128,7 +1132,11 @@ func (e *KiroExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut // Execute stream with retry on 401/403 and 429 (quota exhausted) // Note: currentOrigin and kiroPayload are built inside executeStreamWithRetry for each endpoint - return e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, nil, body, from, reporter, "", kiroModelID, isAgentic, isChatOnly, tokenKey) + streamKiro, errStreamKiro := e.executeStreamWithRetry(ctx, auth, req, opts, accessToken, effectiveProfileArn, nil, body, from, reporter, "", kiroModelID, isAgentic, isChatOnly, tokenKey) + if errStreamKiro != nil { + return nil, errStreamKiro + } + return &cliproxyexecutor.StreamResult{Chunks: streamKiro}, nil } // executeStreamWithRetry performs the streaming HTTP 
request with automatic retry on auth errors. diff --git a/sdk/api/handlers/openai/openai_handlers.go b/sdk/api/handlers/openai/openai_handlers.go index c589c1b1..2e85dcf8 100644 --- a/sdk/api/handlers/openai/openai_handlers.go +++ b/sdk/api/handlers/openai/openai_handlers.go @@ -535,12 +535,13 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponseViaResponses(c *gin.Context modelName := gjson.GetBytes(rawJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) + resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) return } + handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) converted := convertResponsesObjectToChatCompletion(cliCtx, modelName, originalChatJSON, rawJSON, resp) if converted == nil { h.WriteErrorResponse(c, &interfaces.ErrorMessage{ @@ -644,7 +645,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r modelName := gjson.GetBytes(rawJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) + dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) var param any setSSEHeaders := func() { @@ -675,6 +676,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r case chunk, ok := <-dataChan: if !ok { setSSEHeaders() + handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") flusher.Flush() cliCancel(nil) @@ -682,6 +684,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r } setSSEHeaders() + 
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) writeConvertedResponsesChunk(c, cliCtx, modelName, originalChatJSON, rawJSON, chunk, &param) flusher.Flush() diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index 1f621e82..f10e8d51 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -182,12 +182,13 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponseViaChat(c *gin.Con modelName := gjson.GetBytes(chatJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") + resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) return } + handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) var param any converted := responsesconverter.ConvertOpenAIChatCompletionsResponseToOpenAIResponsesNonStream(cliCtx, modelName, originalResponsesJSON, originalResponsesJSON, resp, &param) if converted == "" { @@ -298,7 +299,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex modelName := gjson.GetBytes(chatJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") + dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") var param any setSSEHeaders := func() { @@ -328,6 +329,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex case chunk, ok := <-dataChan: if !ok { setSSEHeaders() + handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) _, _ = c.Writer.Write([]byte("\n")) flusher.Flush() cliCancel(nil) @@ -335,6 +337,7
@@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex } setSSEHeaders() + handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders) writeChatAsResponsesChunk(c, cliCtx, modelName, originalResponsesJSON, chunk, &param) flusher.Flush()