diff --git a/docker-build.sh b/docker-build.sh index fc846703..944f3e78 100644 --- a/docker-build.sh +++ b/docker-build.sh @@ -152,16 +152,16 @@ case "$choice" in # Build and start the services with a local-only image tag export CLI_PROXY_IMAGE="cli-proxy-api:local" - if [[ "${WITH_USAGE}" == "true" ]]; then - export_stats - fi - echo "Building the Docker image..." docker compose build \ --build-arg VERSION="${VERSION}" \ --build-arg COMMIT="${COMMIT}" \ --build-arg BUILD_DATE="${BUILD_DATE}" + if [[ "${WITH_USAGE}" == "true" ]]; then + export_stats + fi + echo "Starting the services..." docker compose up -d --remove-orphans --pull never diff --git a/internal/api/handlers/management/handler.go b/internal/api/handlers/management/handler.go index d3ccbda6..613c9841 100644 --- a/internal/api/handlers/management/handler.go +++ b/internal/api/handlers/management/handler.go @@ -24,8 +24,15 @@ import ( type attemptInfo struct { count int blockedUntil time.Time + lastActivity time.Time // track last activity for cleanup } +// attemptCleanupInterval controls how often stale IP entries are purged +const attemptCleanupInterval = 1 * time.Hour + +// attemptMaxIdleTime controls how long an IP can be idle before cleanup +const attemptMaxIdleTime = 2 * time.Hour + // Handler aggregates config reference, persistence path and helpers. type Handler struct { cfg *config.Config @@ -47,7 +54,7 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man envSecret, _ := os.LookupEnv("MANAGEMENT_PASSWORD") envSecret = strings.TrimSpace(envSecret) - return &Handler{ + h := &Handler{ cfg: cfg, configFilePath: configFilePath, failedAttempts: make(map[string]*attemptInfo), @@ -57,6 +64,38 @@ func NewHandler(cfg *config.Config, configFilePath string, manager *coreauth.Man allowRemoteOverride: envSecret != "", envSecret: envSecret, } + h.startAttemptCleanup() + return h +} + +// startAttemptCleanup launches a background goroutine that periodically +// removes stale IP entries from failedAttempts to prevent memory leaks. +func (h *Handler) startAttemptCleanup() { + go func() { + ticker := time.NewTicker(attemptCleanupInterval) + defer ticker.Stop() + for range ticker.C { + h.purgeStaleAttempts() + } + }() +} + +// purgeStaleAttempts removes IP entries that have been idle beyond attemptMaxIdleTime +// and whose ban (if any) has expired. +func (h *Handler) purgeStaleAttempts() { + now := time.Now() + h.attemptsMu.Lock() + defer h.attemptsMu.Unlock() + for ip, ai := range h.failedAttempts { + // Skip if still banned + if !ai.blockedUntil.IsZero() && now.Before(ai.blockedUntil) { + continue + } + // Remove if idle too long + if now.Sub(ai.lastActivity) > attemptMaxIdleTime { + delete(h.failedAttempts, ip) + } + } } // NewHandler creates a new management handler instance. @@ -149,6 +188,7 @@ func (h *Handler) Middleware() gin.HandlerFunc { h.failedAttempts[clientIP] = aip } aip.count++ + aip.lastActivity = time.Now() if aip.count >= maxFailures { aip.blockedUntil = time.Now().Add(banDuration) aip.count = 0 diff --git a/internal/api/modules/amp/response_rewriter.go b/internal/api/modules/amp/response_rewriter.go index e25e7cc9..f5d6b667 100644 --- a/internal/api/modules/amp/response_rewriter.go +++ b/internal/api/modules/amp/response_rewriter.go @@ -127,8 +127,6 @@ var modelFieldPaths = []string{"model", "modelVersion", "response.modelVersion", // rewriteModelInResponse replaces all occurrences of the mapped model with the original model in JSON // It also suppresses "thinking" blocks if "tool_use" is present to ensure Amp client compatibility func (rw *ResponseRewriter) rewriteModelInResponse(data []byte) []byte { - // 1. Amp Compatibility: Suppress thinking blocks if tool use is detected - // The Amp client struggles when both thinking and tool_use blocks are present // 1. Amp Compatibility: Suppress thinking blocks if tool use is detected // The Amp client struggles when both thinking and tool_use blocks are present if gjson.GetBytes(data, `content.#(type=="tool_use")`).Exists() { diff --git a/internal/cache/signature_cache.go b/internal/cache/signature_cache.go index c1326629..d4a864e0 100644 --- a/internal/cache/signature_cache.go +++ b/internal/cache/signature_cache.go @@ -26,11 +26,17 @@ const ( // MinValidSignatureLen is the minimum length for a signature to be considered valid MinValidSignatureLen = 50 + + // SessionCleanupInterval controls how often stale sessions are purged + SessionCleanupInterval = 10 * time.Minute ) // signatureCache stores signatures by sessionId -> textHash -> SignatureEntry var signatureCache sync.Map +// sessionCleanupOnce ensures the background cleanup goroutine starts only once +var sessionCleanupOnce sync.Once + // sessionCache is the inner map type type sessionCache struct { mu sync.RWMutex @@ -45,6 +51,9 @@ func hashText(text string) string { // getOrCreateSession gets or creates a session cache func getOrCreateSession(sessionID string) *sessionCache { + // Start background cleanup on first access + sessionCleanupOnce.Do(startSessionCleanup) + if val, ok := signatureCache.Load(sessionID); ok { return val.(*sessionCache) } @@ -53,6 +62,40 @@ func getOrCreateSession(sessionID string) *sessionCache { return actual.(*sessionCache) } +// startSessionCleanup launches a background goroutine that periodically +// removes sessions where all entries have expired. +func startSessionCleanup() { + go func() { + ticker := time.NewTicker(SessionCleanupInterval) + defer ticker.Stop() + for range ticker.C { + purgeExpiredSessions() + } + }() +} + +// purgeExpiredSessions removes sessions with no valid (non-expired) entries. +func purgeExpiredSessions() { + now := time.Now() + signatureCache.Range(func(key, value any) bool { + sc := value.(*sessionCache) + sc.mu.Lock() + // Remove expired entries + for k, entry := range sc.entries { + if now.Sub(entry.Timestamp) > SignatureCacheTTL { + delete(sc.entries, k) + } + } + isEmpty := len(sc.entries) == 0 + sc.mu.Unlock() + // Remove session if empty + if isEmpty { + signatureCache.Delete(key) + } + return true + }) +} + // CacheSignature stores a thinking signature for a given session and text. // Used for Claude models that require signed thinking blocks in multi-turn conversations. func CacheSignature(sessionID, text, signature string) { diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 89bdbe49..47b2ac48 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -10,6 +10,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -125,6 +126,9 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -237,6 +241,9 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return resp, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -255,6 +262,14 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * } if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return resp, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return resp, err + } lastStatus = 0 lastBody = nil lastErr = errRead @@ -590,6 +605,9 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -608,6 +626,14 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya } if errRead != nil { recordAPIResponseError(ctx, e.cfg, errRead) + if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) { + err = errRead + return nil, err + } + if errCtx := ctx.Err(); errCtx != nil { + err = errCtx + return nil, err + } lastStatus = 0 lastBody = nil lastErr = errRead @@ -796,6 +822,9 @@ func (e *AntigravityExecutor) CountTokens(ctx context.Context, auth *cliproxyaut httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { recordAPIResponseError(ctx, e.cfg, errDo) + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return cliproxyexecutor.Response{}, errDo + } lastStatus = 0 lastBody = nil lastErr = errDo @@ -884,6 +913,9 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c httpResp, errDo := httpClient.Do(httpReq) if errDo != nil { + if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) { + return nil + } if idx+1 < len(baseURLs) { log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1]) continue diff --git a/internal/runtime/executor/cache_helpers.go b/internal/runtime/executor/cache_helpers.go index 4b553662..1e32f43a 100644 --- a/internal/runtime/executor/cache_helpers.go +++ b/internal/runtime/executor/cache_helpers.go @@ -10,29 +10,69 @@ type codexCache struct { Expire time.Time } +// codexCacheMap stores prompt cache IDs keyed by model+user_id. +// Protected by codexCacheMu. Entries expire after 1 hour. var ( - codexCacheMap = map[string]codexCache{} - codexCacheMutex sync.RWMutex + codexCacheMap = make(map[string]codexCache) + codexCacheMu sync.RWMutex ) -// getCodexCache safely retrieves a cache entry +// codexCacheCleanupInterval controls how often expired entries are purged. +const codexCacheCleanupInterval = 15 * time.Minute + +// codexCacheCleanupOnce ensures the background cleanup goroutine starts only once. +var codexCacheCleanupOnce sync.Once + +// startCodexCacheCleanup launches a background goroutine that periodically +// removes expired entries from codexCacheMap to prevent memory leaks. +func startCodexCacheCleanup() { + go func() { + ticker := time.NewTicker(codexCacheCleanupInterval) + defer ticker.Stop() + + for range ticker.C { + purgeExpiredCodexCache() + } + }() +} + +// purgeExpiredCodexCache removes entries that have expired. +func purgeExpiredCodexCache() { + now := time.Now() + + codexCacheMu.Lock() + defer codexCacheMu.Unlock() + + for key, cache := range codexCacheMap { + if cache.Expire.Before(now) { + delete(codexCacheMap, key) + } + } +} + +// getCodexCache retrieves a cached entry, returning ok=false if not found or expired. func getCodexCache(key string) (codexCache, bool) { - codexCacheMutex.RLock() - defer codexCacheMutex.RUnlock() + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.RLock() cache, ok := codexCacheMap[key] - return cache, ok + codexCacheMu.RUnlock() + if !ok || cache.Expire.Before(time.Now()) { + return codexCache{}, false + } + return cache, true } -// setCodexCache safely sets a cache entry +// setCodexCache stores a cache entry. func setCodexCache(key string, cache codexCache) { - codexCacheMutex.Lock() - defer codexCacheMutex.Unlock() + codexCacheCleanupOnce.Do(startCodexCacheCleanup) + codexCacheMu.Lock() codexCacheMap[key] = cache + codexCacheMu.Unlock() } -// deleteCodexCache safely deletes a cache entry +// deleteCodexCache deletes a cache entry. func deleteCodexCache(key string) { - codexCacheMutex.Lock() - defer codexCacheMutex.Unlock() + codexCacheMu.Lock() delete(codexCacheMap, key) + codexCacheMu.Unlock() } diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 7be4f41b..d426326f 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -35,6 +35,8 @@ type ClaudeExecutor struct { cfg *config.Config } +const claudeToolPrefix = "proxy_" + func NewClaudeExecutor(cfg *config.Config) *ClaudeExecutor { return &ClaudeExecutor{cfg: cfg} } func (e *ClaudeExecutor) Identifier() string { return "claude" } @@ -81,9 +83,14 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + bodyForTranslation := body + bodyForUpstream := body + if isClaudeOAuthToken(apiKey) { + bodyForUpstream = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyForUpstream)) if err != nil { return resp, err } @@ -98,7 +105,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), - Body: body, + Body: bodyForUpstream, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, @@ -152,8 +159,20 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r } else { reporter.publish(ctx, parseClaudeUsage(data)) } + if isClaudeOAuthToken(apiKey) { + data = stripClaudeToolPrefixFromResponse(data, claudeToolPrefix) + } var param any - out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m) + out := sdktranslator.TranslateNonStream( + ctx, + to, + from, + req.Model, + bytes.Clone(opts.OriginalRequest), + bodyForTranslation, + data, + ¶m, + ) resp = cliproxyexecutor.Response{Payload: []byte(out)} return resp, nil } @@ -193,9 +212,14 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A // Extract betas from body and convert to header var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + bodyForTranslation := body + bodyForUpstream := body + if isClaudeOAuthToken(apiKey) { + bodyForUpstream = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyForUpstream)) if err != nil { return nil, err } @@ -210,7 +234,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A URL: url, Method: http.MethodPost, Headers: httpReq.Header.Clone(), - Body: body, + Body: bodyForUpstream, Provider: e.Identifier(), AuthID: authID, AuthLabel: authLabel, @@ -263,6 +287,9 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if detail, ok := parseClaudeStreamUsage(line); ok { reporter.publish(ctx, detail) } + if isClaudeOAuthToken(apiKey) { + line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) + } // Forward the line as-is to preserve SSE format cloned := make([]byte, len(line)+1) copy(cloned, line) @@ -287,7 +314,19 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if detail, ok := parseClaudeStreamUsage(line); ok { reporter.publish(ctx, detail) } - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone(line), ¶m) + if isClaudeOAuthToken(apiKey) { + line = stripClaudeToolPrefixFromStreamLine(line, claudeToolPrefix) + } + chunks := sdktranslator.TranslateStream( + ctx, + to, + from, + req.Model, + bytes.Clone(opts.OriginalRequest), + bodyForTranslation, + bytes.Clone(line), + ¶m, + ) for i := range chunks { out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} } @@ -326,6 +365,9 @@ func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut // Extract betas from body and convert to header (for count_tokens too) var extraBetas []string extraBetas, body = extractAndRemoveBetas(body) + if isClaudeOAuthToken(apiKey) { + body = applyClaudeToolPrefix(body, claudeToolPrefix) + } url := fmt.Sprintf("%s/v1/messages/count_tokens?beta=true", baseURL) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) @@ -770,3 +812,107 @@ func checkSystemInstructions(payload []byte) []byte { } return payload } + +func isClaudeOAuthToken(apiKey string) bool { + return strings.Contains(apiKey, "sk-ant-oat") +} + +func applyClaudeToolPrefix(body []byte, prefix string) []byte { + if prefix == "" { + return body + } + + if tools := gjson.GetBytes(body, "tools"); tools.Exists() && tools.IsArray() { + tools.ForEach(func(index, tool gjson.Result) bool { + name := tool.Get("name").String() + if name == "" || strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("tools.%d.name", index.Int()) + body, _ = sjson.SetBytes(body, path, prefix+name) + return true + }) + } + + if gjson.GetBytes(body, "tool_choice.type").String() == "tool" { + name := gjson.GetBytes(body, "tool_choice.name").String() + if name != "" && !strings.HasPrefix(name, prefix) { + body, _ = sjson.SetBytes(body, "tool_choice.name", prefix+name) + } + } + + if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() { + messages.ForEach(func(msgIndex, msg gjson.Result) bool { + content := msg.Get("content") + if !content.Exists() || !content.IsArray() { + return true + } + content.ForEach(func(contentIndex, part gjson.Result) bool { + if part.Get("type").String() != "tool_use" { + return true + } + name := part.Get("name").String() + if name == "" || strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("messages.%d.content.%d.name", msgIndex.Int(), contentIndex.Int()) + body, _ = sjson.SetBytes(body, path, prefix+name) + return true + }) + return true + }) + } + + return body +} + +func stripClaudeToolPrefixFromResponse(body []byte, prefix string) []byte { + if prefix == "" { + return body + } + content := gjson.GetBytes(body, "content") + if !content.Exists() || !content.IsArray() { + return body + } + content.ForEach(func(index, part gjson.Result) bool { + if part.Get("type").String() != "tool_use" { + return true + } + name := part.Get("name").String() + if !strings.HasPrefix(name, prefix) { + return true + } + path := fmt.Sprintf("content.%d.name", index.Int()) + body, _ = sjson.SetBytes(body, path, strings.TrimPrefix(name, prefix)) + return true + }) + return body +} + +func stripClaudeToolPrefixFromStreamLine(line []byte, prefix string) []byte { + if prefix == "" { + return line + } + payload := jsonPayload(line) + if len(payload) == 0 || !gjson.ValidBytes(payload) { + return line + } + contentBlock := gjson.GetBytes(payload, "content_block") + if !contentBlock.Exists() || contentBlock.Get("type").String() != "tool_use" { + return line + } + name := contentBlock.Get("name").String() + if !strings.HasPrefix(name, prefix) { + return line + } + updated, err := sjson.SetBytes(payload, "content_block.name", strings.TrimPrefix(name, prefix)) + if err != nil { + return line + } + + trimmed := bytes.TrimSpace(line) + if bytes.HasPrefix(trimmed, []byte("data:")) { + return append([]byte("data: "), updated...) + } + return updated +} diff --git a/internal/runtime/executor/claude_executor_test.go b/internal/runtime/executor/claude_executor_test.go new file mode 100644 index 00000000..05f5b60c --- /dev/null +++ b/internal/runtime/executor/claude_executor_test.go @@ -0,0 +1,51 @@ +package executor + +import ( + "bytes" + "testing" + + "github.com/tidwall/gjson" +) + +func TestApplyClaudeToolPrefix(t *testing.T) { + input := []byte(`{"tools":[{"name":"alpha"},{"name":"proxy_bravo"}],"tool_choice":{"type":"tool","name":"charlie"},"messages":[{"role":"assistant","content":[{"type":"tool_use","name":"delta","id":"t1","input":{}}]}]}`) + out := applyClaudeToolPrefix(input, "proxy_") + + if got := gjson.GetBytes(out, "tools.0.name").String(); got != "proxy_alpha" { + t.Fatalf("tools.0.name = %q, want %q", got, "proxy_alpha") + } + if got := gjson.GetBytes(out, "tools.1.name").String(); got != "proxy_bravo" { + t.Fatalf("tools.1.name = %q, want %q", got, "proxy_bravo") + } + if got := gjson.GetBytes(out, "tool_choice.name").String(); got != "proxy_charlie" { + t.Fatalf("tool_choice.name = %q, want %q", got, "proxy_charlie") + } + if got := gjson.GetBytes(out, "messages.0.content.0.name").String(); got != "proxy_delta" { + t.Fatalf("messages.0.content.0.name = %q, want %q", got, "proxy_delta") + } +} + +func TestStripClaudeToolPrefixFromResponse(t *testing.T) { + input := []byte(`{"content":[{"type":"tool_use","name":"proxy_alpha","id":"t1","input":{}},{"type":"tool_use","name":"bravo","id":"t2","input":{}}]}`) + out := stripClaudeToolPrefixFromResponse(input, "proxy_") + + if got := gjson.GetBytes(out, "content.0.name").String(); got != "alpha" { + t.Fatalf("content.0.name = %q, want %q", got, "alpha") + } + if got := gjson.GetBytes(out, "content.1.name").String(); got != "bravo" { + t.Fatalf("content.1.name = %q, want %q", got, "bravo") + } +} + +func TestStripClaudeToolPrefixFromStreamLine(t *testing.T) { + line := []byte(`data: {"type":"content_block_start","content_block":{"type":"tool_use","name":"proxy_alpha","id":"t1"},"index":0}`) + out := stripClaudeToolPrefixFromStreamLine(line, "proxy_") + + payload := bytes.TrimSpace(out) + if bytes.HasPrefix(payload, []byte("data:")) { + payload = bytes.TrimSpace(payload[len("data:"):]) + } + if got := gjson.GetBytes(payload, "content_block.name").String(); got != "alpha" { + t.Fatalf("content_block.name = %q, want %q", got, "alpha") + } +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index fec32f29..8e7c8df9 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -457,9 +457,9 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { - var hasKey bool key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String()) - if cache, hasKey = getCodexCache(key); !hasKey || cache.Expire.Before(time.Now()) { + var ok bool + if cache, ok = getCodexCache(key); !ok { cache = codexCache{ ID: uuid.New().String(), Expire: time.Now().Add(1 * time.Hour), diff --git a/internal/runtime/executor/logging_helpers.go b/internal/runtime/executor/logging_helpers.go index 26931f53..90532772 100644 --- a/internal/runtime/executor/logging_helpers.go +++ b/internal/runtime/executor/logging_helpers.go @@ -304,11 +304,7 @@ func formatAuthInfo(info upstreamRequestLog) string { parts = append(parts, "type=api_key") } case "oauth": - if authValue != "" { - parts = append(parts, fmt.Sprintf("type=oauth account=%s", authValue)) - } else { - parts = append(parts, "type=oauth") - } + parts = append(parts, "type=oauth") default: if authType != "" { if authValue != "" { diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 60c80f9d..78b787dd 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -241,6 +241,11 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy if len(line) == 0 { continue } + + if !bytes.HasPrefix(line, []byte("data:")) { + continue + } + // OpenAI-compatible streams are SSE: lines typically prefixed with "data: ". // Pass through translator; it yields one or more chunks for the target schema. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(line), ¶m) diff --git a/internal/translator/antigravity/claude/antigravity_claude_request.go b/internal/translator/antigravity/claude/antigravity_claude_request.go index 2287bccc..d5064c3c 100644 --- a/internal/translator/antigravity/claude/antigravity_claude_request.go +++ b/internal/translator/antigravity/claude/antigravity_claude_request.go @@ -14,7 +14,6 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/cache" "github.com/router-for-me/CLIProxyAPI/v6/internal/translator/gemini/common" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" - log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -136,14 +135,14 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ if sessionID != "" && thinkingText != "" { if cachedSig := cache.GetCachedSignature(sessionID, thinkingText); cachedSig != "" { signature = cachedSig - log.Debugf("Using cached signature for thinking block") + // log.Debugf("Using cached signature for thinking block") } } // Fallback to client signature only if cache miss and client signature is valid if signature == "" && cache.HasValidSignature(clientSignature) { signature = clientSignature - log.Debugf("Using client-provided signature for thinking block") + // log.Debugf("Using client-provided signature for thinking block") } // Store for subsequent tool_use in the same message @@ -158,8 +157,7 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ // Claude requires assistant messages to start with thinking blocks when thinking is enabled // Converting to text would break this requirement if isUnsigned { - // TypeScript plugin approach: drop unsigned thinking blocks entirely - log.Debugf("Dropping unsigned thinking block (no valid signature)") + // log.Debugf("Dropping unsigned thinking block (no valid signature)") continue } @@ -183,7 +181,6 @@ func ConvertClaudeRequestToAntigravity(modelName string, inputRawJSON []byte, _ } else if contentTypeResult.Type == gjson.String && contentTypeResult.String() == "tool_use" { // NOTE: Do NOT inject dummy thinking blocks here. // Antigravity API validates signatures, so dummy values are rejected. - // The TypeScript plugin removes unsigned thinking blocks instead of injecting dummies. functionName := contentResult.Get("name").String() argsResult := contentResult.Get("input") diff --git a/internal/translator/antigravity/claude/antigravity_claude_response.go b/internal/translator/antigravity/claude/antigravity_claude_response.go index 875e54a7..1672a835 100644 --- a/internal/translator/antigravity/claude/antigravity_claude_response.go +++ b/internal/translator/antigravity/claude/antigravity_claude_response.go @@ -136,11 +136,11 @@ func ConvertAntigravityResponseToClaude(_ context.Context, _ string, originalReq // Process thinking content (internal reasoning) if partResult.Get("thought").Bool() { if thoughtSignature := partResult.Get("thoughtSignature"); thoughtSignature.Exists() && thoughtSignature.String() != "" { - log.Debug("Branch: signature_delta") + // log.Debug("Branch: signature_delta") if params.SessionID != "" && params.CurrentThinkingText.Len() > 0 { cache.CacheSignature(params.SessionID, params.CurrentThinkingText.String(), thoughtSignature.String()) - log.Debugf("Cached signature for thinking block (sessionID=%s, textLen=%d)", params.SessionID, params.CurrentThinkingText.Len()) + // log.Debugf("Cached signature for thinking block (sessionID=%s, textLen=%d)", params.SessionID, params.CurrentThinkingText.Len()) params.CurrentThinkingText.Reset() } diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index 354be56e..593ec287 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -40,6 +40,16 @@ type claudeToResponsesState struct { var dataTag = []byte("data:") +func pickRequestJSON(originalRequestRawJSON, requestRawJSON []byte) []byte { + if len(originalRequestRawJSON) > 0 && gjson.ValidBytes(originalRequestRawJSON) { + return originalRequestRawJSON + } + if len(requestRawJSON) > 0 && gjson.ValidBytes(requestRawJSON) { + return requestRawJSON + } + return nil +} + func emitEvent(event string, payload string) string { return fmt.Sprintf("event: %s\ndata: %s", event, payload) } @@ -279,8 +289,9 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin completed, _ = sjson.Set(completed, "response.created_at", st.CreatedAt) // Inject original request fields into response as per docs/response.completed.json - if requestRawJSON != nil { - req := gjson.ParseBytes(requestRawJSON) + reqBytes := pickRequestJSON(originalRequestRawJSON, requestRawJSON) + if len(reqBytes) > 0 { + req := gjson.ParseBytes(reqBytes) if v := req.Get("instructions"); v.Exists() { completed, _ = sjson.Set(completed, "response.instructions", v.String()) } @@ -549,8 +560,9 @@ func ConvertClaudeResponseToOpenAIResponsesNonStream(_ context.Context, _ string out, _ = sjson.Set(out, "created_at", createdAt) // Inject request echo fields as top-level (similar to streaming variant) - if requestRawJSON != nil { - req := gjson.ParseBytes(requestRawJSON) + reqBytes := pickRequestJSON(originalRequestRawJSON, requestRawJSON) + if len(reqBytes) > 0 { + req := gjson.ParseBytes(reqBytes) if v := req.Get("instructions"); v.Exists() { out, _ = sjson.Set(out, "instructions", v.String()) } diff --git a/internal/usage/logger_plugin.go b/internal/usage/logger_plugin.go index e4371e8d..38177d7d 100644 --- a/internal/usage/logger_plugin.go +++ b/internal/usage/logger_plugin.go @@ -87,6 +87,10 @@ type modelStats struct { Details []RequestDetail } +// maxDetailsPerModel limits the number of request details retained per model +// to prevent unbounded memory growth. Oldest entries are dropped when exceeded. +const maxDetailsPerModel = 1000 + // RequestDetail stores the timestamp and token usage for a single request. type RequestDetail struct { Timestamp time.Time `json:"timestamp"` @@ -221,6 +225,11 @@ func (s *RequestStatistics) updateAPIStats(stats *apiStats, model string, detail modelStatsValue.TotalRequests++ modelStatsValue.TotalTokens += detail.Tokens.TotalTokens modelStatsValue.Details = append(modelStatsValue.Details, detail) + // Prevent unbounded growth by dropping oldest entries when limit exceeded + if len(modelStatsValue.Details) > maxDetailsPerModel { + excess := len(modelStatsValue.Details) - maxDetailsPerModel + modelStatsValue.Details = modelStatsValue.Details[excess:] + } } // Snapshot returns a copy of the aggregated metrics for external consumption. diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 52da9a44..bcba1bb5 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1611,7 +1611,6 @@ func formatOauthIdentity(auth *Auth, provider string, accountInfo string) string if auth == nil { return "" } - authIndex := auth.EnsureIndex() // Prefer the auth's provider when available. providerName := strings.TrimSpace(auth.Provider) if providerName == "" { @@ -1633,16 +1632,10 @@ func formatOauthIdentity(auth *Auth, provider string, accountInfo string) string if authFile != "" { parts = append(parts, "auth_file="+authFile) } - if authIndex != "" { - parts = append(parts, "auth_index="+authIndex) - } if len(parts) == 0 { return accountInfo } - if accountInfo == "" { - return strings.Join(parts, " ") - } - return strings.Join(parts, " ") + " account=" + strconv.Quote(accountInfo) + return strings.Join(parts, " ") } // InjectCredentials delegates per-provider HTTP request preparation when supported.