From 11b0efc38ff365a43ab2828f372622487d081ee4 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 23 Sep 2025 12:44:44 +0800 Subject: [PATCH] feat(claude-executor): add ZSTD decoding support for Claude executor responses - Integrated ZSTD decompression via `github.com/klauspost/compress` for responses with "zstd" content-encoding. - Added helper `hasZSTDEcoding` to detect ZSTD-encoded responses. - Updated response handling logic to initialize and use a ZSTD decoder when necessary. refactor(api-handlers): split streaming and non-streaming response handling - Introduced `handleNonStreamingResponse` for processing non-streaming requests in `ClaudeCodeAPIHandler`. - Improved code clarity by separating streaming and non-streaming logic. fix(service): remove redundant token refresh interval assignment logic in `cliproxy` service. --- go.mod | 1 + go.sum | 2 ++ internal/api/handlers/claude/code_handlers.go | 32 +++++++++++++++++-- internal/runtime/executor/claude_executor.go | 32 +++++++++++++++++-- sdk/cliproxy/service.go | 3 -- 5 files changed, 62 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 63b9d137..fa31a7d5 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.3 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index a4d6fbcd..5c8f0b1d 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= 
+github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/internal/api/handlers/claude/code_handlers.go b/internal/api/handlers/claude/code_handlers.go index ed7b0339..4b848ae3 100644 --- a/internal/api/handlers/claude/code_handlers.go +++ b/internal/api/handlers/claude/code_handlers.go @@ -76,10 +76,10 @@ func (h *ClaudeCodeAPIHandler) ClaudeMessages(c *gin.Context) { // Check if the client requested a streaming response. streamResult := gjson.GetBytes(rawJSON, "stream") if !streamResult.Exists() || streamResult.Type == gjson.False { - return + h.handleNonStreamingResponse(c, rawJSON) + } else { + h.handleStreamingResponse(c, rawJSON) } - - h.handleStreamingResponse(c, rawJSON) } // ClaudeModels handles the Claude models listing endpoint. @@ -93,6 +93,32 @@ func (h *ClaudeCodeAPIHandler) ClaudeModels(c *gin.Context) { }) } +// handleNonStreamingResponse handles non-streaming content generation requests for Claude models. +// This function processes the request synchronously and returns the complete generated +// response in a single API call. It supports various generation parameters and +// response formats. 
+// +// Parameters: +// - c: The Gin context for the request +// - rawJSON: The raw JSON request body containing generation parameters and content; +// the target model name is read from its "model" field +func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []byte) { + c.Header("Content-Type", "application/json") + alt := h.GetAlt(c) + cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + + modelName := gjson.GetBytes(rawJSON, "model").String() + + resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) + if errMsg != nil { + h.WriteErrorResponse(c, errMsg) + cliCancel(errMsg.Error) + return + } + _, _ = c.Writer.Write(resp) + cliCancel() +} + // handleStreamingResponse streams Claude-compatible responses backed by Gemini. // It sets up SSE, selects a backend client with rotation/quota logic, // forwards chunks, and translates them to Claude CLI format. diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 53ba280c..40a6f443 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/klauspost/compress/zstd" claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/misc" @@ -66,14 +67,28 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r if err != nil { return cliproxyexecutor.Response{}, err } - defer func() { _ = resp.Body.Close() }() + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { b, _ := io.ReadAll(resp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Debugf("request error, error status: %d, error body: 
%s", resp.StatusCode, string(b)) return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} } - data, err := io.ReadAll(resp.Body) + reader := io.Reader(resp.Body) + var decoder *zstd.Decoder + if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) { + decoder, err = zstd.NewReader(resp.Body) + if err != nil { + return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err) + } + reader = decoder + defer decoder.Close() + } + data, err := io.ReadAll(reader) if err != nil { return cliproxyexecutor.Response{}, err } @@ -180,6 +195,19 @@ func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) ( return auth, nil } +func hasZSTDEcoding(contentEncoding string) bool { + if contentEncoding == "" { + return false + } + parts := strings.Split(contentEncoding, ",") + for i := range parts { + if strings.EqualFold(strings.TrimSpace(parts[i]), "zstd") { + return true + } + } + return false +} + func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) { r.Header.Set("Authorization", "Bearer "+apiKey) r.Header.Set("Content-Type", "application/json") diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index e5546a52..9dbeaedc 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -322,9 +322,6 @@ func (s *Service) Run(ctx context.Context) error { // Prefer core auth manager auto refresh if available. if s.coreManager != nil { interval := 15 * time.Minute - if sec := s.cfg.GeminiWeb.TokenRefreshSeconds; sec > 0 { - interval = time.Duration(sec) * time.Second - } s.coreManager.StartAutoRefresh(context.Background(), interval) log.Infof("core auth auto-refresh started (interval=%s)", interval) }