From 511b8a992e88a2ffefc806606023dc310a56b808 Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 17:49:29 +0200 Subject: [PATCH 1/8] fix(codex): restore prompt cache continuity for Codex requests Prompt caching on Codex was not reliably reusable through the proxy because repeated chat-completions requests could reach the upstream without the same continuity envelope. In practice this showed up most clearly with OpenCode, where cache reads worked in the reference client but not through CLIProxyAPI, although the root cause is broader than OpenCode itself. The proxy was breaking continuity in several ways: executor-layer Codex request preparation stripped prompt_cache_retention, chat-completions translation did not preserve that field, continuity headers used a different shape than the working client behavior, and OpenAI-style Codex requests could be sent without a stable prompt_cache_key. When that happened, session_id fell back to a fresh random value per request, so upstream Codex treated repeated requests as unrelated turns instead of as part of the same cacheable context. This change fixes that by preserving caller-provided prompt_cache_retention on Codex execution paths, preserving prompt_cache_retention when translating OpenAI chat-completions requests to Codex, aligning Codex continuity headers to session_id, and introducing an explicit Codex continuity policy that derives a stable continuity key from the best available signal. The resolution order prefers an explicit prompt_cache_key, then execution session metadata, then an explicit idempotency key, then stable request-affinity metadata, then a stable client-principal hash, and finally a stable auth-ID hash when no better continuity signal exists. The same continuity key is applied to both prompt_cache_key in the request body and session_id in the request headers so repeated requests reuse the same upstream cache/session identity. The auth manager also keeps auth selection sticky for repeated request sequences, preventing otherwise-equivalent Codex requests from drifting across different upstream auth contexts and accidentally breaking cache reuse. To keep the implementation maintainable, the continuity resolution and diagnostics are centralized in a dedicated Codex continuity helper instead of being scattered across executor flow code. Regression coverage now verifies retention preservation, continuity-key precedence, stable auth-ID fallback, websocket parity, translator preservation, and auth-affinity behavior. Manual validation confirmed prompt cache reads now occur through CLIProxyAPI when using Codex via OpenCode, and the fix should also benefit other clients that rely on stable repeated Codex request continuity. --- internal/runtime/executor/codex_continuity.go | 153 ++++++++++++++++++ internal/runtime/executor/codex_executor.go | 35 ++-- .../executor/codex_executor_cache_test.go | 101 +++++++++++- .../executor/codex_websockets_executor.go | 10 +- .../codex_websockets_executor_test.go | 25 +++ .../chat-completions/codex_openai_request.go | 3 + .../codex_openai_request_test.go | 16 ++ sdk/api/handlers/handlers.go | 14 +- sdk/cliproxy/auth/conductor.go | 111 ++++++++++++- sdk/cliproxy/auth/conductor_affinity_test.go | 85 ++++++++++ 10 files changed, 516 insertions(+), 37 deletions(-) create mode 100644 internal/runtime/executor/codex_continuity.go create mode 100644 sdk/cliproxy/auth/conductor_affinity_test.go diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go new file mode 100644 index 00000000..e7d4508f --- /dev/null +++ b/internal/runtime/executor/codex_continuity.go @@ -0,0 +1,153 @@ +package executor + +import ( + "context" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const codexAuthAffinityMetadataKey = "auth_affinity_key" + +type codexContinuity struct { + Key string + Source string +} + +func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) codexContinuity { + if promptCacheKey := strings.TrimSpace(gjson.GetBytes(req.Payload, "prompt_cache_key").String()); promptCacheKey != "" { + return codexContinuity{Key: promptCacheKey, Source: "prompt_cache_key"} + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + } + } + } + if ginCtx := ginContextFrom(ctx); ginCtx != nil { + if ginCtx.Request != nil { + if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" { + return codexContinuity{Key: v, Source: "idempotency_key"} + } + } + if v, exists := ginCtx.Get("apiKey"); exists && v != nil { + switch value := v.(type) { + case string: + if trimmed := strings.TrimSpace(value); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + case fmt.Stringer: + if trimmed := strings.TrimSpace(value.String()); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + default: + trimmed := strings.TrimSpace(fmt.Sprintf("%v", value)) + if trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + } + } + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[codexAuthAffinityMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + } + } + } + if auth != nil { + if authID := strings.TrimSpace(auth.ID); authID != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:"+authID)).String(), Source: "auth_id"} + } + } + return codexContinuity{} +} + +func applyCodexContinuityBody(rawJSON []byte, continuity codexContinuity) []byte { + if continuity.Key == "" { + return rawJSON + } + rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", continuity.Key) + return rawJSON +} + +func applyCodexContinuityHeaders(headers http.Header, continuity codexContinuity) { + if headers == nil || continuity.Key == "" { + return + } + headers.Set("session_id", continuity.Key) +} + +func logCodexRequestDiagnostics(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, headers http.Header, body []byte, continuity codexContinuity) { + if !log.IsLevelEnabled(log.DebugLevel) { + return + } + entry := logWithRequestID(ctx) + authID := "" + authFile := "" + if auth != nil { + authID = strings.TrimSpace(auth.ID) + authFile = strings.TrimSpace(auth.FileName) + } + selectedAuthID := "" + executionSessionID := "" + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.SelectedAuthMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + selectedAuthID = strings.TrimSpace(v) + case []byte: + selectedAuthID = strings.TrimSpace(string(v)) + } + } + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + executionSessionID = strings.TrimSpace(v) + case []byte: + executionSessionID = strings.TrimSpace(string(v)) + } + } + } + entry.Debugf( + "codex request diagnostics auth_id=%s selected_auth_id=%s auth_file=%s exec_session=%s continuity_source=%s session_id=%s prompt_cache_key=%s prompt_cache_retention=%s store=%t has_instructions=%t reasoning_effort=%s reasoning_summary=%s chatgpt_account_id=%t originator=%s model=%s source_format=%s", + authID, + selectedAuthID, + authFile, + executionSessionID, + continuity.Source, + strings.TrimSpace(headers.Get("session_id")), + gjson.GetBytes(body, "prompt_cache_key").String(), + gjson.GetBytes(body, "prompt_cache_retention").String(), + gjson.GetBytes(body, "store").Bool(), + gjson.GetBytes(body, "instructions").Exists(), + gjson.GetBytes(body, "reasoning.effort").String(), + gjson.GetBytes(body, "reasoning.summary").String(), + strings.TrimSpace(headers.Get("Chatgpt-Account-Id")) != "", + strings.TrimSpace(headers.Get("Originator")), + req.Model, + opts.SourceFormat.String(), + ) +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 7e4163b8..766a081a 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -111,18 +111,18 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -222,11 +222,12 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A body, _ = sjson.DeleteBytes(body, "stream") url := strings.TrimSuffix(baseURL, "/") + "/responses/compact" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, false, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -309,7 +310,6 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au requestedModel := payloadRequestedModel(opts, req.Model) body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") body, _ = sjson.SetBytes(body, "model", baseModel) if !gjson.GetBytes(body, "instructions").Exists() { @@ -317,11 +317,12 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return nil, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -596,8 +597,9 @@ func (e *CodexExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (* return auth, nil } -func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Format, url string, req cliproxyexecutor.Request, rawJSON []byte) (*http.Request, error) { +func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, url string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) (*http.Request, codexContinuity, error) { var cache codexCache + continuity := codexContinuity{} if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { @@ -615,25 +617,20 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") if promptCacheKey.Exists() { cache.ID = promptCacheKey.String() + continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"} } } else if from == "openai" { - if apiKey := strings.TrimSpace(apiKeyFromContext(ctx)); apiKey != "" { - cache.ID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String() - } + continuity = resolveCodexContinuity(ctx, auth, req, opts) + cache.ID = continuity.Key } - if cache.ID != "" { - rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - } + rawJSON = applyCodexContinuityBody(rawJSON, continuity) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(rawJSON)) if err != nil { - return nil, err + return nil, continuity, err } - if cache.ID != "" { - httpReq.Header.Set("Conversation_id", cache.ID) - httpReq.Header.Set("Session_id", cache.ID) - } - return httpReq, nil + applyCodexContinuityHeaders(httpReq.Header, continuity) + return httpReq, continuity, nil } func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, stream bool, cfg *config.Config) { @@ -646,7 +643,7 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s } misc.EnsureHeader(r.Header, ginHeaders, "Version", "") - misc.EnsureHeader(r.Header, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(r.Header, ginHeaders, "session_id", uuid.NewString()) misc.EnsureHeader(r.Header, ginHeaders, "X-Codex-Turn-Metadata", "") misc.EnsureHeader(r.Header, ginHeaders, "X-Client-Request-Id", "") cfgUserAgent, _ := codexHeaderDefaults(cfg, auth) diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index d6dca031..116b06ff 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" @@ -27,7 +28,7 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom } url := "https://example.com/responses" - httpReq, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error: %v", err) } @@ -42,14 +43,14 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom if gotKey != expectedKey { t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedKey) } - if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != expectedKey { - t.Fatalf("Conversation_id = %q, want %q", gotConversation, expectedKey) + if gotSession := httpReq.Header.Get("session_id"); gotSession != expectedKey { + t.Fatalf("session_id = %q, want %q", gotSession, expectedKey) } - if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey { - t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey) + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) } - httpReq2, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq2, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error (second call): %v", err) } @@ -62,3 +63,91 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom t.Fatalf("prompt_cache_key (second call) = %q, want %q", gotKey2, expectedKey) } } + +func TestCodexExecutorCacheHelper_OpenAIResponses_PreservesPromptCacheRetention(t *testing.T) { + executor := &CodexExecutor{} + url := "https://example.com/responses" + req := cliproxyexecutor.Request{ + Model: "gpt-5.3-codex", + Payload: []byte(`{"model":"gpt-5.3-codex","prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + rawJSON := []byte(`{"model":"gpt-5.3-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai-response"), url, req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(body, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := httpReq.Header.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_UsesExecutionSessionForContinuity(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + opts := cliproxyexecutor.Options{Metadata: map[string]any{cliproxyexecutor.ExecutionSessionMetadataKey: "exec-session-1"}} + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai"), "https://example.com/responses", req, opts, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "exec-session-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "exec-session-1") + } + if got := httpReq.Header.Get("session_id"); got != "exec-session-1" { + t.Fatalf("session_id = %q, want %q", got, "exec-session-1") + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_FallsBackToStableAuthID(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"} + + httpReq, _, err := executor.cacheHelper(context.Background(), auth, sdktranslator.FromString("openai"), "https://example.com/responses", req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + expected := uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:codex-auth-1")).String() + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != expected { + t.Fatalf("prompt_cache_key = %q, want %q", got, expected) + } + if got := httpReq.Header.Get("session_id"); got != expected { + t.Fatalf("session_id = %q, want %q", got, expected) + } +} diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index fca82fe7..b8ae11ae 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -178,7 +178,6 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") @@ -191,6 +190,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -209,6 +209,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -386,6 +387,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -403,6 +405,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -790,8 +793,7 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto if cache.ID != "" { rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - headers.Set("Conversation_id", cache.ID) - headers.Set("Session_id", cache.ID) + headers.Set("session_id", cache.ID) } return rawJSON, headers @@ -826,7 +828,7 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth * betaHeader = codexResponsesWebsocketBetaHeaderValue } headers.Set("OpenAI-Beta", betaHeader) - misc.EnsureHeader(headers, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(headers, ginHeaders, "session_id", uuid.NewString()) ensureHeaderWithConfigPrecedence(headers, ginHeaders, "User-Agent", cfgUserAgent, codexUserAgent) isAPIKey := false diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index d34e7c39..733318a3 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -9,7 +9,9 @@ import ( "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" ) @@ -32,6 +34,29 @@ func TestBuildCodexWebsocketRequestBodyPreservesPreviousResponseID(t *testing.T) } } +func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T) { + req := cliproxyexecutor.Request{ + Model: "gpt-5-codex", + Payload: []byte(`{"prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + body := []byte(`{"model":"gpt-5-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + updatedBody, headers := applyCodexPromptCacheHeaders(sdktranslator.FromString("openai-response"), req, body) + + if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(updatedBody, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := headers.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := headers.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) { headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil) diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request.go b/internal/translator/codex/openai/chat-completions/codex_openai_request.go index 6cc701e7..7d24d60e 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request.go @@ -65,6 +65,9 @@ func ConvertOpenAIRequestToCodex(modelName string, inputRawJSON []byte, stream b // Model out, _ = sjson.SetBytes(out, "model", modelName) + if v := gjson.GetBytes(rawJSON, "prompt_cache_retention"); v.Exists() { + out, _ = sjson.SetBytes(out, "prompt_cache_retention", v.Value()) + } // Build tool name shortening map from original tools (if any) originalToolNameMap := map[string]string{} diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go index 84c8dad2..1202980f 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go @@ -633,3 +633,19 @@ func TestToolsDefinitionTranslated(t *testing.T) { t.Errorf("tool 'search' not found in output tools: %s", gjson.Get(result, "tools").Raw) } } + +func TestPromptCacheRetentionPreserved(t *testing.T) { + input := []byte(`{ + "model": "gpt-4o", + "prompt_cache_retention": "persistent", + "messages": [ + {"role": "user", "content": "Hello"} + ] + }`) + + out := ConvertOpenAIRequestToCodex("gpt-4o", input, true) + + if got := gjson.GetBytes(out, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } +} diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 28ab970d..8679f1a1 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -46,6 +46,7 @@ type ErrorDetail struct { } const idempotencyKeyMetadataKey = "idempotency_key" +const authAffinityMetadataKey = "auth_affinity_key" const ( defaultStreamingKeepAliveSeconds = 0 @@ -189,9 +190,11 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { // Idempotency-Key is an optional client-supplied header used to correlate retries. // It is forwarded as execution metadata; when absent we generate a UUID. key := "" + explicitIdempotencyKey := "" if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { - key = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + explicitIdempotencyKey = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + key = explicitIdempotencyKey } } if key == "" { @@ -207,6 +210,15 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID + meta[authAffinityMetadataKey] = executionSessionID + } else if explicitIdempotencyKey != "" { + meta[authAffinityMetadataKey] = explicitIdempotencyKey + } else if ctx != nil { + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { + if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { + meta[authAffinityMetadataKey] = fmt.Sprintf("principal:%v", apiKey) + } + } } return meta } diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 9f46c7cf..7a62f852 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -128,13 +128,15 @@ func (NoopHook) OnResult(context.Context, Result) {} // Manager orchestrates auth lifecycle, selection, execution, and persistence. type Manager struct { - store Store - executors map[string]ProviderExecutor - selector Selector - hook Hook - mu sync.RWMutex - auths map[string]*Auth - scheduler *authScheduler + store Store + executors map[string]ProviderExecutor + selector Selector + hook Hook + mu sync.RWMutex + auths map[string]*Auth + scheduler *authScheduler + affinityMu sync.RWMutex + affinity map[string]string // providerOffsets tracks per-model provider rotation state for multi-provider routing. providerOffsets map[string]int @@ -179,6 +181,7 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { selector: selector, hook: hook, auths: make(map[string]*Auth), + affinity: make(map[string]string), providerOffsets: make(map[string]int), modelPoolOffsets: make(map[string]int), refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), @@ -1090,6 +1093,12 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1168,6 +1177,12 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1254,6 +1269,12 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -2222,6 +2243,58 @@ func (m *Manager) CloseExecutionSession(sessionID string) { } } +func authAffinityKeyFromMetadata(meta map[string]any) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta["auth_affinity_key"] + if !ok || raw == nil { + return "" + } + switch val := raw.(type) { + case string: + return strings.TrimSpace(val) + case []byte: + return strings.TrimSpace(string(val)) + default: + return "" + } +} + +func (m *Manager) AuthAffinity(key string) string { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return "" + } + m.affinityMu.RLock() + defer m.affinityMu.RUnlock() + return strings.TrimSpace(m.affinity[key]) +} + +func (m *Manager) SetAuthAffinity(key, authID string) { + key = strings.TrimSpace(key) + authID = strings.TrimSpace(authID) + if m == nil || key == "" || authID == "" { + return + } + m.affinityMu.Lock() + if m.affinity == nil { + m.affinity = make(map[string]string) + } + m.affinity[key] = authID + m.affinityMu.Unlock() +} + +func (m *Manager) ClearAuthAffinity(key string) { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return + } + m.affinityMu.Lock() + delete(m.affinity, key) + m.affinityMu.Unlock() +} + func (m *Manager) useSchedulerFastPath() bool { if m == nil || m.scheduler == nil { return false @@ -2305,6 +2378,18 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2419,6 +2504,18 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } diff --git a/sdk/cliproxy/auth/conductor_affinity_test.go b/sdk/cliproxy/auth/conductor_affinity_test.go new file mode 100644 index 00000000..e84f7c96 --- /dev/null +++ b/sdk/cliproxy/auth/conductor_affinity_test.go @@ -0,0 +1,85 @@ +package auth + +import ( + "context" + "net/http" + "testing" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +type affinityTestExecutor struct{ id string } + +func (e affinityTestExecutor) Identifier() string { return e.id } + +func (e affinityTestExecutor) Execute(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) ExecuteStream(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + ch := make(chan cliproxyexecutor.StreamChunk) + close(ch) + return &cliproxyexecutor.StreamResult{Chunks: ch}, nil +} + +func (e affinityTestExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { return auth, nil } + +func (e affinityTestExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { + return nil, nil +} + +func TestManagerPickNextMixedUsesAuthAffinity(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, &RoundRobinSelector{}, nil) + manager.executors["codex"] = affinityTestExecutor{id: "codex"} + reg := registry.GetGlobalRegistry() + reg.RegisterClient("codex-a", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + reg.RegisterClient("codex-b", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + t.Cleanup(func() { + reg.UnregisterClient("codex-a") + reg.UnregisterClient("codex-b") + }) + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-a", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-a) error = %v", errRegister) + } + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-b", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-b) error = %v", errRegister) + } + + manager.SetAuthAffinity("idem-1", "codex-b") + opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "idem-1"}} + + got, _, provider, errPick := manager.pickNextMixed(context.Background(), []string{"codex"}, "gpt-5.4", opts, map[string]struct{}{}) + if errPick != nil { + t.Fatalf("pickNextMixed() error = %v", errPick) + } + if provider != "codex" { + t.Fatalf("provider = %q, want %q", provider, "codex") + } + if got == nil || got.ID != "codex-b" { + t.Fatalf("auth.ID = %v, want codex-b", got) + } + if pinned := pinnedAuthIDFromMetadata(opts.Metadata); pinned != "codex-b" { + t.Fatalf("pinned auth metadata = %q, want %q", pinned, "codex-b") + } +} + +func TestManagerAuthAffinityRoundTrip(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, nil, nil) + manager.SetAuthAffinity("idem-2", "auth-1") + if got := manager.AuthAffinity("idem-2"); got != "auth-1" { + t.Fatalf("AuthAffinity = %q, want %q", got, "auth-1") + } + manager.ClearAuthAffinity("idem-2") + if got := manager.AuthAffinity("idem-2"); got != "" { + t.Fatalf("AuthAffinity after clear = %q, want empty", got) + } +} From 62b17f40a100c312bbdd32f9e5631858ff85af8c Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 18:11:57 +0200 Subject: [PATCH 2/8] refactor(codex): align continuity helpers with review feedback Align websocket continuity resolution with the HTTP Codex path, make auth-affinity principal keys use a stable string representation, and extract small helpers that remove duplicated continuity and affinity logic without changing the validated cache-hit behavior. --- internal/runtime/executor/codex_continuity.go | 99 +++++++------------ .../executor/codex_websockets_executor.go | 23 ++--- .../codex_websockets_executor_test.go | 2 +- sdk/api/handlers/handlers.go | 20 +++- sdk/cliproxy/auth/conductor.go | 40 +++----- 5 files changed, 86 insertions(+), 98 deletions(-) diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go index e7d4508f..3ebb721f 100644 --- a/internal/runtime/executor/codex_continuity.go +++ b/internal/runtime/executor/codex_continuity.go @@ -21,23 +21,44 @@ type codexContinuity struct { Source string } +func metadataString(meta map[string]any, key string) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta[key] + if !ok || raw == nil { + return "" + } + switch v := raw.(type) { + case string: + return strings.TrimSpace(v) + case []byte: + return strings.TrimSpace(string(v)) + default: + return "" + } +} + +func principalString(raw any) string { + switch v := raw.(type) { + case string: + return strings.TrimSpace(v) + case fmt.Stringer: + return strings.TrimSpace(v.String()) + default: + return strings.TrimSpace(fmt.Sprintf("%v", raw)) + } +} + func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) codexContinuity { if promptCacheKey := strings.TrimSpace(gjson.GetBytes(req.Payload, "prompt_cache_key").String()); promptCacheKey != "" { return codexContinuity{Key: promptCacheKey, Source: "prompt_cache_key"} } - if opts.Metadata != nil { - if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - if trimmed := strings.TrimSpace(v); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "execution_session"} - } - case []byte: - if trimmed := strings.TrimSpace(string(v)); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "execution_session"} - } - } - } + if executionSession := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); executionSession != "" { + return codexContinuity{Key: executionSession, Source: "execution_session"} + } + if affinityKey := metadataString(opts.Metadata, codexAuthAffinityMetadataKey); affinityKey != "" { + return codexContinuity{Key: affinityKey, Source: "auth_affinity"} } if ginCtx := ginContextFrom(ctx); ginCtx != nil { if ginCtx.Request != nil { @@ -46,34 +67,8 @@ func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cl } } if v, exists := ginCtx.Get("apiKey"); exists && v != nil { - switch value := v.(type) { - case string: - if trimmed := strings.TrimSpace(value); trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - case fmt.Stringer: - if trimmed := strings.TrimSpace(value.String()); trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - default: - trimmed := strings.TrimSpace(fmt.Sprintf("%v", value)) - if trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - } - } - } - if opts.Metadata != nil { - if raw, ok := opts.Metadata[codexAuthAffinityMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - if trimmed := strings.TrimSpace(v); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "auth_affinity"} - } - case []byte: - if trimmed := strings.TrimSpace(string(v)); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "auth_affinity"} - } + if trimmed := principalString(v); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} } } } @@ -111,26 +106,8 @@ func logCodexRequestDiagnostics(ctx context.Context, auth *cliproxyauth.Auth, re authID = strings.TrimSpace(auth.ID) authFile = strings.TrimSpace(auth.FileName) } - selectedAuthID := "" - executionSessionID := "" - if opts.Metadata != nil { - if raw, ok := opts.Metadata[cliproxyexecutor.SelectedAuthMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - selectedAuthID = strings.TrimSpace(v) - case []byte: - selectedAuthID = strings.TrimSpace(string(v)) - } - } - if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - executionSessionID = strings.TrimSpace(v) - case []byte: - executionSessionID = strings.TrimSpace(string(v)) - } - } - } + selectedAuthID := metadataString(opts.Metadata, cliproxyexecutor.SelectedAuthMetadataKey) + executionSessionID := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey) entry.Debugf( "codex request diagnostics auth_id=%s selected_auth_id=%s auth_file=%s exec_session=%s continuity_source=%s session_id=%s prompt_cache_key=%s prompt_cache_retention=%s store=%t has_instructions=%t reasoning_effort=%s reasoning_summary=%s chatgpt_account_id=%t originator=%s model=%s source_format=%s", authID, diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index b8ae11ae..d0dd22c3 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -189,8 +189,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut return resp, err } - body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) - continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} + body, wsHeaders, continuity := applyCodexPromptCacheHeaders(ctx, auth, from, req, opts, body) wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -386,8 +385,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr return nil, err } - body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) - continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} + body, wsHeaders, continuity := applyCodexPromptCacheHeaders(ctx, auth, from, req, opts, body) wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -764,13 +762,14 @@ func buildCodexResponsesWebsocketURL(httpURL string) (string, error) { return parsed.String(), nil } -func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecutor.Request, rawJSON []byte) ([]byte, http.Header) { +func applyCodexPromptCacheHeaders(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) ([]byte, http.Header, codexContinuity) { headers := http.Header{} if len(rawJSON) == 0 { - return rawJSON, headers + return rawJSON, headers, codexContinuity{} } var cache codexCache + continuity := codexContinuity{} if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { @@ -788,15 +787,17 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto } else if from == "openai-response" { if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { cache.ID = promptCacheKey.String() + continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"} } + } else if from == "openai" { + continuity = resolveCodexContinuity(ctx, auth, req, opts) + cache.ID = continuity.Key } - if cache.ID != "" { - rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - headers.Set("session_id", cache.ID) - } + rawJSON = applyCodexContinuityBody(rawJSON, continuity) + applyCodexContinuityHeaders(headers, continuity) - return rawJSON, headers + return rawJSON, headers, continuity } func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth *cliproxyauth.Auth, token string, cfg *config.Config) http.Header { diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index 733318a3..e86036bc 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -41,7 +41,7 @@ func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T } body := []byte(`{"model":"gpt-5-codex","stream":true,"prompt_cache_retention":"persistent"}`) - updatedBody, headers := applyCodexPromptCacheHeaders(sdktranslator.FromString("openai-response"), req, body) + updatedBody, headers, _ := applyCodexPromptCacheHeaders(context.Background(), nil, sdktranslator.FromString("openai-response"), req, cliproxyexecutor.Options{}, body) if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != "cache-key-1" { t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 8679f1a1..5fc1154e 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -216,13 +216,31 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } else if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { - meta[authAffinityMetadataKey] = fmt.Sprintf("principal:%v", apiKey) + if principal := stablePrincipalMetadataKey(apiKey); principal != "" { + meta[authAffinityMetadataKey] = principal + } } } } return meta } +func stablePrincipalMetadataKey(raw any) string { + var keyStr string + switch v := raw.(type) { + case string: + keyStr = v + case fmt.Stringer: + keyStr = v.String() + default: + keyStr = fmt.Sprintf("%v", raw) + } + if trimmed := strings.TrimSpace(keyStr); trimmed != "" { + return "principal:" + trimmed + } + return "" +} + func pinnedAuthIDFromContext(ctx context.Context) string { if ctx == nil { return "" diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 7a62f852..d7736cf4 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2271,6 +2271,20 @@ func (m *Manager) AuthAffinity(key string) string { return strings.TrimSpace(m.affinity[key]) } +func (m *Manager) applyAuthAffinity(opts *cliproxyexecutor.Options) { + if m == nil || opts == nil || pinnedAuthIDFromMetadata(opts.Metadata) != "" { + return + } + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + if opts.Metadata == nil { + opts.Metadata = make(map[string]any) + } + opts.Metadata[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } +} + func (m *Manager) SetAuthAffinity(key, authID string) { key = strings.TrimSpace(key) authID = strings.TrimSpace(authID) @@ -2378,18 +2392,7 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { - if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { - meta := opts.Metadata - if meta == nil { - meta = make(map[string]any) - opts.Metadata = meta - } - meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID - } - } - } + m.applyAuthAffinity(&opts) if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2504,18 +2507,7 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { - if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { - meta := opts.Metadata - if meta == nil { - meta = make(map[string]any) - opts.Metadata = meta - } - meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID - } - } - } + m.applyAuthAffinity(&opts) if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } From 26eca8b6ba66783cff8fd5747dc630b6e4d9bb14 Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 18:27:33 +0200 Subject: [PATCH 3/8] fix(codex): preserve continuity and safe affinity fallback Restore Claude continuity after the continuity refactor, keep auth-affinity keys out of upstream Codex session identifiers, and only persist affinity after successful execution so retries can still rotate to healthy credentials when the first auth fails. --- internal/runtime/executor/codex_continuity.go | 3 -- internal/runtime/executor/codex_executor.go | 1 + .../executor/codex_executor_cache_test.go | 42 +++++++++++++++++++ .../executor/codex_websockets_executor.go | 1 + .../codex_websockets_executor_test.go | 20 +++++++++ sdk/cliproxy/auth/conductor.go | 33 +++++++-------- 6 files changed, 79 insertions(+), 21 deletions(-) diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go index 3ebb721f..e2fa8de0 100644 --- a/internal/runtime/executor/codex_continuity.go +++ b/internal/runtime/executor/codex_continuity.go @@ -57,9 +57,6 @@ func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cl if executionSession := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); executionSession != "" { return codexContinuity{Key: executionSession, Source: "execution_session"} } - if affinityKey := metadataString(opts.Metadata, codexAuthAffinityMetadataKey); affinityKey != "" { - return codexContinuity{Key: affinityKey, Source: "auth_affinity"} - } if ginCtx := ginContextFrom(ctx); ginCtx != nil { if ginCtx.Request != nil { if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" { diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 766a081a..5f06ace2 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -612,6 +612,7 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth } setCodexCache(key, cache) } + continuity = codexContinuity{Key: cache.ID, Source: "claude_user_cache"} } } else if from == "openai-response" { promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index 116b06ff..8c61a22e 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -151,3 +151,45 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_FallsBackToStableAuthID( t.Fatalf("session_id = %q, want %q", got, expected) } } + +func TestCodexExecutorCacheHelper_ClaudePreservesCacheContinuity(t *testing.T) { + executor := &CodexExecutor{} + req := cliproxyexecutor.Request{ + Model: "claude-3-7-sonnet", + Payload: []byte(`{"metadata":{"user_id":"user-1"}}`), + } + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + + httpReq, continuity, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("claude"), "https://example.com/responses", req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + if continuity.Key == "" { + t.Fatal("continuity.Key = empty, want non-empty") + } + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != continuity.Key { + t.Fatalf("prompt_cache_key = %q, want %q", got, continuity.Key) + } + if got := httpReq.Header.Get("session_id"); got != continuity.Key { + t.Fatalf("session_id = %q, want %q", got, continuity.Key) + } +} + +func TestResolveCodexContinuity_DoesNotForwardAuthAffinityKey(t *testing.T) { + req := cliproxyexecutor.Request{Payload: []byte(`{"model":"gpt-5.4"}`)} + opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "principal:raw-client-secret"}} + auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"} + + continuity := resolveCodexContinuity(context.Background(), auth, req, opts) + + if continuity.Source != "auth_id" { + t.Fatalf("continuity.Source = %q, want %q", continuity.Source, "auth_id") + } + if continuity.Key == "principal:raw-client-secret" { + t.Fatal("continuity.Key leaked raw auth affinity key") + } +} diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index d0dd22c3..50cc736d 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -783,6 +783,7 @@ func applyCodexPromptCacheHeaders(ctx context.Context, auth *cliproxyauth.Auth, } setCodexCache(key, cache) } + continuity = codexContinuity{Key: cache.ID, Source: "claude_user_cache"} } } else if from == "openai-response" { if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index e86036bc..0a06982f 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -57,6 +57,26 @@ func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T } } +func TestApplyCodexPromptCacheHeaders_ClaudePreservesContinuity(t *testing.T) { + req := cliproxyexecutor.Request{ + Model: "claude-3-7-sonnet", + Payload: []byte(`{"metadata":{"user_id":"user-1"}}`), + } + body := []byte(`{"model":"gpt-5.4","stream":true}`) + + updatedBody, headers, continuity := applyCodexPromptCacheHeaders(context.Background(), nil, sdktranslator.FromString("claude"), req, cliproxyexecutor.Options{}, body) + + if continuity.Key == "" { + t.Fatal("continuity.Key = empty, want non-empty") + } + if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != continuity.Key { + t.Fatalf("prompt_cache_key = %q, want %q", got, continuity.Key) + } + if got := headers.Get("session_id"); got != continuity.Key { + t.Fatalf("session_id = %q, want %q", got, continuity.Key) + } +} + func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) { headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil) diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index d7736cf4..6ef13baa 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1093,12 +1093,6 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - m.SetAuthAffinity(affinityKey, auth.ID) - if log.IsLevelEnabled(log.DebugLevel) { - entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) - } - } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1138,6 +1132,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req continue } m.MarkResult(execCtx, result) + m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return resp, nil } if authErr != nil { @@ -1177,12 +1172,6 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - m.SetAuthAffinity(affinityKey, auth.ID) - if log.IsLevelEnabled(log.DebugLevel) { - entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) - } - } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1222,6 +1211,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, continue } m.MarkResult(execCtx, result) + m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return resp, nil } if authErr != nil { @@ -1269,12 +1259,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - m.SetAuthAffinity(affinityKey, auth.ID) - if log.IsLevelEnabled(log.DebugLevel) { - entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) - } - } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1298,6 +1282,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string lastErr = errStream continue } + m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return streamResult, nil } } @@ -2285,6 +2270,18 @@ func (m *Manager) applyAuthAffinity(opts *cliproxyexecutor.Options) { } } +func (m *Manager) persistAuthAffinity(entry *log.Entry, opts cliproxyexecutor.Options, authID, provider, model string) { + if m == nil { + return + } + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, authID) + if entry != nil && log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, authID, provider, model) + } + } +} + func (m *Manager) SetAuthAffinity(key, authID string) { key = strings.TrimSpace(key) authID = strings.TrimSpace(authID) From 4c4cbd44dab25856182b8dec8c887c519465e512 Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 18:34:51 +0200 Subject: [PATCH 4/8] fix(auth): avoid leaking or over-persisting affinity keys Stop using one-shot idempotency keys as long-lived auth-affinity identifiers and remove raw affinity-key values from debug logs so sticky routing keeps its continuity benefits without creating avoidable memory growth or credential exposure risks. --- sdk/api/handlers/handlers.go | 2 -- sdk/cliproxy/auth/conductor.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 5fc1154e..420d1fcc 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -211,8 +211,6 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID meta[authAffinityMetadataKey] = executionSessionID - } else if explicitIdempotencyKey != "" { - meta[authAffinityMetadataKey] = explicitIdempotencyKey } else if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 6ef13baa..147b0ece 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2277,7 +2277,7 @@ func (m *Manager) persistAuthAffinity(entry *log.Entry, opts cliproxyexecutor.Op if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { m.SetAuthAffinity(affinityKey, authID) if entry != nil && log.IsLevelEnabled(log.DebugLevel) { - entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, authID, provider, model) + entry.Debugf("auth affinity pinned auth_id=%s provider=%s model=%s", authID, provider, model) } } } From 6962e09dd945120760b3ae3166d9e9480a1afd7a Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 18:52:58 +0200 Subject: [PATCH 5/8] fix(auth): scope affinity by provider Keep sticky auth affinity limited to matching providers and stop persisting execution-session IDs as long-lived affinity keys so provider switching and normal streaming traffic do not create incorrect pins or stale affinity state. --- sdk/api/handlers/handlers.go | 1 - sdk/cliproxy/auth/conductor.go | 40 +++++++++++++++----- sdk/cliproxy/auth/conductor_affinity_test.go | 25 +++++++++--- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 420d1fcc..69dd6007 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -210,7 +210,6 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID - meta[authAffinityMetadataKey] = executionSessionID } else if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 147b0ece..5c38654b 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2246,8 +2246,17 @@ func authAffinityKeyFromMetadata(meta map[string]any) string { } } -func (m *Manager) AuthAffinity(key string) string { +func scopedAuthAffinityKey(provider, key string) string { + provider = strings.TrimSpace(strings.ToLower(provider)) key = strings.TrimSpace(key) + if provider == "" || key == "" { + return "" + } + return provider + "|" + key +} + +func (m *Manager) AuthAffinity(provider, key string) string { + key = scopedAuthAffinityKey(provider, key) if m == nil || key == "" { return "" } @@ -2256,12 +2265,12 @@ func (m *Manager) AuthAffinity(key string) string { return strings.TrimSpace(m.affinity[key]) } -func (m *Manager) applyAuthAffinity(opts *cliproxyexecutor.Options) { +func (m *Manager) applyAuthAffinity(provider string, opts *cliproxyexecutor.Options) { if m == nil || opts == nil || pinnedAuthIDFromMetadata(opts.Metadata) != "" { return } if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + if affinityAuthID := m.AuthAffinity(provider, affinityKey); affinityAuthID != "" { if opts.Metadata == nil { opts.Metadata = make(map[string]any) } @@ -2275,15 +2284,15 @@ func (m *Manager) persistAuthAffinity(entry *log.Entry, opts cliproxyexecutor.Op return } if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - m.SetAuthAffinity(affinityKey, authID) + m.SetAuthAffinity(provider, affinityKey, authID) if entry != nil && log.IsLevelEnabled(log.DebugLevel) { entry.Debugf("auth affinity pinned auth_id=%s provider=%s model=%s", authID, provider, model) } } } -func (m *Manager) SetAuthAffinity(key, authID string) { - key = strings.TrimSpace(key) +func (m *Manager) SetAuthAffinity(provider, key, authID string) { + key = scopedAuthAffinityKey(provider, key) authID = strings.TrimSpace(authID) if m == nil || key == "" || authID == "" { return @@ -2296,8 +2305,8 @@ func (m *Manager) SetAuthAffinity(key, authID string) { m.affinityMu.Unlock() } -func (m *Manager) ClearAuthAffinity(key string) { - key = strings.TrimSpace(key) +func (m *Manager) ClearAuthAffinity(provider, key string) { + key = scopedAuthAffinityKey(provider, key) if m == nil || key == "" { return } @@ -2389,7 +2398,7 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { - m.applyAuthAffinity(&opts) + m.applyAuthAffinity(provider, &opts) if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2504,7 +2513,18 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { - m.applyAuthAffinity(&opts) + if pinnedAuthIDFromMetadata(opts.Metadata) == "" { + for _, provider := range providers { + providerKey := strings.TrimSpace(strings.ToLower(provider)) + if providerKey == "" { + continue + } + m.applyAuthAffinity(providerKey, &opts) + if pinnedAuthIDFromMetadata(opts.Metadata) != "" { + break + } + } + } if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } diff --git a/sdk/cliproxy/auth/conductor_affinity_test.go b/sdk/cliproxy/auth/conductor_affinity_test.go index e84f7c96..363e2367 100644 --- a/sdk/cliproxy/auth/conductor_affinity_test.go +++ b/sdk/cliproxy/auth/conductor_affinity_test.go @@ -52,7 +52,7 @@ func TestManagerPickNextMixedUsesAuthAffinity(t *testing.T) { t.Fatalf("Register(codex-b) error = %v", errRegister) } - manager.SetAuthAffinity("idem-1", "codex-b") + manager.SetAuthAffinity("codex", "idem-1", "codex-b") opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "idem-1"}} got, _, provider, errPick := manager.pickNextMixed(context.Background(), []string{"codex"}, "gpt-5.4", opts, map[string]struct{}{}) @@ -74,12 +74,27 @@ func TestManagerAuthAffinityRoundTrip(t *testing.T) { t.Parallel() manager := NewManager(nil, nil, nil) - manager.SetAuthAffinity("idem-2", "auth-1") - if got := manager.AuthAffinity("idem-2"); got != "auth-1" { + manager.SetAuthAffinity("codex", "idem-2", "auth-1") + if got := manager.AuthAffinity("codex", "idem-2"); got != "auth-1" { t.Fatalf("AuthAffinity = %q, want %q", got, "auth-1") } - manager.ClearAuthAffinity("idem-2") - if got := manager.AuthAffinity("idem-2"); got != "" { + manager.ClearAuthAffinity("codex", "idem-2") + if got := manager.AuthAffinity("codex", "idem-2"); got != "" { t.Fatalf("AuthAffinity after clear = %q, want empty", got) } } + +func TestManagerAuthAffinityScopedByProvider(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, nil, nil) + manager.SetAuthAffinity("codex", "shared-key", "codex-auth") + manager.SetAuthAffinity("gemini", "shared-key", "gemini-auth") + + if got := manager.AuthAffinity("codex", "shared-key"); got != "codex-auth" { + t.Fatalf("codex affinity = %q, want %q", got, "codex-auth") + } + if got := manager.AuthAffinity("gemini", "shared-key"); got != "gemini-auth" { + t.Fatalf("gemini affinity = %q, want %q", got, "gemini-auth") + } +} From 35f158d5261ee6cb7b511ade41bddbebdc08c4b8 Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 19:06:34 +0200 Subject: [PATCH 6/8] refactor(pr): narrow Codex cache fix scope Remove the experimental auth-affinity routing changes from this PR so it stays focused on the validated Codex continuity fix. This keeps the prompt-cache repair while avoiding unrelated routing-policy concerns such as provider/model affinity scope, lifecycle cleanup, and hard-pin fallback semantics. --- sdk/api/handlers/handlers.go | 29 +---- sdk/cliproxy/auth/conductor.go | 120 ++----------------- sdk/cliproxy/auth/conductor_affinity_test.go | 100 ---------------- 3 files changed, 8 insertions(+), 241 deletions(-) delete mode 100644 sdk/cliproxy/auth/conductor_affinity_test.go diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 69dd6007..28ab970d 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -46,7 +46,6 @@ type ErrorDetail struct { } const idempotencyKeyMetadataKey = "idempotency_key" -const authAffinityMetadataKey = "auth_affinity_key" const ( defaultStreamingKeepAliveSeconds = 0 @@ -190,11 +189,9 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { // Idempotency-Key is an optional client-supplied header used to correlate retries. // It is forwarded as execution metadata; when absent we generate a UUID. key := "" - explicitIdempotencyKey := "" if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { - explicitIdempotencyKey = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) - key = explicitIdempotencyKey + key = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) } } if key == "" { @@ -210,34 +207,10 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID - } else if ctx != nil { - if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { - if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { - if principal := stablePrincipalMetadataKey(apiKey); principal != "" { - meta[authAffinityMetadataKey] = principal - } - } - } } return meta } -func stablePrincipalMetadataKey(raw any) string { - var keyStr string - switch v := raw.(type) { - case string: - keyStr = v - case fmt.Stringer: - keyStr = v.String() - default: - keyStr = fmt.Sprintf("%v", raw) - } - if trimmed := strings.TrimSpace(keyStr); trimmed != "" { - return "principal:" + trimmed - } - return "" -} - func pinnedAuthIDFromContext(ctx context.Context) string { if ctx == nil { return "" diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 5c38654b..9f46c7cf 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -128,15 +128,13 @@ func (NoopHook) OnResult(context.Context, Result) {} // Manager orchestrates auth lifecycle, selection, execution, and persistence. type Manager struct { - store Store - executors map[string]ProviderExecutor - selector Selector - hook Hook - mu sync.RWMutex - auths map[string]*Auth - scheduler *authScheduler - affinityMu sync.RWMutex - affinity map[string]string + store Store + executors map[string]ProviderExecutor + selector Selector + hook Hook + mu sync.RWMutex + auths map[string]*Auth + scheduler *authScheduler // providerOffsets tracks per-model provider rotation state for multi-provider routing. providerOffsets map[string]int @@ -181,7 +179,6 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { selector: selector, hook: hook, auths: make(map[string]*Auth), - affinity: make(map[string]string), providerOffsets: make(map[string]int), modelPoolOffsets: make(map[string]int), refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), @@ -1132,7 +1129,6 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req continue } m.MarkResult(execCtx, result) - m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return resp, nil } if authErr != nil { @@ -1211,7 +1207,6 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, continue } m.MarkResult(execCtx, result) - m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return resp, nil } if authErr != nil { @@ -1282,7 +1277,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string lastErr = errStream continue } - m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model) return streamResult, nil } } @@ -2228,93 +2222,6 @@ func (m *Manager) CloseExecutionSession(sessionID string) { } } -func authAffinityKeyFromMetadata(meta map[string]any) string { - if len(meta) == 0 { - return "" - } - raw, ok := meta["auth_affinity_key"] - if !ok || raw == nil { - return "" - } - switch val := raw.(type) { - case string: - return strings.TrimSpace(val) - case []byte: - return strings.TrimSpace(string(val)) - default: - return "" - } -} - -func scopedAuthAffinityKey(provider, key string) string { - provider = strings.TrimSpace(strings.ToLower(provider)) - key = strings.TrimSpace(key) - if provider == "" || key == "" { - return "" - } - return provider + "|" + key -} - -func (m *Manager) AuthAffinity(provider, key string) string { - key = scopedAuthAffinityKey(provider, key) - if m == nil || key == "" { - return "" - } - m.affinityMu.RLock() - defer m.affinityMu.RUnlock() - return strings.TrimSpace(m.affinity[key]) -} - -func (m *Manager) applyAuthAffinity(provider string, opts *cliproxyexecutor.Options) { - if m == nil || opts == nil || pinnedAuthIDFromMetadata(opts.Metadata) != "" { - return - } - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(provider, affinityKey); affinityAuthID != "" { - if opts.Metadata == nil { - opts.Metadata = make(map[string]any) - } - opts.Metadata[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID - } - } -} - -func (m *Manager) persistAuthAffinity(entry *log.Entry, opts cliproxyexecutor.Options, authID, provider, model string) { - if m == nil { - return - } - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - m.SetAuthAffinity(provider, affinityKey, authID) - if entry != nil && log.IsLevelEnabled(log.DebugLevel) { - entry.Debugf("auth affinity pinned auth_id=%s provider=%s model=%s", authID, provider, model) - } - } -} - -func (m *Manager) SetAuthAffinity(provider, key, authID string) { - key = scopedAuthAffinityKey(provider, key) - authID = strings.TrimSpace(authID) - if m == nil || key == "" || authID == "" { - return - } - m.affinityMu.Lock() - if m.affinity == nil { - m.affinity = make(map[string]string) - } - m.affinity[key] = authID - m.affinityMu.Unlock() -} - -func (m *Manager) ClearAuthAffinity(provider, key string) { - key = scopedAuthAffinityKey(provider, key) - if m == nil || key == "" { - return - } - m.affinityMu.Lock() - delete(m.affinity, key) - m.affinityMu.Unlock() -} - func (m *Manager) useSchedulerFastPath() bool { if m == nil || m.scheduler == nil { return false @@ -2398,7 +2305,6 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { - m.applyAuthAffinity(provider, &opts) if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2513,18 +2419,6 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { - if pinnedAuthIDFromMetadata(opts.Metadata) == "" { - for _, provider := range providers { - providerKey := strings.TrimSpace(strings.ToLower(provider)) - if providerKey == "" { - continue - } - m.applyAuthAffinity(providerKey, &opts) - if pinnedAuthIDFromMetadata(opts.Metadata) != "" { - break - } - } - } if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } diff --git a/sdk/cliproxy/auth/conductor_affinity_test.go b/sdk/cliproxy/auth/conductor_affinity_test.go deleted file mode 100644 index 363e2367..00000000 --- a/sdk/cliproxy/auth/conductor_affinity_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package auth - -import ( - "context" - "net/http" - "testing" - - "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" - cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" -) - -type affinityTestExecutor struct{ id string } - -func (e affinityTestExecutor) Identifier() string { return e.id } - -func (e affinityTestExecutor) Execute(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { - return cliproxyexecutor.Response{}, nil -} - -func (e affinityTestExecutor) ExecuteStream(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { - ch := make(chan cliproxyexecutor.StreamChunk) - close(ch) - return &cliproxyexecutor.StreamResult{Chunks: ch}, nil -} - -func (e affinityTestExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { return auth, nil } - -func (e affinityTestExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { - return cliproxyexecutor.Response{}, nil -} - -func (e affinityTestExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { - return nil, nil -} - -func TestManagerPickNextMixedUsesAuthAffinity(t *testing.T) { - t.Parallel() - - manager := NewManager(nil, &RoundRobinSelector{}, nil) - manager.executors["codex"] = affinityTestExecutor{id: "codex"} - reg := registry.GetGlobalRegistry() - reg.RegisterClient("codex-a", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) - reg.RegisterClient("codex-b", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) - t.Cleanup(func() { - reg.UnregisterClient("codex-a") - reg.UnregisterClient("codex-b") - }) - if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-a", Provider: "codex"}); errRegister != nil { - t.Fatalf("Register(codex-a) error = %v", errRegister) - } - if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-b", Provider: "codex"}); errRegister != nil { - t.Fatalf("Register(codex-b) error = %v", errRegister) - } - - manager.SetAuthAffinity("codex", "idem-1", "codex-b") - opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "idem-1"}} - - got, _, provider, errPick := manager.pickNextMixed(context.Background(), []string{"codex"}, "gpt-5.4", opts, map[string]struct{}{}) - if errPick != nil { - t.Fatalf("pickNextMixed() error = %v", errPick) - } - if provider != "codex" { - t.Fatalf("provider = %q, want %q", provider, "codex") - } - if got == nil || got.ID != "codex-b" { - t.Fatalf("auth.ID = %v, want codex-b", got) - } - if pinned := pinnedAuthIDFromMetadata(opts.Metadata); pinned != "codex-b" { - t.Fatalf("pinned auth metadata = %q, want %q", pinned, "codex-b") - } -} - -func TestManagerAuthAffinityRoundTrip(t *testing.T) { - t.Parallel() - - manager := NewManager(nil, nil, nil) - manager.SetAuthAffinity("codex", "idem-2", "auth-1") - if got := manager.AuthAffinity("codex", "idem-2"); got != "auth-1" { - t.Fatalf("AuthAffinity = %q, want %q", got, "auth-1") - } - manager.ClearAuthAffinity("codex", "idem-2") - if got := manager.AuthAffinity("codex", "idem-2"); got != "" { - t.Fatalf("AuthAffinity after clear = %q, want empty", got) - } -} - -func TestManagerAuthAffinityScopedByProvider(t *testing.T) { - t.Parallel() - - manager := NewManager(nil, nil, nil) - manager.SetAuthAffinity("codex", "shared-key", "codex-auth") - manager.SetAuthAffinity("gemini", "shared-key", "gemini-auth") - - if got := manager.AuthAffinity("codex", "shared-key"); got != "codex-auth" { - t.Fatalf("codex affinity = %q, want %q", got, "codex-auth") - } - if got := manager.AuthAffinity("gemini", "shared-key"); got != "gemini-auth" { - t.Fatalf("gemini affinity = %q, want %q", got, "gemini-auth") - } -} From 79755e76ea1d503f0a586091c851a45700d7364a Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 19:34:13 +0200 Subject: [PATCH 7/8] refactor(pr): remove forbidden translator changes Drop the chat-completions translator edits from this PR so the branch complies with the repository policy that forbids pull-request changes under internal/translator. The remaining PR stays focused on the executor-level Codex continuity fix that was validated to restore cache reuse. --- .../chat-completions/codex_openai_request.go | 3 --- .../codex_openai_request_test.go | 16 ---------------- 2 files changed, 19 deletions(-) diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request.go b/internal/translator/codex/openai/chat-completions/codex_openai_request.go index 7d24d60e..6cc701e7 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request.go @@ -65,9 +65,6 @@ func ConvertOpenAIRequestToCodex(modelName string, inputRawJSON []byte, stream b // Model out, _ = sjson.SetBytes(out, "model", modelName) - if v := gjson.GetBytes(rawJSON, "prompt_cache_retention"); v.Exists() { - out, _ = sjson.SetBytes(out, "prompt_cache_retention", v.Value()) - } // Build tool name shortening map from original tools (if any) originalToolNameMap := map[string]string{} diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go index 1202980f..84c8dad2 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go @@ -633,19 +633,3 @@ func TestToolsDefinitionTranslated(t *testing.T) { t.Errorf("tool 'search' not found in output tools: %s", gjson.Get(result, "tools").Raw) } } - -func TestPromptCacheRetentionPreserved(t *testing.T) { - input := []byte(`{ - "model": "gpt-4o", - "prompt_cache_retention": "persistent", - "messages": [ - {"role": "user", "content": "Hello"} - ] - }`) - - out := ConvertOpenAIRequestToCodex("gpt-4o", input, true) - - if got := gjson.GetBytes(out, "prompt_cache_retention").String(); got != "persistent" { - t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") - } -} From e5d3541b5a70bdc9fcc65a69cf26f1986e57d35b Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 20:40:26 +0200 Subject: [PATCH 8/8] refactor(codex): remove stale affinity cleanup leftovers Drop the last affinity-related executor artifacts so the PR stays focused on the minimal Codex continuity fix set: stable prompt cache identity, stable session_id, and the executor-only behavior that was validated to restore cache reads. --- internal/runtime/executor/codex_continuity.go | 2 -- .../runtime/executor/codex_executor_cache_test.go | 15 --------------- 2 files changed, 17 deletions(-) diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go index e2fa8de0..9a0cd1b4 100644 --- a/internal/runtime/executor/codex_continuity.go +++ b/internal/runtime/executor/codex_continuity.go @@ -14,8 +14,6 @@ import ( "github.com/tidwall/sjson" ) -const codexAuthAffinityMetadataKey = "auth_affinity_key" - type codexContinuity struct { Key string Source string diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index 8c61a22e..f6def7ae 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -178,18 +178,3 @@ func TestCodexExecutorCacheHelper_ClaudePreservesCacheContinuity(t *testing.T) { t.Fatalf("session_id = %q, want %q", got, continuity.Key) } } - -func TestResolveCodexContinuity_DoesNotForwardAuthAffinityKey(t *testing.T) { - req := cliproxyexecutor.Request{Payload: []byte(`{"model":"gpt-5.4"}`)} - opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "principal:raw-client-secret"}} - auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"} - - continuity := resolveCodexContinuity(context.Background(), auth, req, opts) - - if continuity.Source != "auth_id" { - t.Fatalf("continuity.Source = %q, want %q", continuity.Source, "auth_id") - } - if continuity.Key == "principal:raw-client-secret" { - t.Fatal("continuity.Key leaked raw auth affinity key") - } -}