From 847c2502a5c7a312526fe1ee86add477b7860dda Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 28 Oct 2025 08:39:03 +0800 Subject: [PATCH] Fixed: #172 feat(runtime): add Brotli and Zstd compression support, improve response handling - Implemented Brotli and Zstd decompression handling in `FileRequestLogger` and executor logic for enhanced compatibility. - Added `decodeResponseBody` utility for streamlined multi-encoding support (Gzip, Deflate, Brotli, Zstd). - Improved resource cleanup with composite readers for proper closure under all conditions. - Updated dependencies in `go.mod` and `go.sum` to include Brotli and Zstd libraries. --- go.mod | 1 + go.sum | 2 + internal/logging/request_logger.go | 58 ++++++- internal/runtime/executor/claude_executor.go | 164 ++++++++++++++----- 4 files changed, 180 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index 010c8a6e..50b04920 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.3.0 // indirect + github.com/andybalholm/brotli v1.0.6 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudflare/circl v1.6.1 // indirect diff --git a/go.sum b/go.sum index b5cfca4a..3acf8562 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw= github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE= +github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= +github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= diff --git a/internal/logging/request_logger.go b/internal/logging/request_logger.go index 70a48b75..d47b3253 100644 --- a/internal/logging/request_logger.go +++ b/internal/logging/request_logger.go @@ -15,6 +15,10 @@ import ( "strings" "time" + "github.com/andybalholm/brotli" + "github.com/klauspost/compress/zstd" + log "github.com/sirupsen/logrus" + "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" ) @@ -411,6 +415,10 @@ func (l *FileRequestLogger) decompressResponse(responseHeaders map[string][]stri return l.decompressGzip(response) case "deflate": return l.decompressDeflate(response) + case "br": + return l.decompressBrotli(response) + case "zstd": + return l.decompressZstd(response) default: // No compression or unsupported compression return response, nil @@ -431,7 +439,9 @@ func (l *FileRequestLogger) decompressGzip(data []byte) ([]byte, error) { return nil, fmt.Errorf("failed to create gzip reader: %w", err) } defer func() { - _ = reader.Close() + if errClose := reader.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close gzip reader in request logger") + } }() decompressed, err := io.ReadAll(reader) @@ -453,7 +463,9 @@ func (l *FileRequestLogger) decompressGzip(data []byte) ([]byte, error) { func (l *FileRequestLogger) decompressDeflate(data []byte) ([]byte, error) { reader := flate.NewReader(bytes.NewReader(data)) defer func() { - _ = reader.Close() + if errClose := reader.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close deflate reader in request logger") + } }() decompressed, err := io.ReadAll(reader) @@ -464,6 +476,48 @@ func (l *FileRequestLogger) decompressDeflate(data []byte) ([]byte, error) { return decompressed, nil } +// decompressBrotli decompresses brotli-encoded data. +// +// Parameters: +// - data: The brotli-encoded data to decompress +// +// Returns: +// - []byte: The decompressed data +// - error: An error if decompression fails, nil otherwise +func (l *FileRequestLogger) decompressBrotli(data []byte) ([]byte, error) { + reader := brotli.NewReader(bytes.NewReader(data)) + + decompressed, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to decompress brotli data: %w", err) + } + + return decompressed, nil +} + +// decompressZstd decompresses zstd-encoded data. +// +// Parameters: +// - data: The zstd-encoded data to decompress +// +// Returns: +// - []byte: The decompressed data +// - error: An error if decompression fails, nil otherwise +func (l *FileRequestLogger) decompressZstd(data []byte) ([]byte, error) { + decoder, err := zstd.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("failed to create zstd reader: %w", err) + } + defer decoder.Close() + + decompressed, err := io.ReadAll(decoder) + if err != nil { + return nil, fmt.Errorf("failed to decompress zstd data: %w", err) + } + + return decompressed, nil +} + // formatRequestInfo creates the request information section of the log. // // Parameters: diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 033e3bd3..ee0dfd80 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -3,6 +3,8 @@ package executor import ( "bufio" "bytes" + "compress/flate" + "compress/gzip" "context" "fmt" "io" @@ -10,6 +12,7 @@ import ( "strings" "time" + "github.com/andybalholm/brotli" "github.com/klauspost/compress/zstd" claudeauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/claude" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" @@ -89,31 +92,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r recordAPIResponseError(ctx, e.cfg, err) return resp, err } - defer func() { - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("response body close error: %v", errClose) - } - }() recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { b, _ := io.ReadAll(httpResp.Body) appendAPIResponseChunk(ctx, e.cfg, b) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b)) err = statusErr{code: httpResp.StatusCode, msg: string(b)} + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } return resp, err } - reader := io.Reader(httpResp.Body) - var decoder *zstd.Decoder - if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) { - decoder, err = zstd.NewReader(httpResp.Body) - if err != nil { - recordAPIResponseError(ctx, e.cfg, err) - return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err) + decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding")) + if err != nil { + recordAPIResponseError(ctx, e.cfg, err) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) } - reader = decoder - defer decoder.Close() + return resp, err } - data, err := io.ReadAll(reader) + defer func() { + if errClose := decodedBody.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + }() + data, err := io.ReadAll(decodedBody) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return resp, err @@ -192,19 +195,27 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A err = statusErr{code: httpResp.StatusCode, msg: string(b)} return nil, err } + decodedBody, err := decodeResponseBody(httpResp.Body, httpResp.Header.Get("Content-Encoding")) + if err != nil { + recordAPIResponseError(ctx, e.cfg, err) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + return nil, err + } out := make(chan cliproxyexecutor.StreamChunk) stream = out go func() { defer close(out) defer func() { - if errClose := httpResp.Body.Close(); errClose != nil { + if errClose := decodedBody.Close(); errClose != nil { log.Errorf("response body close error: %v", errClose) } }() // If from == to (Claude → Claude), directly forward the SSE stream without translation if from == to { - scanner := bufio.NewScanner(httpResp.Body) + scanner := bufio.NewScanner(decodedBody) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) for scanner.Scan() { @@ -228,7 +239,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } // For other formats, use translation - scanner := bufio.NewScanner(httpResp.Body) + scanner := bufio.NewScanner(decodedBody) buf := make([]byte, 20_971_520) scanner.Buffer(buf, 20_971_520) var param any @@ -304,29 +315,29 @@ func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut recordAPIResponseError(ctx, e.cfg, err) return cliproxyexecutor.Response{}, err } - defer func() { - if errClose := resp.Body.Close(); errClose != nil { - log.Errorf("response body close error: %v", errClose) - } - }() recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if resp.StatusCode < 200 || resp.StatusCode >= 300 { b, _ := io.ReadAll(resp.Body) appendAPIResponseChunk(ctx, e.cfg, b) + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} } - reader := io.Reader(resp.Body) - var decoder *zstd.Decoder - if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) { - decoder, err = zstd.NewReader(resp.Body) - if err != nil { - recordAPIResponseError(ctx, e.cfg, err) - return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err) + decodedBody, err := decodeResponseBody(resp.Body, resp.Header.Get("Content-Encoding")) + if err != nil { + recordAPIResponseError(ctx, e.cfg, err) + if errClose := resp.Body.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) } - reader = decoder - defer decoder.Close() + return cliproxyexecutor.Response{}, err } - data, err := io.ReadAll(reader) + defer func() { + if errClose := decodedBody.Close(); errClose != nil { + log.Errorf("response body close error: %v", errClose) + } + }() + data, err := io.ReadAll(decodedBody) if err != nil { recordAPIResponseError(ctx, e.cfg, err) return cliproxyexecutor.Response{}, err @@ -419,7 +430,7 @@ func (e *ClaudeExecutor) resolveClaudeConfig(auth *cliproxyauth.Auth) *config.Cl continue } if attrKey != "" && strings.EqualFold(cfgKey, attrKey) { - if attrBase == "" || cfgBase == "" || strings.EqualFold(cfgBase, attrBase) { + if cfgBase == "" || strings.EqualFold(cfgBase, attrBase) { return entry } } @@ -438,17 +449,84 @@ func (e *ClaudeExecutor) resolveClaudeConfig(auth *cliproxyauth.Auth) *config.Cl return nil } -func hasZSTDEcoding(contentEncoding string) bool { - if contentEncoding == "" { - return false - } - parts := strings.Split(contentEncoding, ",") - for i := range parts { - if strings.EqualFold(strings.TrimSpace(parts[i]), "zstd") { - return true +type compositeReadCloser struct { + io.Reader + closers []func() error +} + +func (c *compositeReadCloser) Close() error { + var firstErr error + for i := range c.closers { + if c.closers[i] == nil { + continue + } + if err := c.closers[i](); err != nil && firstErr == nil { + firstErr = err } } - return false + return firstErr +} + +func decodeResponseBody(body io.ReadCloser, contentEncoding string) (io.ReadCloser, error) { + if body == nil { + return nil, fmt.Errorf("response body is nil") + } + if contentEncoding == "" { + return body, nil + } + encodings := strings.Split(contentEncoding, ",") + for _, raw := range encodings { + encoding := strings.TrimSpace(strings.ToLower(raw)) + switch encoding { + case "", "identity": + continue + case "gzip": + gzipReader, err := gzip.NewReader(body) + if err != nil { + _ = body.Close() + return nil, fmt.Errorf("failed to create gzip reader: %w", err) + } + return &compositeReadCloser{ + Reader: gzipReader, + closers: []func() error{ + gzipReader.Close, + func() error { return body.Close() }, + }, + }, nil + case "deflate": + deflateReader := flate.NewReader(body) + return &compositeReadCloser{ + Reader: deflateReader, + closers: []func() error{ + deflateReader.Close, + func() error { return body.Close() }, + }, + }, nil + case "br": + return &compositeReadCloser{ + Reader: brotli.NewReader(body), + closers: []func() error{ + func() error { return body.Close() }, + }, + }, nil + case "zstd": + decoder, err := zstd.NewReader(body) + if err != nil { + _ = body.Close() + return nil, fmt.Errorf("failed to create zstd reader: %w", err) + } + return &compositeReadCloser{ + Reader: decoder, + closers: []func() error{ + func() error { decoder.Close(); return nil }, + func() error { return body.Close() }, + }, + }, nil + default: + continue + } + } + return body, nil } func applyClaudeHeaders(r *http.Request, apiKey string, stream bool) {