From bea13f9724a898dab0ed0db1de866a454f487b4d Mon Sep 17 00:00:00 2001 From: rensumo Date: Mon, 6 Apr 2026 13:59:06 +0800 Subject: [PATCH] fix(executor): support non-stream requests for CodeBuddy --- .../runtime/executor/codebuddy_executor.go | 217 +++++++++++++++++- 1 file changed, 213 insertions(+), 4 deletions(-) diff --git a/internal/runtime/executor/codebuddy_executor.go b/internal/runtime/executor/codebuddy_executor.go index 0bc56354..18719094 100644 --- a/internal/runtime/executor/codebuddy_executor.go +++ b/internal/runtime/executor/codebuddy_executor.go @@ -4,9 +4,11 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "io" "net/http" + "strings" "time" "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/codebuddy" @@ -14,8 +16,11 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/thinking" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" ) const ( @@ -98,10 +103,13 @@ func (e *CodeBuddyExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth if len(opts.OriginalRequest) > 0 { originalPayloadSource = opts.OriginalRequest } - originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayloadSource, false) - translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, false) + originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayloadSource, true) + translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true) requestedModel := payloadRequestedModel(opts, req.Model) translated = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", translated, originalTranslated, requestedModel) + translated, _ = sjson.SetBytes(translated, "stream", true) + translated, _ = sjson.SetBytes(originalTranslated, "stream", true) + translated, _ = sjson.SetBytes(translated, "stream_options.include_usage", true) translated, err = thinking.ApplyThinking(translated, req.Model, from.String(), to.String(), e.Identifier()) if err != nil { @@ -114,6 +122,8 @@ func (e *CodeBuddyExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth return resp, err } e.applyHeaders(httpReq, accessToken, userID, domain) + httpReq.Header.Set("Accept", "text/event-stream") + httpReq.Header.Set("Cache-Control", "no-cache") var authID, authLabel, authType, authValue string if auth != nil { @@ -160,11 +170,16 @@ func (e *CodeBuddyExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth return resp, err } appendAPIResponseChunk(ctx, e.cfg, body) - reporter.publish(ctx, parseOpenAIUsage(body)) + aggregatedBody, usageDetail, err := aggregateOpenAIChatCompletionStream(body) + if err != nil { + recordAPIResponseError(ctx, e.cfg, err) + return resp, err + } + reporter.publish(ctx, usageDetail) reporter.ensurePublished(ctx) var param any - out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, body, ¶m) + out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, aggregatedBody, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()} return resp, nil } @@ -341,3 +356,197 @@ func (e *CodeBuddyExecutor) applyHeaders(req *http.Request, accessToken, userID, req.Header.Set("X-IDE-Version", "2.63.2") req.Header.Set("X-Requested-With", "XMLHttpRequest") } + +type openAIChatStreamChoiceAccumulator struct { + Role string + ContentParts []string + ReasoningParts []string + FinishReason string + ToolCalls map[int]*openAIChatStreamToolCallAccumulator + ToolCallOrder []int + NativeFinishReason any +} + +type openAIChatStreamToolCallAccumulator struct { + ID string + Type string + Name string + Arguments strings.Builder +} + +func aggregateOpenAIChatCompletionStream(raw []byte) ([]byte, usage.Detail, error) { + lines := bytes.Split(raw, []byte("\n")) + var ( + responseID string + model string + created int64 + serviceTier string + systemFP string + usageDetail usage.Detail + choices = map[int]*openAIChatStreamChoiceAccumulator{} + choiceOrder []int + ) + + for _, line := range lines { + line = bytes.TrimSpace(line) + if len(line) == 0 || !bytes.HasPrefix(line, []byte("data:")) { + continue + } + payload := bytes.TrimSpace(line[5:]) + if len(payload) == 0 || bytes.Equal(payload, []byte("[DONE]")) { + continue + } + if !gjson.ValidBytes(payload) { + continue + } + + root := gjson.ParseBytes(payload) + if responseID == "" { + responseID = root.Get("id").String() + } + if model == "" { + model = root.Get("model").String() + } + if created == 0 { + created = root.Get("created").Int() + } + if serviceTier == "" { + serviceTier = root.Get("service_tier").String() + } + if systemFP == "" { + systemFP = root.Get("system_fingerprint").String() + } + if detail, ok := parseOpenAIStreamUsage(line); ok { + usageDetail = detail + } + + for _, choiceResult := range root.Get("choices").Array() { + idx := int(choiceResult.Get("index").Int()) + choice := choices[idx] + if choice == nil { + choice = &openAIChatStreamChoiceAccumulator{ToolCalls: map[int]*openAIChatStreamToolCallAccumulator{}} + choices[idx] = choice + choiceOrder = append(choiceOrder, idx) + } + + delta := choiceResult.Get("delta") + if role := delta.Get("role").String(); role != "" { + choice.Role = role + } + if content := delta.Get("content").String(); content != "" { + choice.ContentParts = append(choice.ContentParts, content) + } + if reasoning := delta.Get("reasoning_content").String(); reasoning != "" { + choice.ReasoningParts = append(choice.ReasoningParts, reasoning) + } + if finishReason := choiceResult.Get("finish_reason").String(); finishReason != "" { + choice.FinishReason = finishReason + } + if nativeFinishReason := choiceResult.Get("native_finish_reason"); nativeFinishReason.Exists() { + choice.NativeFinishReason = nativeFinishReason.Value() + } + + for _, toolCallResult := range delta.Get("tool_calls").Array() { + toolIdx := int(toolCallResult.Get("index").Int()) + toolCall := choice.ToolCalls[toolIdx] + if toolCall == nil { + toolCall = &openAIChatStreamToolCallAccumulator{} + choice.ToolCalls[toolIdx] = toolCall + choice.ToolCallOrder = append(choice.ToolCallOrder, toolIdx) + } + if id := toolCallResult.Get("id").String(); id != "" { + toolCall.ID = id + } + if typ := toolCallResult.Get("type").String(); typ != "" { + toolCall.Type = typ + } + if name := toolCallResult.Get("function.name").String(); name != "" { + toolCall.Name = name + } + if args := toolCallResult.Get("function.arguments").String(); args != "" { + toolCall.Arguments.WriteString(args) + } + } + } + } + + if responseID == "" && model == "" && len(choiceOrder) == 0 { + return nil, usageDetail, fmt.Errorf("codebuddy: streaming response did not contain any chat completion chunks") + } + + response := map[string]any{ + "id": responseID, + "object": "chat.completion", + "created": created, + "model": model, + "choices": make([]map[string]any, 0, len(choiceOrder)), + "usage": map[string]any{ + "prompt_tokens": usageDetail.InputTokens, + "completion_tokens": usageDetail.OutputTokens, + "total_tokens": usageDetail.TotalTokens, + }, + } + if serviceTier != "" { + response["service_tier"] = serviceTier + } + if systemFP != "" { + response["system_fingerprint"] = systemFP + } + + for _, idx := range choiceOrder { + choice := choices[idx] + message := map[string]any{ + "role": choice.Role, + "content": strings.Join(choice.ContentParts, ""), + } + if message["role"] == "" { + message["role"] = "assistant" + } + if len(choice.ReasoningParts) > 0 { + message["reasoning_content"] = strings.Join(choice.ReasoningParts, "") + } + if len(choice.ToolCallOrder) > 0 { + toolCalls := make([]map[string]any, 0, len(choice.ToolCallOrder)) + for _, toolIdx := range choice.ToolCallOrder { + toolCall := choice.ToolCalls[toolIdx] + toolCallType := toolCall.Type + if toolCallType == "" { + toolCallType = "function" + } + arguments := toolCall.Arguments.String() + if arguments == "" { + arguments = "{}" + } + toolCalls = append(toolCalls, map[string]any{ + "id": toolCall.ID, + "type": toolCallType, + "function": map[string]any{ + "name": toolCall.Name, + "arguments": arguments, + }, + }) + } + message["tool_calls"] = toolCalls + } + + finishReason := choice.FinishReason + if finishReason == "" { + finishReason = "stop" + } + choicePayload := map[string]any{ + "index": idx, + "message": message, + "finish_reason": finishReason, + } + if choice.NativeFinishReason != nil { + choicePayload["native_finish_reason"] = choice.NativeFinishReason + } + response["choices"] = append(response["choices"].([]map[string]any), choicePayload) + } + + out, err := json.Marshal(response) + if err != nil { + return nil, usageDetail, fmt.Errorf("codebuddy: failed to encode aggregated response: %w", err) + } + return out, usageDetail, nil +}