From ed28b71e8706bfa0269480b61cd933726ef97e97 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Fri, 9 Jan 2026 08:21:13 +0800 Subject: [PATCH 01/10] refactor(amp): remove duplicate comments in response rewriter --- internal/api/modules/amp/response_rewriter.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/api/modules/amp/response_rewriter.go b/internal/api/modules/amp/response_rewriter.go index 35888116..57e4922a 100644 --- a/internal/api/modules/amp/response_rewriter.go +++ b/internal/api/modules/amp/response_rewriter.go @@ -71,8 +71,6 @@ var modelFieldPaths = []string{"model", "modelVersion", "response.modelVersion", // rewriteModelInResponse replaces all occurrences of the mapped model with the original model in JSON // It also suppresses "thinking" blocks if "tool_use" is present to ensure Amp client compatibility func (rw *ResponseRewriter) rewriteModelInResponse(data []byte) []byte { - // 1. Amp Compatibility: Suppress thinking blocks if tool use is detected - // The Amp client struggles when both thinking and tool_use blocks are present // 1. Amp Compatibility: Suppress thinking blocks if tool use is detected // The Amp client struggles when both thinking and tool_use blocks are present if gjson.GetBytes(data, `content.#(type=="tool_use")`).Exists() { From ef6bafbf7e6677114bef7917561ad9401576938b Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Fri, 9 Jan 2026 10:48:29 +0800 Subject: [PATCH 02/10] fix(executor): handle context cancellation and deadline errors explicitly --- .../runtime/executor/antigravity_executor.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 89bdbe49..47b2ac48 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -10,6 +10,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -125,6 +126,9 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -237,6 +241,9 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -255,6 +262,14 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * } if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return resp, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return resp, err + } lastStatus = 0 lastBody = nil lastErr = errRead @@ -590,6 +605,9 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -608,6 +626,14 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya } if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return nil, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return nil, err + } lastStatus = 0 lastBody = nil lastErr = errRead @@ -796,6 +822,9 @@ func (e *AntigravityExecutor) CountTokens(ctx context.Context, auth *cliproxyaut httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return cliproxyexecutor.Response{}, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -884,6 +913,9 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil + } if idx+1 < len(baseURLs) { log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue From ee62ef4745c9ac6905d6f104fd6ee8739271c673 Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Fri, 9 Jan 2026 11:20:55 +0800 Subject: [PATCH 03/10] refactor(logging): clean up oauth logs and debugs --- internal/runtime/executor/logging_helpers.go | 6 +----- .../antigravity/claude/antigravity_claude_request.go | 9 +++------ .../antigravity/claude/antigravity_claude_response.go | 4 ++-- sdk/cliproxy/auth/conductor.go | 9 +-------- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/internal/runtime/executor/logging_helpers.go b/internal/runtime/executor/logging_helpers.go index 26931f53..90532772 100644 --- a/internal/runtime/executor/logging_helpers.go +++ b/internal/runtime/executor/logging_helpers.go @@ -304,11 +304,7 @@ func formatAuthInfo(info upstreamRequestLog) string { parts = append(parts, "type=api_key") } case "oauth": - if authValue != "" { - parts = append(parts, fmt.Sprintf("type=oauth account=%s", authValue)) - } else { - parts = append(parts, "type=oauth") - } + parts = append(parts, "type=oauth") default: if authType != "" { if authValue != "" { diff --git a/internal/translator/antigravity/claude/antigravity_claude_request.go b/internal/translator/antigravity/claude/antigravity_claude_request.go index 2287bccc..d5064c3c 100644 --- a/internal/translator/antigravity/claude/antigravity_claude_request.go +++ b/internal/translator/antigravity/claude/antigravity_claude_request.go @@ -14,7 +14,6 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/cache" "github.com/router-for-me/CLIProxyAPI/v6/internal/translator/gemini/common" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" - log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -136,14 +135,14 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ if sessionID != "" && thinkingText != "" { if cachedSig := cache.GetCachedSignature(sessionID, thinkingText); cachedSig != "" { signature = cachedSig - log.Debugf("Using cached signature for thinking block") + // log.Debugf("Using cached signature for thinking block") } } // Fallback to client signature only if cache miss and client signature is valid if signature == "" && cache.HasValidSignature(clientSignature) { signature = clientSignature - log.Debugf("Using client-provided signature for thinking block") + // log.Debugf("Using client-provided signature for thinking block") } // Store for subsequent tool_use in the same message @@ -158,8 +157,7 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ // Claude requires assistant messages to start with thinking blocks when thinking is enabled // Converting to text would break this requirement if isUnsigned { - // TypeScript plugin approach: drop unsigned thinking blocks entirely - log.Debugf("Dropping unsigned thinking block (no valid signature)") + // log.Debugf("Dropping unsigned thinking block (no valid signature)") continue } @@ -183,7 +181,6 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ } else if contentTypeResult.Type == gjson.String && contentTypeResult.String() == "tool_use" { // NOTE: Do NOT inject dummy thinking blocks here. // Antigravity API validates signatures, so dummy values are rejected. - // The TypeScript plugin removes unsigned thinking blocks instead of injecting dummies. functionName := contentResult.Get("name").String() argsResult := contentResult.Get("input") diff --git a/internal/translator/antigravity/claude/antigravity_claude_response.go b/internal/translator/antigravity/claude/antigravity_claude_response.go index 875e54a7..1672a835 100644 --- a/internal/translator/antigravity/claude/antigravity_claude_response.go +++ b/internal/translator/antigravity/claude/antigravity_claude_response.go @@ -136,11 +136,11 @@ func ConvertAntigravityResponseToClaude(_ context.Context, _ string, originalReq // Process thinking content (internal reasoning) if partResult.Get("thought").Bool() { if thoughtSignature := partResult.Get("thoughtSignature"); thoughtSignature.Exists() && thoughtSignature.String() != "" { - log.Debug("Branch: signature_delta") + // log.Debug("Branch: signature_delta") if params.SessionID != "" && params.CurrentThinkingText.Len() > 0 { cache.CacheSignature(params.SessionID, params.CurrentThinkingText.String(), thoughtSignature.String()) - log.Debugf("Cached signature for thinking block (sessionID=%s, textLen=%d)", params.SessionID, params.CurrentThinkingText.Len()) + // log.Debugf("Cached signature for thinking block (sessionID=%s, textLen=%d)", params.SessionID, params.CurrentThinkingText.Len()) params.CurrentThinkingText.Reset() } diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index f35cc416..689d3a21 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1609,7 +1609,6 @@ func formatOauthIdentity(auth *Auth, provider string, accountInfo string) string if auth == nil { return "" } - authIndex := auth.EnsureIndex() // Prefer the auth's provider when available. providerName := strings.TrimSpace(auth.Provider) if providerName == "" { @@ -1631,16 +1630,10 @@ func formatOauthIdentity(auth *Auth, provider string, accountInfo string) string if authFile != "" { parts = append(parts, "auth_file="+authFile) } - if authIndex != "" { - parts = append(parts, "auth_index="+authIndex) - } if len(parts) == 0 { return accountInfo } - if accountInfo == "" { - return strings.Join(parts, " ") - } - return strings.Join(parts, " ") + " account=" + strconv.Quote(accountInfo) + return strings.Join(parts, " ") } // InjectCredentials delegates per-provider HTTP request preparation when supported. From 7004295e1d27af47c1eb283b9cd65c1853cf4748 Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Fri, 9 Jan 2026 11:24:00 +0800 Subject: [PATCH 04/10] build(docker): move stats export execution after image build --- docker-build.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker-build.sh b/docker-build.sh index fc846703..944f3e78 100644 --- a/docker-build.sh +++ b/docker-build.sh @@ -152,16 +152,16 @@ case "$choice" in # Build and start the services with a local-only image tag export CLI_PROXY_IMAGE="cli-proxy-api:local" - if [[ "${WITH_USAGE}" == "true" ]]; then - export_stats - fi - echo "Building the Docker image..." docker compose build \ --build-arg VERSION="${VERSION}" \ --build-arg COMMIT="${COMMIT}" \ --build-arg BUILD_DATE="${BUILD_DATE}" + if [[ "${WITH_USAGE}" == "true" ]]; then + export_stats + fi + echo "Starting the services..." docker compose up -d --remove-orphans --pull never From dcac3407ab49ef6bc49c9d4b3973c596d7d3b841 Mon Sep 17 00:00:00 2001 From: Ben Vargas Date: Fri, 9 Jan 2026 00:10:38 -0700 Subject: [PATCH 05/10] Fix Claude OAuth tool name mapping Prefix tool names with proxy_ for Claude OAuth requests and strip the prefix from streaming and non-streaming responses to restore client-facing names. Updates the Claude executor to: - add prefixing for tools, tool_choice, and tool_use messages when using OAuth tokens - strip the prefix from tool_use events in SSE and non-streaming payloads - add focused unit tests for prefix/strip helpers --- internal/runtime/executor/claude_executor.go | 124 ++++++++++++++++++ .../runtime/executor/claude_executor_test.go | 51 +++++++ 2 files changed, 175 insertions(+) create mode 100644 internal/runtime/executor/claude_executor_test.go diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 7be4f41b..e385b41e 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -35,6 +35,8 @@ type ClaudeExecutor struct { cfg *config.Config } +const claudeToolPrefix = "proxy_" + func NewClaudeExecutor(cfg *config.Config) *ClaudeExecutor { return &ClaudeExecutor{cfg: cfg} } func (e *ClaudeExecutor) Identifier() string { return "claude" } @@ -81,6 +83,9 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + if isClaudeOAuthToken(apiKey) { + body = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) @@ -152,6 +157,9 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r } else { reporter.publish(ctx, parseClaudeUsage(data)) } + if isClaudeOAuthToken(apiKey) { + data = stripClaudeToolPrefixFromResponse(data, claudeToolPrefix) + } var param any out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) resp = cliproxyexecutor.Response{Payload: []byte(out)} @@ -193,6 +201,9 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + if isClaudeOAuthToken(apiKey) { + body = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) @@ -263,6 +274,9 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if detail, ok := parseClaudeStreamUsage(line); ok { reporter.publish(ctx, detail) } + if isClaudeOAuthToken(apiKey) { + line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) + } // Forward the line as-is to preserve SSE format cloned := make([]byte, len(line)+1) copy(cloned, line) @@ -287,6 +301,9 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if detail, ok := parseClaudeStreamUsage(line); ok { reporter.publish(ctx, detail) } + if isClaudeOAuthToken(apiKey) { + line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) + } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} @@ -326,6 +343,9 @@ func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut // Extract betas from body and convert to header (for count_tokens too) var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + if isClaudeOAuthToken(apiKey) { + body = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages/count_tokens?beta=true", baseURL) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) @@ -770,3 +790,107 @@ func checkSystemInstructions(payload []byte) []byte { } return payload } + +func isClaudeOAuthToken(apiKey string) bool { + return strings.Contains(apiKey, "sk-ant-oat") +} + +func applyClaudeToolPrefix(body []byte, prefix string) []byte { + if prefix == "" { + return body + } + + if tools := gjson.GetBytes(body, "tools"); tools.Exists() && tools.IsArray() { + tools.ForEach(func(index, tool gjson.Result) bool { + name := tool.Get("name").String() + if name == "" || strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("tools.%d.name", index.Int()) + body, _ = sjson.SetBytes(body, path, prefix+name) + return true + }) + } + + if gjson.GetBytes(body, "tool_choice.type").String() == "tool" { + name := gjson.GetBytes(body, "tool_choice.name").String() + if name != "" && !strings.HasPrefix(name, prefix) { + body, _ = sjson.SetBytes(body, "tool_choice.name", prefix+name) + } + } + + if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() { + messages.ForEach(func(msgIndex, msg gjson.Result) bool { + content := msg.Get("content") + if !content.Exists() || !content.IsArray() { + return true + } + content.ForEach(func(contentIndex, part gjson.Result) bool { + if part.Get("type").String() != "tool_use" { + return true + } + name := part.Get("name").String() + if name == "" || strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("messages.%d.content.%d.name", msgIndex.Int(), contentIndex.Int()) + body, _ = sjson.SetBytes(body, path, prefix+name) + return true + }) + return true + }) + } + + return body +} + +func stripClaudeToolPrefixFromResponse(body []byte, prefix string) []byte { + if prefix == "" { + return body + } + content := gjson.GetBytes(body, "content") + if !content.Exists() || !content.IsArray() { + return body + } + content.ForEach(func(index, part gjson.Result) bool { + if part.Get("type").String() != "tool_use" { + return true + } + name := part.Get("name").String() + if !strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("content.%d.name", index.Int()) + body, _ = sjson.SetBytes(body, path, strings.TrimPrefix(name, prefix)) + return true + }) + return body +} + +func stripClaudeToolPrefixFromStreamLine(line []byte, prefix string) []byte { + if prefix == "" { + return line + } + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return line + } + contentBlock := gjson.GetBytes(payload, "content_block") + if !contentBlock.Exists() || contentBlock.Get("type").String() != "tool_use" { + return line + } + name := contentBlock.Get("name").String() + if !strings.HasPrefix(name, prefix) { + return line + } + updated, err := sjson.SetBytes(payload, "content_block.name", strings.TrimPrefix(name, prefix)) + if err != nil { + return line + } + + trimmed := bytes.TrimSpace(line) + if bytes.HasPrefix(trimmed, []byte("data:")) { + return append([]byte("data: "), updated...) + } + return updated +} diff --git a/internal/runtime/executor/claude_executor_test.go b/internal/runtime/executor/claude_executor_test.go new file mode 100644 index 00000000..05f5b60c --- /dev/null +++ b/internal/runtime/executor/claude_executor_test.go @@ -0,0 +1,51 @@ +package executor + +import ( + "bytes" + "testing" + + "github.com/tidwall/gjson" +) + +func TestApplyClaudeToolPrefix(t *testing.T) { + input := []byte(`{"tools":[{"name":"alpha"},{"name":"proxy_bravo"}],"tool_choice":{"type":"tool","name":"charlie"},"messages":[{"role":"assistant","content":[{"type":"tool_use","name":"delta","id":"t1","input":{}}]}]}`) + out := applyClaudeToolPrefix(input, "proxy_") + + if got := gjson.GetBytes(out, "tools.0.name").String(); got != "proxy_alpha" { + t.Fatalf("tools.0.name = %q, want %q", got, "proxy_alpha") + } + if got := gjson.GetBytes(out, "tools.1.name").String(); got != "proxy_bravo" { + t.Fatalf("tools.1.name = %q, want %q", got, "proxy_bravo") + } + if got := gjson.GetBytes(out, "tool_choice.name").String(); got != "proxy_charlie" { + t.Fatalf("tool_choice.name = %q, want %q", got, "proxy_charlie") + } + if got := gjson.GetBytes(out, "messages.0.content.0.name").String(); got != "proxy_delta" { + t.Fatalf("messages.0.content.0.name = %q, want %q", got, "proxy_delta") + } +} + +func TestStripClaudeToolPrefixFromResponse(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","name":"proxy_alpha","id":"t1","input":{}},{"type":"tool_use","name":"bravo","id":"t2","input":{}}]}`) + out := stripClaudeToolPrefixFromResponse(input, "proxy_") + + if got := gjson.GetBytes(out, "content.0.name").String(); got != "alpha" { + t.Fatalf("content.0.name = %q, want %q", got, "alpha") + } + if got := gjson.GetBytes(out, "content.1.name").String(); got != "bravo" { + t.Fatalf("content.1.name = %q, want %q", got, "bravo") + } +} + +func TestStripClaudeToolPrefixFromStreamLine(t *testing.T) { + line := []byte(`data: {"type":"content_block_start","content_block":{"type":"tool_use","name":"proxy_alpha","id":"t1"},"index":0}`) + out := stripClaudeToolPrefixFromStreamLine(line, "proxy_") + + payload := bytes.TrimSpace(out) + if bytes.HasPrefix(payload, []byte("data:")) { + payload = bytes.TrimSpace(payload[len("data:"):]) + } + if got := gjson.GetBytes(payload, "content_block.name").String(); got != "alpha" { + t.Fatalf("content_block.name = %q, want %q", got, "alpha") + } +} From 47dacce6ea65e0303368482b9f7f2cd57c55c0dd Mon Sep 17 00:00:00 2001 From: hemanta212 Date: Fri, 9 Jan 2026 13:33:46 +0545 Subject: [PATCH 06/10] fix(server): resolve memory leaks causing OOM in k8s deployment - usage/logger_plugin: cap modelStats.Details at 1000 entries per model - cache/signature_cache: add background cleanup for expired sessions (10 min) - management/handler: add background cleanup for stale IP rate-limit entries (1 hr) - executor/cache_helpers: add mutex protection and TTL cleanup for codexCacheMap (15 min) - executor/codex_executor: use thread-safe cache accessors Add reproduction tests demonstrating leak behavior before/after fixes. Amp-Thread-ID: https://ampcode.com/threads/T-019ba0fc-1d7b-7338-8e1d-ca0520412777 Co-authored-by: Amp --- internal/api/handlers/management/handler.go | 42 +++- internal/cache/signature_cache.go | 43 ++++ internal/memleak_compare_test.go | 219 ++++++++++++++++++++ internal/memleak_repro_test.go | 151 ++++++++++++++ internal/runtime/executor/cache_helpers.go | 62 +++++- internal/runtime/executor/codex_executor.go | 6 +- internal/usage/logger_plugin.go | 9 + 7 files changed, 526 insertions(+), 6 deletions(-) create mode 100644 internal/memleak_compare_test.go create mode 100644 internal/memleak_repro_test.go diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index d3ccbda6..613c9841 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -24,8 +24,15 @@ import ( type attemptInfo struct { count int blockedUntil time.Time + lastActivity time.Time // track last activity for cleanup } +// attemptCleanupInterval controls how often stale IP entries are purged +const attemptCleanupInterval = 1 * time.Hour + +// attemptMaxIdleTime controls how long an IP can be idle before cleanup +const attemptMaxIdleTime = 2 * time.Hour + // Handler aggregates config reference, persistence path and helpers. type Handler struct { cfg *config.Config @@ -47,7 +54,7 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD") envSecret = strings.TrimSpace(envSecret) - return &Handler{ + h := &Handler{ cfg: cfg, configFilePath: configFilePath, failedAttempts: make(map[string]*attemptInfo), @@ -57,6 +64,38 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man allowRemoteOverride: envSecret != "", envSecret: envSecret, } + h.startAttemptCleanup() + return h +} + +// startAttemptCleanup launches a background goroutine that periodically +// removes stale IP entries from failedAttempts to prevent memory leaks. +func (h *Handler) startAttemptCleanup() { + go func() { + ticker := time.NewTicker(attemptCleanupInterval) + defer ticker.Stop() + for range ticker.C { + h.purgeStaleAttempts() + } + }() +} + +// purgeStaleAttempts removes IP entries that have been idle beyond attemptMaxIdleTime +// and whose ban (if any) has expired. +func (h *Handler) purgeStaleAttempts() { + now := time.Now() + h.attemptsMu.Lock() + defer h.attemptsMu.Unlock() + for ip, ai := range h.failedAttempts { + // Skip if still banned + if !ai.blockedUntil.IsZero() && now.Before(ai.blockedUntil) { + continue + } + // Remove if idle too long + if now.Sub(ai.lastActivity) > attemptMaxIdleTime { + delete(h.failedAttempts, ip) + } + } } // NewHandler creates a new management handler instance. @@ -149,6 +188,7 @@ func (h *Handler) Middleware() gin.HandlerFunc { h.failedAttempts[clientIP] = aip } aip.count++ + aip.lastActivity = time.Now() if aip.count >= maxFailures { aip.blockedUntil = time.Now().Add(banDuration) aip.count = 0 diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go index c1326629..d4a864e0 100644 --- a/internal/cache/signature_cache.go +++ b/internal/cache/signature_cache.go @@ -26,11 +26,17 @@ const ( // MinValidSignatureLen is the minimum length for a signature to be considered valid MinValidSignatureLen = 50 + + // SessionCleanupInterval controls how often stale sessions are purged + SessionCleanupInterval = 10 * time.Minute ) // signatureCache stores signatures by sessionId -> textHash -> SignatureEntry var signatureCache sync.Map +// sessionCleanupOnce ensures the background cleanup goroutine starts only once +var sessionCleanupOnce sync.Once + // sessionCache is the inner map type type sessionCache struct { mu sync.RWMutex @@ -45,6 +51,9 @@ func hashText(text string) string { // getOrCreateSession gets or creates a session cache func getOrCreateSession(sessionID string) *sessionCache { + // Start background cleanup on first access + sessionCleanupOnce.Do(startSessionCleanup) + if val, ok := signatureCache.Load(sessionID); ok { return val.(*sessionCache) } @@ -53,6 +62,40 @@ func getOrCreateSession(sessionID string) *sessionCache { return actual.(*sessionCache) } +// startSessionCleanup launches a background goroutine that periodically +// removes sessions where all entries have expired. +func startSessionCleanup() { + go func() { + ticker := time.NewTicker(SessionCleanupInterval) + defer ticker.Stop() + for range ticker.C { + purgeExpiredSessions() + } + }() +} + +// purgeExpiredSessions removes sessions with no valid (non-expired) entries. +func purgeExpiredSessions() { + now := time.Now() + signatureCache.Range(func(key, value any) bool { + sc := value.(*sessionCache) + sc.mu.Lock() + // Remove expired entries + for k, entry := range sc.entries { + if now.Sub(entry.Timestamp) > SignatureCacheTTL { + delete(sc.entries, k) + } + } + isEmpty := len(sc.entries) == 0 + sc.mu.Unlock() + // Remove session if empty + if isEmpty { + signatureCache.Delete(key) + } + return true + }) +} + // CacheSignature stores a thinking signature for a given session and text. // Used for Claude models that require signed thinking blocks in multi-turn conversations. func CacheSignature(sessionID, text, signature string) { diff --git a/internal/memleak_compare_test.go b/internal/memleak_compare_test.go new file mode 100644 index 00000000..63581abb --- /dev/null +++ b/internal/memleak_compare_test.go @@ -0,0 +1,219 @@ +// Package internal demonstrates the memory leak that existed before the fix. +// This file shows what happens WITHOUT the maxDetailsPerModel cap. +package internal + +import ( + "fmt" + "runtime" + "testing" + "time" +) + +// UnboundedRequestStatistics is a copy of the ORIGINAL code WITHOUT the fix +// to demonstrate the memory leak behavior. +type UnboundedRequestStatistics struct { + totalRequests int64 + apis map[string]*unboundedAPIStats +} + +type unboundedAPIStats struct { + TotalRequests int64 + Models map[string]*unboundedModelStats +} + +type unboundedModelStats struct { + TotalRequests int64 + Details []unboundedRequestDetail // NO CAP - grows forever! +} + +type unboundedRequestDetail struct { + Timestamp time.Time + Tokens int64 +} + +func NewUnboundedRequestStatistics() *UnboundedRequestStatistics { + return &UnboundedRequestStatistics{ + apis: make(map[string]*unboundedAPIStats), + } +} + +// Record is the ORIGINAL implementation that leaks memory +func (s *UnboundedRequestStatistics) Record(apiKey, model string, tokens int64) { + stats, ok := s.apis[apiKey] + if !ok { + stats = &unboundedAPIStats{Models: make(map[string]*unboundedModelStats)} + s.apis[apiKey] = stats + } + modelStats, ok := stats.Models[model] + if !ok { + modelStats = &unboundedModelStats{} + stats.Models[model] = modelStats + } + modelStats.TotalRequests++ + // BUG: This grows forever with no cap! + modelStats.Details = append(modelStats.Details, unboundedRequestDetail{ + Timestamp: time.Now(), + Tokens: tokens, + }) + s.totalRequests++ +} + +func (s *UnboundedRequestStatistics) CountDetails() int { + total := 0 + for _, api := range s.apis { + for _, model := range api.Models { + total += len(model.Details) + } + } + return total +} + +func TestMemoryLeak_BEFORE_Fix_Unbounded(t *testing.T) { + // This demonstrates the LEAK behavior before the fix + stats := NewUnboundedRequestStatistics() + + var m runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m) + allocBefore := float64(m.Alloc) / 1024 / 1024 + + t.Logf("=== DEMONSTRATING LEAK (unbounded growth) ===") + t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails()) + + // Simulate traffic over "hours" - in production this causes OOM + for hour := 1; hour <= 5; hour++ { + for i := 0; i < 20000; i++ { + stats.Record( + fmt.Sprintf("api-key-%d", i%10), + fmt.Sprintf("model-%d", i%5), + 1500, + ) + } + runtime.GC() + runtime.ReadMemStats(&m) + allocNow := float64(m.Alloc) / 1024 / 1024 + t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)", + hour, allocNow, stats.CountDetails(), allocNow-allocBefore) + } + + // Show the problem: details count = total requests (unbounded) + totalDetails := stats.CountDetails() + totalRequests := 5 * 20000 // 100k requests + t.Logf("LEAK EVIDENCE: %d details stored for %d requests (ratio: %.2f)", + totalDetails, totalRequests, float64(totalDetails)/float64(totalRequests)) + + if totalDetails == totalRequests { + t.Logf("CONFIRMED: Every request stored forever = memory leak!") + } +} + +func TestMemoryLeak_AFTER_Fix_Bounded(t *testing.T) { + // This demonstrates the FIXED behavior with capped growth + // Using the real implementation which now has the fix + stats := NewBoundedRequestStatistics() + + var m runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m) + allocBefore := float64(m.Alloc) / 1024 / 1024 + + t.Logf("=== DEMONSTRATING FIX (bounded growth) ===") + t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails()) + + for hour := 1; hour <= 5; hour++ { + for i := 0; i < 20000; i++ { + stats.Record( + fmt.Sprintf("api-key-%d", i%10), + fmt.Sprintf("model-%d", i%5), + 1500, + ) + } + runtime.GC() + runtime.ReadMemStats(&m) + allocNow := float64(m.Alloc) / 1024 / 1024 + t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)", + hour, allocNow, stats.CountDetails(), allocNow-allocBefore) + } + + totalDetails := stats.CountDetails() + maxExpected := 10 * 5 * 1000 // 10 API keys * 5 models * 1000 cap = 50k max + t.Logf("FIX EVIDENCE: %d details stored (max possible: %d)", totalDetails, maxExpected) + + if totalDetails <= maxExpected { + t.Logf("CONFIRMED: Details capped, memory bounded!") + } else { + t.Errorf("STILL LEAKING: %d > %d", totalDetails, maxExpected) + } +} + +// BoundedRequestStatistics is the FIXED version with cap +type BoundedRequestStatistics struct { + apis map[string]*boundedAPIStats +} + +type boundedAPIStats struct { + Models map[string]*boundedModelStats +} + +type boundedModelStats struct { + Details []unboundedRequestDetail +} + +const maxDetailsPerModelTest = 1000 + +func NewBoundedRequestStatistics() *BoundedRequestStatistics { + return &BoundedRequestStatistics{ + apis: make(map[string]*boundedAPIStats), + } +} + +func (s *BoundedRequestStatistics) Record(apiKey, model string, tokens int64) { + stats, ok := s.apis[apiKey] + if !ok { + stats = &boundedAPIStats{Models: make(map[string]*boundedModelStats)} + s.apis[apiKey] = stats + } + modelStats, ok := stats.Models[model] + if !ok { + modelStats = &boundedModelStats{} + stats.Models[model] = modelStats + } + modelStats.Details = append(modelStats.Details, unboundedRequestDetail{ + Timestamp: time.Now(), + Tokens: tokens, + }) + // THE FIX: Cap the details slice + if len(modelStats.Details) > maxDetailsPerModelTest { + excess := len(modelStats.Details) - maxDetailsPerModelTest + modelStats.Details = modelStats.Details[excess:] + } +} + +func (s *BoundedRequestStatistics) CountDetails() int { + total := 0 + for _, api := range s.apis { + for _, model := range api.Models { + total += len(model.Details) + } + } + return total +} + +func TestCompare_LeakVsFix(t *testing.T) { + t.Log("=== SIDE-BY-SIDE COMPARISON ===") + + unbounded := NewUnboundedRequestStatistics() + bounded := NewBoundedRequestStatistics() + + // Same workload + for i := 0; i < 50000; i++ { + apiKey := fmt.Sprintf("key-%d", i%10) + model := fmt.Sprintf("model-%d", i%5) + unbounded.Record(apiKey, model, 1500) + bounded.Record(apiKey, model, 1500) + } + + t.Logf("UNBOUNDED (leak): %d details stored", unbounded.CountDetails()) + t.Logf("BOUNDED (fixed): %d details stored", bounded.CountDetails()) + t.Logf("Memory saved: %dx reduction", unbounded.CountDetails()/bounded.CountDetails()) +} diff --git a/internal/memleak_repro_test.go b/internal/memleak_repro_test.go new file mode 100644 index 00000000..9ace27f5 --- /dev/null +++ b/internal/memleak_repro_test.go @@ -0,0 +1,151 @@ +// Package internal contains memory leak reproduction tests. +// Run with: go test -v -run TestMemoryLeak -memprofile=mem.prof ./internal/ +package internal + +import ( + "context" + "fmt" + "runtime" + "testing" + "time" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/cache" + "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" + coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" +) + +func getMemStats() (allocMB, heapMB float64) { + var m runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&m) + return float64(m.Alloc) / 1024 / 1024, float64(m.HeapAlloc) / 1024 / 1024 +} + +func TestMemoryLeak_UsageStats(t *testing.T) { + // This test simulates the usage statistics leak where Details grows unbounded + stats := usage.NewRequestStatistics() + + allocBefore, heapBefore := getMemStats() + t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore) + + // Simulate 10k requests (would happen over hours/days in production) + numRequests := 10000 + for i := 0; i < numRequests; i++ { + stats.Record(context.Background(), coreusage.Record{ + Provider: "test-provider", + Model: fmt.Sprintf("model-%d", i%10), // 10 different models + APIKey: fmt.Sprintf("api-key-%d", i%5), + RequestedAt: time.Now(), + Detail: coreusage.Detail{ + InputTokens: 1000, + OutputTokens: 500, + TotalTokens: 1500, + }, + }) + } + + allocAfter, heapAfter := getMemStats() + t.Logf("After %d requests: Alloc=%.2f MB, Heap=%.2f MB", numRequests, allocAfter, heapAfter) + t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore) + + // Verify the cap is working - check snapshot + snapshot := stats.Snapshot() + for apiName, apiSnap := range snapshot.APIs { + for modelName, modelSnap := range apiSnap.Models { + if len(modelSnap.Details) > 1000 { + t.Errorf("LEAK: API %s Model %s has %d details (should be <= 1000)", + apiName, modelName, len(modelSnap.Details)) + } else { + t.Logf("OK: API %s Model %s has %d details (capped at 1000)", + apiName, modelName, len(modelSnap.Details)) + } + } + } +} + +func TestMemoryLeak_SignatureCache(t *testing.T) { + // This test simulates the signature cache leak where sessions accumulate + allocBefore, heapBefore := getMemStats() + t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore) + + // Simulate 1000 unique sessions (each with signatures) + numSessions := 1000 + sigText := string(make([]byte, 100)) // 100 byte signature text + sig := string(make([]byte, 200)) // 200 byte signature (> MinValidSignatureLen) + + for i := 0; i < numSessions; i++ { + sessionID := fmt.Sprintf("session-%d", i) + // Each session caches 50 signatures + for j := 0; j < 50; j++ { + text := fmt.Sprintf("%s-text-%d", sigText, j) + signature := fmt.Sprintf("%s-sig-%d", sig, j) + cache.CacheSignature(sessionID, text, signature) + } + } + + allocAfter, heapAfter := getMemStats() + t.Logf("After %d sessions x 50 sigs: Alloc=%.2f MB, Heap=%.2f MB", + numSessions, allocAfter, heapAfter) + t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore) + + // Clear all and check memory drops + cache.ClearSignatureCache("") + runtime.GC() + + allocCleared, heapCleared := getMemStats() + t.Logf("After clear: Alloc=%.2f MB, Heap=%.2f MB", allocCleared, heapCleared) + t.Logf("Recovered: Alloc=%.2f MB, Heap=%.2f MB", + allocAfter-allocCleared, heapAfter-heapCleared) + + if allocCleared > allocBefore*1.5 { + t.Logf("WARNING: Memory not fully recovered after clear (may indicate leak)") + } +} + +func TestMemoryLeak_SimulateProductionLoad(t *testing.T) { + // Simulate realistic production load pattern over time + stats := usage.NewRequestStatistics() + + t.Log("=== Simulating production load pattern ===") + + // Phase 1: Ramp up + allocStart, _ := getMemStats() + t.Logf("Start: %.2f MB", allocStart) + + // Simulate 1 hour of traffic (compressed into fast iterations) + // Real: ~1000 req/min = 60k/hour + // Test: 60k requests + for hour := 0; hour < 3; hour++ { + for i := 0; i < 20000; i++ { + stats.Record(context.Background(), coreusage.Record{ + Provider: "antigravity", + Model: fmt.Sprintf("gemini-2.5-pro-%d", i%5), + APIKey: fmt.Sprintf("user-%d", i%100), + RequestedAt: time.Now(), + Detail: coreusage.Detail{ + InputTokens: int64(1000 + i%500), + OutputTokens: int64(200 + i%100), + TotalTokens: int64(1200 + i%600), + }, + }) + } + allocNow, _ := getMemStats() + t.Logf("Hour %d: %.2f MB (growth: +%.2f MB)", hour+1, allocNow, allocNow-allocStart) + } + + allocEnd, _ := getMemStats() + totalGrowth := allocEnd - allocStart + + // With the fix, growth should be bounded + // Without fix: would grow linearly with requests + // With fix: should plateau around 1000 details * num_models * detail_size + t.Logf("Total growth over 60k requests: %.2f MB", totalGrowth) + + // Rough estimate: 1000 details * 5 models * 100 APIs * ~200 bytes = ~100MB max + // Should be well under 50MB for this test + if totalGrowth > 100 { + t.Errorf("POTENTIAL LEAK: Growth of %.2f MB is too high for bounded storage", totalGrowth) + } else { + t.Logf("OK: Memory growth is bounded at %.2f MB", totalGrowth) + } +} diff --git a/internal/runtime/executor/cache_helpers.go b/internal/runtime/executor/cache_helpers.go index 5272686b..b6de886d 100644 --- a/internal/runtime/executor/cache_helpers.go +++ b/internal/runtime/executor/cache_helpers.go @@ -1,10 +1,68 @@ package executor -import "time" +import ( + "sync" + "time" +) type codexCache struct { ID string Expire time.Time } -var codexCacheMap = map[string]codexCache{} +// codexCacheMap stores prompt cache IDs keyed by model+user_id. +// Protected by codexCacheMu. Entries expire after 1 hour. +var ( + codexCacheMap = make(map[string]codexCache) + codexCacheMu sync.RWMutex +) + +// codexCacheCleanupInterval controls how often expired entries are purged. +const codexCacheCleanupInterval = 15 * time.Minute + +// codexCacheCleanupOnce ensures the background cleanup goroutine starts only once. +var codexCacheCleanupOnce sync.Once + +// startCodexCacheCleanup launches a background goroutine that periodically +// removes expired entries from codexCacheMap to prevent memory leaks. +func startCodexCacheCleanup() { + go func() { + ticker := time.NewTicker(codexCacheCleanupInterval) + defer ticker.Stop() + for range ticker.C { + purgeExpiredCodexCache() + } + }() +} + +// purgeExpiredCodexCache removes entries that have expired. +func purgeExpiredCodexCache() { + now := time.Now() + codexCacheMu.Lock() + defer codexCacheMu.Unlock() + for key, cache := range codexCacheMap { + if cache.Expire.Before(now) { + delete(codexCacheMap, key) + } + } +} + +// getCodexCache retrieves a cached entry, returning ok=false if not found or expired. +func getCodexCache(key string) (codexCache, bool) { + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.RLock() + cache, ok := codexCacheMap[key] + codexCacheMu.RUnlock() + if !ok || cache.Expire.Before(time.Now()) { + return codexCache{}, false + } + return cache, true +} + +// setCodexCache stores a cache entry. +func setCodexCache(key string, cache codexCache) { + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.Lock() + codexCacheMap[key] = cache + codexCacheMu.Unlock() +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 0788e4f1..8e7c8df9 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -457,14 +457,14 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { - var hasKey bool key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) - if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) { + var ok bool + if cache, ok = getCodexCache(key); !ok { cache = codexCache{ ID: uuid.New().String(), Expire: time.Now().Add(1 * time.Hour), } - codexCacheMap[key] = cache + setCodexCache(key, cache) } } } else if from == "openai-response" { diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index e4371e8d..38177d7d 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -87,6 +87,10 @@ type modelStats struct { Details []RequestDetail } +// maxDetailsPerModel limits the number of request details retained per model +// to prevent unbounded memory growth. Oldest entries are dropped when exceeded. +const maxDetailsPerModel = 1000 + // RequestDetail stores the timestamp and token usage for a single request. type RequestDetail struct { Timestamp time.Time `json:"timestamp"` @@ -221,6 +225,11 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail modelStatsValue.TotalRequests++ modelStatsValue.TotalTokens += detail.Tokens.TotalTokens modelStatsValue.Details = append(modelStatsValue.Details, detail) + // Prevent unbounded growth by dropping oldest entries when limit exceeded + if len(modelStatsValue.Details) > maxDetailsPerModel { + excess := len(modelStatsValue.Details) - maxDetailsPerModel + modelStatsValue.Details = modelStatsValue.Details[excess:] + } } // Snapshot returns a copy of the aggregated metrics for external consumption. From e785bfcd127d864136b204211b9609949668c952 Mon Sep 17 00:00:00 2001 From: Ben Vargas Date: Fri, 9 Jan 2026 00:54:35 -0700 Subject: [PATCH 07/10] Use unprefixed Claude request for translation Keep the upstream payload prefixed for OAuth while passing the unprefixed request body into response translators. This avoids proxy_ leaking into OpenAI Responses echoed tool metadata while preserving the Claude OAuth workaround. --- internal/runtime/executor/claude_executor.go | 38 +++++++++++++++----- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index e385b41e..d426326f 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -83,12 +83,14 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + bodyForTranslation := body + bodyForUpstream := body if isClaudeOAuthToken(apiKey) { - body = applyClaudeToolPrefix(body, claudeToolPrefix) + bodyForUpstream = applyClaudeToolPrefix(body, claudeToolPrefix) } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyForUpstream)) if err != nil { return resp, err } @@ -103,7 +105,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), - Body: body, + Body: bodyForUpstream, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, @@ -161,7 +163,16 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r data = stripClaudeToolPrefixFromResponse(data, claudeToolPrefix) } var param any - out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) + out := sdktranslator.TranslateNonStream( + ctx, + to, + from, + req.Model, + bytes.Clone(opts.OriginalRequest), + bodyForTranslation, + data, + ¶m, + ) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } @@ -201,12 +212,14 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + bodyForTranslation := body + bodyForUpstream := body if isClaudeOAuthToken(apiKey) { - body = applyClaudeToolPrefix(body, claudeToolPrefix) + bodyForUpstream = applyClaudeToolPrefix(body, claudeToolPrefix) } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyForUpstream)) if err != nil { return nil, err } @@ -221,7 +234,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), - Body: body, + Body: bodyForUpstream, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, @@ -304,7 +317,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if isClaudeOAuthToken(apiKey) { line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) } - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) + chunks := sdktranslator.TranslateStream( + ctx, + to, + from, + req.Model, + bytes.Clone(opts.OriginalRequest), + bodyForTranslation, + bytes.Clone(line), + ¶m, + ) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } From 1c773c428f624509d97bb23fd244b2b7b1cf0588 Mon Sep 17 00:00:00 2001 From: hemanta212 Date: Fri, 9 Jan 2026 17:47:59 +0545 Subject: [PATCH 08/10] fix: Remove investigation artifacts --- internal/memleak_compare_test.go | 219 ------------------------------- internal/memleak_repro_test.go | 151 --------------------- 2 files changed, 370 deletions(-) delete mode 100644 internal/memleak_compare_test.go delete mode 100644 internal/memleak_repro_test.go diff --git a/internal/memleak_compare_test.go b/internal/memleak_compare_test.go deleted file mode 100644 index 63581abb..00000000 --- a/internal/memleak_compare_test.go +++ /dev/null @@ -1,219 +0,0 @@ -// Package internal demonstrates the memory leak that existed before the fix. -// This file shows what happens WITHOUT the maxDetailsPerModel cap. -package internal - -import ( - "fmt" - "runtime" - "testing" - "time" -) - -// UnboundedRequestStatistics is a copy of the ORIGINAL code WITHOUT the fix -// to demonstrate the memory leak behavior. -type UnboundedRequestStatistics struct { - totalRequests int64 - apis map[string]*unboundedAPIStats -} - -type unboundedAPIStats struct { - TotalRequests int64 - Models map[string]*unboundedModelStats -} - -type unboundedModelStats struct { - TotalRequests int64 - Details []unboundedRequestDetail // NO CAP - grows forever! -} - -type unboundedRequestDetail struct { - Timestamp time.Time - Tokens int64 -} - -func NewUnboundedRequestStatistics() *UnboundedRequestStatistics { - return &UnboundedRequestStatistics{ - apis: make(map[string]*unboundedAPIStats), - } -} - -// Record is the ORIGINAL implementation that leaks memory -func (s *UnboundedRequestStatistics) Record(apiKey, model string, tokens int64) { - stats, ok := s.apis[apiKey] - if !ok { - stats = &unboundedAPIStats{Models: make(map[string]*unboundedModelStats)} - s.apis[apiKey] = stats - } - modelStats, ok := stats.Models[model] - if !ok { - modelStats = &unboundedModelStats{} - stats.Models[model] = modelStats - } - modelStats.TotalRequests++ - // BUG: This grows forever with no cap! - modelStats.Details = append(modelStats.Details, unboundedRequestDetail{ - Timestamp: time.Now(), - Tokens: tokens, - }) - s.totalRequests++ -} - -func (s *UnboundedRequestStatistics) CountDetails() int { - total := 0 - for _, api := range s.apis { - for _, model := range api.Models { - total += len(model.Details) - } - } - return total -} - -func TestMemoryLeak_BEFORE_Fix_Unbounded(t *testing.T) { - // This demonstrates the LEAK behavior before the fix - stats := NewUnboundedRequestStatistics() - - var m runtime.MemStats - runtime.GC() - runtime.ReadMemStats(&m) - allocBefore := float64(m.Alloc) / 1024 / 1024 - - t.Logf("=== DEMONSTRATING LEAK (unbounded growth) ===") - t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails()) - - // Simulate traffic over "hours" - in production this causes OOM - for hour := 1; hour <= 5; hour++ { - for i := 0; i < 20000; i++ { - stats.Record( - fmt.Sprintf("api-key-%d", i%10), - fmt.Sprintf("model-%d", i%5), - 1500, - ) - } - runtime.GC() - runtime.ReadMemStats(&m) - allocNow := float64(m.Alloc) / 1024 / 1024 - t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)", - hour, allocNow, stats.CountDetails(), allocNow-allocBefore) - } - - // Show the problem: details count = total requests (unbounded) - totalDetails := stats.CountDetails() - totalRequests := 5 * 20000 // 100k requests - t.Logf("LEAK EVIDENCE: %d details stored for %d requests (ratio: %.2f)", - totalDetails, totalRequests, float64(totalDetails)/float64(totalRequests)) - - if totalDetails == totalRequests { - t.Logf("CONFIRMED: Every request stored forever = memory leak!") - } -} - -func TestMemoryLeak_AFTER_Fix_Bounded(t *testing.T) { - // This demonstrates the FIXED behavior with capped growth - // Using the real implementation which now has the fix - stats := NewBoundedRequestStatistics() - - var m runtime.MemStats - runtime.GC() - runtime.ReadMemStats(&m) - allocBefore := float64(m.Alloc) / 1024 / 1024 - - t.Logf("=== DEMONSTRATING FIX (bounded growth) ===") - t.Logf("Before: %.2f MB, Details: %d", allocBefore, stats.CountDetails()) - - for hour := 1; hour <= 5; hour++ { - for i := 0; i < 20000; i++ { - stats.Record( - fmt.Sprintf("api-key-%d", i%10), - fmt.Sprintf("model-%d", i%5), - 1500, - ) - } - runtime.GC() - runtime.ReadMemStats(&m) - allocNow := float64(m.Alloc) / 1024 / 1024 - t.Logf("Hour %d: %.2f MB, Details: %d (growth: +%.2f MB)", - hour, allocNow, stats.CountDetails(), allocNow-allocBefore) - } - - totalDetails := stats.CountDetails() - maxExpected := 10 * 5 * 1000 // 10 API keys * 5 models * 1000 cap = 50k max - t.Logf("FIX EVIDENCE: %d details stored (max possible: %d)", totalDetails, maxExpected) - - if totalDetails <= maxExpected { - t.Logf("CONFIRMED: Details capped, memory bounded!") - } else { - t.Errorf("STILL LEAKING: %d > %d", totalDetails, maxExpected) - } -} - -// BoundedRequestStatistics is the FIXED version with cap -type BoundedRequestStatistics struct { - apis map[string]*boundedAPIStats -} - -type boundedAPIStats struct { - Models map[string]*boundedModelStats -} - -type boundedModelStats struct { - Details []unboundedRequestDetail -} - -const maxDetailsPerModelTest = 1000 - -func NewBoundedRequestStatistics() *BoundedRequestStatistics { - return &BoundedRequestStatistics{ - apis: make(map[string]*boundedAPIStats), - } -} - -func (s *BoundedRequestStatistics) Record(apiKey, model string, tokens int64) { - stats, ok := s.apis[apiKey] - if !ok { - stats = &boundedAPIStats{Models: make(map[string]*boundedModelStats)} - s.apis[apiKey] = stats - } - modelStats, ok := stats.Models[model] - if !ok { - modelStats = &boundedModelStats{} - stats.Models[model] = modelStats - } - modelStats.Details = append(modelStats.Details, unboundedRequestDetail{ - Timestamp: time.Now(), - Tokens: tokens, - }) - // THE FIX: Cap the details slice - if len(modelStats.Details) > maxDetailsPerModelTest { - excess := len(modelStats.Details) - maxDetailsPerModelTest - modelStats.Details = modelStats.Details[excess:] - } -} - -func (s *BoundedRequestStatistics) CountDetails() int { - total := 0 - for _, api := range s.apis { - for _, model := range api.Models { - total += len(model.Details) - } - } - return total -} - -func TestCompare_LeakVsFix(t *testing.T) { - t.Log("=== SIDE-BY-SIDE COMPARISON ===") - - unbounded := NewUnboundedRequestStatistics() - bounded := NewBoundedRequestStatistics() - - // Same workload - for i := 0; i < 50000; i++ { - apiKey := fmt.Sprintf("key-%d", i%10) - model := fmt.Sprintf("model-%d", i%5) - unbounded.Record(apiKey, model, 1500) - bounded.Record(apiKey, model, 1500) - } - - t.Logf("UNBOUNDED (leak): %d details stored", unbounded.CountDetails()) - t.Logf("BOUNDED (fixed): %d details stored", bounded.CountDetails()) - t.Logf("Memory saved: %dx reduction", unbounded.CountDetails()/bounded.CountDetails()) -} diff --git a/internal/memleak_repro_test.go b/internal/memleak_repro_test.go deleted file mode 100644 index 9ace27f5..00000000 --- a/internal/memleak_repro_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Package internal contains memory leak reproduction tests. -// Run with: go test -v -run TestMemoryLeak -memprofile=mem.prof ./internal/ -package internal - -import ( - "context" - "fmt" - "runtime" - "testing" - "time" - - "github.com/router-for-me/CLIProxyAPI/v6/internal/cache" - "github.com/router-for-me/CLIProxyAPI/v6/internal/usage" - coreusage "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/usage" -) - -func getMemStats() (allocMB, heapMB float64) { - var m runtime.MemStats - runtime.GC() - runtime.ReadMemStats(&m) - return float64(m.Alloc) / 1024 / 1024, float64(m.HeapAlloc) / 1024 / 1024 -} - -func TestMemoryLeak_UsageStats(t *testing.T) { - // This test simulates the usage statistics leak where Details grows unbounded - stats := usage.NewRequestStatistics() - - allocBefore, heapBefore := getMemStats() - t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore) - - // Simulate 10k requests (would happen over hours/days in production) - numRequests := 10000 - for i := 0; i < numRequests; i++ { - stats.Record(context.Background(), coreusage.Record{ - Provider: "test-provider", - Model: fmt.Sprintf("model-%d", i%10), // 10 different models - APIKey: fmt.Sprintf("api-key-%d", i%5), - RequestedAt: time.Now(), - Detail: coreusage.Detail{ - InputTokens: 1000, - OutputTokens: 500, - TotalTokens: 1500, - }, - }) - } - - allocAfter, heapAfter := getMemStats() - t.Logf("After %d requests: Alloc=%.2f MB, Heap=%.2f MB", numRequests, allocAfter, heapAfter) - t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore) - - // Verify the cap is working - check snapshot - snapshot := stats.Snapshot() - for apiName, apiSnap := range snapshot.APIs { - for modelName, modelSnap := range apiSnap.Models { - if len(modelSnap.Details) > 1000 { - t.Errorf("LEAK: API %s Model %s has %d details (should be <= 1000)", - apiName, modelName, len(modelSnap.Details)) - } else { - t.Logf("OK: API %s Model %s has %d details (capped at 1000)", - apiName, modelName, len(modelSnap.Details)) - } - } - } -} - -func TestMemoryLeak_SignatureCache(t *testing.T) { - // This test simulates the signature cache leak where sessions accumulate - allocBefore, heapBefore := getMemStats() - t.Logf("Before: Alloc=%.2f MB, Heap=%.2f MB", allocBefore, heapBefore) - - // Simulate 1000 unique sessions (each with signatures) - numSessions := 1000 - sigText := string(make([]byte, 100)) // 100 byte signature text - sig := string(make([]byte, 200)) // 200 byte signature (> MinValidSignatureLen) - - for i := 0; i < numSessions; i++ { - sessionID := fmt.Sprintf("session-%d", i) - // Each session caches 50 signatures - for j := 0; j < 50; j++ { - text := fmt.Sprintf("%s-text-%d", sigText, j) - signature := fmt.Sprintf("%s-sig-%d", sig, j) - cache.CacheSignature(sessionID, text, signature) - } - } - - allocAfter, heapAfter := getMemStats() - t.Logf("After %d sessions x 50 sigs: Alloc=%.2f MB, Heap=%.2f MB", - numSessions, allocAfter, heapAfter) - t.Logf("Growth: Alloc=+%.2f MB, Heap=+%.2f MB", allocAfter-allocBefore, heapAfter-heapBefore) - - // Clear all and check memory drops - cache.ClearSignatureCache("") - runtime.GC() - - allocCleared, heapCleared := getMemStats() - t.Logf("After clear: Alloc=%.2f MB, Heap=%.2f MB", allocCleared, heapCleared) - t.Logf("Recovered: Alloc=%.2f MB, Heap=%.2f MB", - allocAfter-allocCleared, heapAfter-heapCleared) - - if allocCleared > allocBefore*1.5 { - t.Logf("WARNING: Memory not fully recovered after clear (may indicate leak)") - } -} - -func TestMemoryLeak_SimulateProductionLoad(t *testing.T) { - // Simulate realistic production load pattern over time - stats := usage.NewRequestStatistics() - - t.Log("=== Simulating production load pattern ===") - - // Phase 1: Ramp up - allocStart, _ := getMemStats() - t.Logf("Start: %.2f MB", allocStart) - - // Simulate 1 hour of traffic (compressed into fast iterations) - // Real: ~1000 req/min = 60k/hour - // Test: 60k requests - for hour := 0; hour < 3; hour++ { - for i := 0; i < 20000; i++ { - stats.Record(context.Background(), coreusage.Record{ - Provider: "antigravity", - Model: fmt.Sprintf("gemini-2.5-pro-%d", i%5), - APIKey: fmt.Sprintf("user-%d", i%100), - RequestedAt: time.Now(), - Detail: coreusage.Detail{ - InputTokens: int64(1000 + i%500), - OutputTokens: int64(200 + i%100), - TotalTokens: int64(1200 + i%600), - }, - }) - } - allocNow, _ := getMemStats() - t.Logf("Hour %d: %.2f MB (growth: +%.2f MB)", hour+1, allocNow, allocNow-allocStart) - } - - allocEnd, _ := getMemStats() - totalGrowth := allocEnd - allocStart - - // With the fix, growth should be bounded - // Without fix: would grow linearly with requests - // With fix: should plateau around 1000 details * num_models * detail_size - t.Logf("Total growth over 60k requests: %.2f MB", totalGrowth) - - // Rough estimate: 1000 details * 5 models * 100 APIs * ~200 bytes = ~100MB max - // Should be well under 50MB for this test - if totalGrowth > 100 { - t.Errorf("POTENTIAL LEAK: Growth of %.2f MB is too high for bounded storage", totalGrowth) - } else { - t.Logf("OK: Memory growth is bounded at %.2f MB", totalGrowth) - } -} From af6bdca14f492565bfe88aa2e46575bd40f664d4 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Fri, 9 Jan 2026 23:41:50 +0800 Subject: [PATCH 09/10] Fixed: #942 fix(executor): ignore non-SSE lines in OpenAI-compatible streams --- internal/runtime/executor/openai_compat_executor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 60c80f9d..78b787dd 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -241,6 +241,11 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy if len(line) == 0 { continue } + + if !bytes.HasPrefix(line, []byte("data:")) { + continue + } + // OpenAI-compatible streams are SSE: lines typically prefixed with "data: ". // Pass through translator; it yields one or more chunks for the target schema. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m) From 4d7f389b69776fa8ae754d2b0adc4c9167f63fd4 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sat, 10 Jan 2026 01:01:09 +0800 Subject: [PATCH 10/10] Fixed: #941 fix(translator): ensure fallback to valid originalRequestRawJSON in response handling --- .../claude_openai-responses_response.go | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index 354be56e..593ec287 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -40,6 +40,16 @@ type claudeToResponsesState struct { var dataTag = []byte("data:") +func pickRequestJSON(originalRequestRawJSON, requestRawJSON []byte) []byte { + if len(originalRequestRawJSON) > 0 && gjson.ValidBytes(originalRequestRawJSON) { + return originalRequestRawJSON + } + if len(requestRawJSON) > 0 && gjson.ValidBytes(requestRawJSON) { + return requestRawJSON + } + return nil +} + func emitEvent(event string, payload string) string { return fmt.Sprintf("event: %s\ndata: %s", event, payload) } @@ -279,8 +289,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin completed, _ = sjson.Set(completed, "response.created_at", st.CreatedAt) // Inject original request fields into response as per docs/response.completed.json - if requestRawJSON != nil { - req := gjson.ParseBytes(requestRawJSON) + reqBytes := pickRequestJSON(originalRequestRawJSON, requestRawJSON) + if len(reqBytes) > 0 { + req := gjson.ParseBytes(reqBytes) if v := req.Get("instructions"); v.Exists() { completed, _ = sjson.Set(completed, "response.instructions", v.String()) } @@ -549,8 +560,9 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string out, _ = sjson.Set(out, "created_at", createdAt) // Inject request echo fields as top-level (similar to streaming variant) - if requestRawJSON != nil { - req := gjson.ParseBytes(requestRawJSON) + reqBytes := pickRequestJSON(originalRequestRawJSON, requestRawJSON) + if len(reqBytes) > 0 { + req := gjson.ParseBytes(reqBytes) if v := req.Get("instructions"); v.Exists() { out, _ = sjson.Set(out, "instructions", v.String()) }