From 511b8a992e88a2ffefc806606023dc310a56b808 Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 17:49:29 +0200 Subject: [PATCH] fix(codex): restore prompt cache continuity for Codex requests Prompt caching on Codex was not reliably reusable through the proxy because repeated chat-completions requests could reach the upstream without the same continuity envelope. In practice this showed up most clearly with OpenCode, where cache reads worked in the reference client but not through CLIProxyAPI, although the root cause is broader than OpenCode itself. The proxy was breaking continuity in several ways: executor-layer Codex request preparation stripped prompt_cache_retention, chat-completions translation did not preserve that field, continuity headers used a different shape than the working client behavior, and OpenAI-style Codex requests could be sent without a stable prompt_cache_key. When that happened, session_id fell back to a fresh random value per request, so upstream Codex treated repeated requests as unrelated turns instead of as part of the same cacheable context. This change fixes that by preserving caller-provided prompt_cache_retention on Codex execution paths, preserving prompt_cache_retention when translating OpenAI chat-completions requests to Codex, aligning Codex continuity headers to session_id, and introducing an explicit Codex continuity policy that derives a stable continuity key from the best available signal. The resolution order prefers an explicit prompt_cache_key, then execution session metadata, then an explicit idempotency key, then stable request-affinity metadata, then a stable client-principal hash, and finally a stable auth-ID hash when no better continuity signal exists. The same continuity key is applied to both prompt_cache_key in the request body and session_id in the request headers so repeated requests reuse the same upstream cache/session identity. The auth manager also keeps auth selection sticky for repeated request sequences, preventing otherwise-equivalent Codex requests from drifting across different upstream auth contexts and accidentally breaking cache reuse. To keep the implementation maintainable, the continuity resolution and diagnostics are centralized in a dedicated Codex continuity helper instead of being scattered across executor flow code. Regression coverage now verifies retention preservation, continuity-key precedence, stable auth-ID fallback, websocket parity, translator preservation, and auth-affinity behavior. Manual validation confirmed prompt cache reads now occur through CLIProxyAPI when using Codex via OpenCode, and the fix should also benefit other clients that rely on stable repeated Codex request continuity. --- internal/runtime/executor/codex_continuity.go | 153 ++++++++++++++++++ internal/runtime/executor/codex_executor.go | 35 ++-- .../executor/codex_executor_cache_test.go | 101 +++++++++++- .../executor/codex_websockets_executor.go | 10 +- .../codex_websockets_executor_test.go | 25 +++ .../chat-completions/codex_openai_request.go | 3 + .../codex_openai_request_test.go | 16 ++ sdk/api/handlers/handlers.go | 14 +- sdk/cliproxy/auth/conductor.go | 111 ++++++++++++- sdk/cliproxy/auth/conductor_affinity_test.go | 85 ++++++++++ 10 files changed, 516 insertions(+), 37 deletions(-) create mode 100644 internal/runtime/executor/codex_continuity.go create mode 100644 sdk/cliproxy/auth/conductor_affinity_test.go diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go new file mode 100644 index 00000000..e7d4508f --- /dev/null +++ b/internal/runtime/executor/codex_continuity.go @@ -0,0 +1,153 @@ +package executor + +import ( + "context" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const codexAuthAffinityMetadataKey = "auth_affinity_key" + +type codexContinuity struct { + Key string + Source string +} + +func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) codexContinuity { + if promptCacheKey := strings.TrimSpace(gjson.GetBytes(req.Payload, "prompt_cache_key").String()); promptCacheKey != "" { + return codexContinuity{Key: promptCacheKey, Source: "prompt_cache_key"} + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + } + } + } + if ginCtx := ginContextFrom(ctx); ginCtx != nil { + if ginCtx.Request != nil { + if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" { + return codexContinuity{Key: v, Source: "idempotency_key"} + } + } + if v, exists := ginCtx.Get("apiKey"); exists && v != nil { + switch value := v.(type) { + case string: + if trimmed := strings.TrimSpace(value); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + case fmt.Stringer: + if trimmed := strings.TrimSpace(value.String()); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + default: + trimmed := strings.TrimSpace(fmt.Sprintf("%v", value)) + if trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + } + } + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[codexAuthAffinityMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + } + } + } + if auth != nil { + if authID := strings.TrimSpace(auth.ID); authID != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:"+authID)).String(), Source: "auth_id"} + } + } + return codexContinuity{} +} + +func applyCodexContinuityBody(rawJSON []byte, continuity codexContinuity) []byte { + if continuity.Key == "" { + return rawJSON + } + rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", continuity.Key) + return rawJSON +} + +func applyCodexContinuityHeaders(headers http.Header, continuity codexContinuity) { + if headers == nil || continuity.Key == "" { + return + } + headers.Set("session_id", continuity.Key) +} + +func logCodexRequestDiagnostics(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, headers http.Header, body []byte, continuity codexContinuity) { + if !log.IsLevelEnabled(log.DebugLevel) { + return + } + entry := logWithRequestID(ctx) + authID := "" + authFile := "" + if auth != nil { + authID = strings.TrimSpace(auth.ID) + authFile = strings.TrimSpace(auth.FileName) + } + selectedAuthID := "" + executionSessionID := "" + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.SelectedAuthMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + selectedAuthID = strings.TrimSpace(v) + case []byte: + selectedAuthID = strings.TrimSpace(string(v)) + } + } + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + executionSessionID = strings.TrimSpace(v) + case []byte: + executionSessionID = strings.TrimSpace(string(v)) + } + } + } + entry.Debugf( + "codex request diagnostics auth_id=%s selected_auth_id=%s auth_file=%s exec_session=%s continuity_source=%s session_id=%s prompt_cache_key=%s prompt_cache_retention=%s store=%t has_instructions=%t reasoning_effort=%s reasoning_summary=%s chatgpt_account_id=%t originator=%s model=%s source_format=%s", + authID, + selectedAuthID, + authFile, + executionSessionID, + continuity.Source, + strings.TrimSpace(headers.Get("session_id")), + gjson.GetBytes(body, "prompt_cache_key").String(), + gjson.GetBytes(body, "prompt_cache_retention").String(), + gjson.GetBytes(body, "store").Bool(), + gjson.GetBytes(body, "instructions").Exists(), + gjson.GetBytes(body, "reasoning.effort").String(), + gjson.GetBytes(body, "reasoning.summary").String(), + strings.TrimSpace(headers.Get("Chatgpt-Account-Id")) != "", + strings.TrimSpace(headers.Get("Originator")), + req.Model, + opts.SourceFormat.String(), + ) +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 7e4163b8..766a081a 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -111,18 +111,18 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -222,11 +222,12 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A body, _ = sjson.DeleteBytes(body, "stream") url := strings.TrimSuffix(baseURL, "/") + "/responses/compact" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, false, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -309,7 +310,6 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au requestedModel := payloadRequestedModel(opts, req.Model) body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") body, _ = sjson.SetBytes(body, "model", baseModel) if !gjson.GetBytes(body, "instructions").Exists() { @@ -317,11 +317,12 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return nil, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -596,8 +597,9 @@ func (e *CodexExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (* return auth, nil } -func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Format, url string, req cliproxyexecutor.Request, rawJSON []byte) (*http.Request, error) { +func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, url string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) (*http.Request, codexContinuity, error) { var cache codexCache + continuity := codexContinuity{} if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { @@ -615,25 +617,20 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") if promptCacheKey.Exists() { cache.ID = promptCacheKey.String() + continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"} } } else if from == "openai" { - if apiKey := strings.TrimSpace(apiKeyFromContext(ctx)); apiKey != "" { - cache.ID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String() - } + continuity = resolveCodexContinuity(ctx, auth, req, opts) + cache.ID = continuity.Key } - if cache.ID != "" { - rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - } + rawJSON = applyCodexContinuityBody(rawJSON, continuity) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(rawJSON)) if err != nil { - return nil, err + return nil, continuity, err } - if cache.ID != "" { - httpReq.Header.Set("Conversation_id", cache.ID) - httpReq.Header.Set("Session_id", cache.ID) - } - return httpReq, nil + applyCodexContinuityHeaders(httpReq.Header, continuity) + return httpReq, continuity, nil } func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, stream bool, cfg *config.Config) { @@ -646,7 +643,7 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s } misc.EnsureHeader(r.Header, ginHeaders, "Version", "") - misc.EnsureHeader(r.Header, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(r.Header, ginHeaders, "session_id", uuid.NewString()) misc.EnsureHeader(r.Header, ginHeaders, "X-Codex-Turn-Metadata", "") misc.EnsureHeader(r.Header, ginHeaders, "X-Client-Request-Id", "") cfgUserAgent, _ := codexHeaderDefaults(cfg, auth) diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index d6dca031..116b06ff 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" @@ -27,7 +28,7 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom } url := "https://example.com/responses" - httpReq, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error: %v", err) } @@ -42,14 +43,14 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom if gotKey != expectedKey { t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedKey) } - if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != expectedKey { - t.Fatalf("Conversation_id = %q, want %q", gotConversation, expectedKey) + if gotSession := httpReq.Header.Get("session_id"); gotSession != expectedKey { + t.Fatalf("session_id = %q, want %q", gotSession, expectedKey) } - if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey { - t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey) + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) } - httpReq2, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq2, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error (second call): %v", err) } @@ -62,3 +63,91 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom t.Fatalf("prompt_cache_key (second call) = %q, want %q", gotKey2, expectedKey) } } + +func TestCodexExecutorCacheHelper_OpenAIResponses_PreservesPromptCacheRetention(t *testing.T) { + executor := &CodexExecutor{} + url := "https://example.com/responses" + req := cliproxyexecutor.Request{ + Model: "gpt-5.3-codex", + Payload: []byte(`{"model":"gpt-5.3-codex","prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + rawJSON := []byte(`{"model":"gpt-5.3-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai-response"), url, req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(body, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := httpReq.Header.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_UsesExecutionSessionForContinuity(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + opts := cliproxyexecutor.Options{Metadata: map[string]any{cliproxyexecutor.ExecutionSessionMetadataKey: "exec-session-1"}} + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai"), "https://example.com/responses", req, opts, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "exec-session-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "exec-session-1") + } + if got := httpReq.Header.Get("session_id"); got != "exec-session-1" { + t.Fatalf("session_id = %q, want %q", got, "exec-session-1") + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_FallsBackToStableAuthID(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"} + + httpReq, _, err := executor.cacheHelper(context.Background(), auth, sdktranslator.FromString("openai"), "https://example.com/responses", req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + expected := uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:codex-auth-1")).String() + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != expected { + t.Fatalf("prompt_cache_key = %q, want %q", got, expected) + } + if got := httpReq.Header.Get("session_id"); got != expected { + t.Fatalf("session_id = %q, want %q", got, expected) + } +} diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index fca82fe7..b8ae11ae 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -178,7 +178,6 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") @@ -191,6 +190,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -209,6 +209,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -386,6 +387,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -403,6 +405,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -790,8 +793,7 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto if cache.ID != "" { rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - headers.Set("Conversation_id", cache.ID) - headers.Set("Session_id", cache.ID) + headers.Set("session_id", cache.ID) } return rawJSON, headers @@ -826,7 +828,7 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth * betaHeader = codexResponsesWebsocketBetaHeaderValue } headers.Set("OpenAI-Beta", betaHeader) - misc.EnsureHeader(headers, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(headers, ginHeaders, "session_id", uuid.NewString()) ensureHeaderWithConfigPrecedence(headers, ginHeaders, "User-Agent", cfgUserAgent, codexUserAgent) isAPIKey := false diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index d34e7c39..733318a3 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -9,7 +9,9 @@ import ( "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" ) @@ -32,6 +34,29 @@ func TestBuildCodexWebsocketRequestBodyPreservesPreviousResponseID(t *testing.T) } } +func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T) { + req := cliproxyexecutor.Request{ + Model: "gpt-5-codex", + Payload: []byte(`{"prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + body := []byte(`{"model":"gpt-5-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + updatedBody, headers := applyCodexPromptCacheHeaders(sdktranslator.FromString("openai-response"), req, body) + + if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(updatedBody, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := headers.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := headers.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) { headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil) diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request.go b/internal/translator/codex/openai/chat-completions/codex_openai_request.go index 6cc701e7..7d24d60e 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request.go @@ -65,6 +65,9 @@ func ConvertOpenAIRequestToCodex(modelName string, inputRawJSON []byte, stream b // Model out, _ = sjson.SetBytes(out, "model", modelName) + if v := gjson.GetBytes(rawJSON, "prompt_cache_retention"); v.Exists() { + out, _ = sjson.SetBytes(out, "prompt_cache_retention", v.Value()) + } // Build tool name shortening map from original tools (if any) originalToolNameMap := map[string]string{} diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go index 84c8dad2..1202980f 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go @@ -633,3 +633,19 @@ func TestToolsDefinitionTranslated(t *testing.T) { t.Errorf("tool 'search' not found in output tools: %s", gjson.Get(result, "tools").Raw) } } + +func TestPromptCacheRetentionPreserved(t *testing.T) { + input := []byte(`{ + "model": "gpt-4o", + "prompt_cache_retention": "persistent", + "messages": [ + {"role": "user", "content": "Hello"} + ] + }`) + + out := ConvertOpenAIRequestToCodex("gpt-4o", input, true) + + if got := gjson.GetBytes(out, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } +} diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 28ab970d..8679f1a1 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -46,6 +46,7 @@ type ErrorDetail struct { } const idempotencyKeyMetadataKey = "idempotency_key" +const authAffinityMetadataKey = "auth_affinity_key" const ( defaultStreamingKeepAliveSeconds = 0 @@ -189,9 +190,11 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { // Idempotency-Key is an optional client-supplied header used to correlate retries. // It is forwarded as execution metadata; when absent we generate a UUID. key := "" + explicitIdempotencyKey := "" if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { - key = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + explicitIdempotencyKey = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + key = explicitIdempotencyKey } } if key == "" { @@ -207,6 +210,15 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID + meta[authAffinityMetadataKey] = executionSessionID + } else if explicitIdempotencyKey != "" { + meta[authAffinityMetadataKey] = explicitIdempotencyKey + } else if ctx != nil { + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { + if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { + meta[authAffinityMetadataKey] = fmt.Sprintf("principal:%v", apiKey) + } + } } return meta } diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 9f46c7cf..7a62f852 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -128,13 +128,15 @@ func (NoopHook) OnResult(context.Context, Result) {} // Manager orchestrates auth lifecycle, selection, execution, and persistence. type Manager struct { - store Store - executors map[string]ProviderExecutor - selector Selector - hook Hook - mu sync.RWMutex - auths map[string]*Auth - scheduler *authScheduler + store Store + executors map[string]ProviderExecutor + selector Selector + hook Hook + mu sync.RWMutex + auths map[string]*Auth + scheduler *authScheduler + affinityMu sync.RWMutex + affinity map[string]string // providerOffsets tracks per-model provider rotation state for multi-provider routing. providerOffsets map[string]int @@ -179,6 +181,7 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { selector: selector, hook: hook, auths: make(map[string]*Auth), + affinity: make(map[string]string), providerOffsets: make(map[string]int), modelPoolOffsets: make(map[string]int), refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), @@ -1090,6 +1093,12 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1168,6 +1177,12 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1254,6 +1269,12 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -2222,6 +2243,58 @@ func (m *Manager) CloseExecutionSession(sessionID string) { } } +func authAffinityKeyFromMetadata(meta map[string]any) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta["auth_affinity_key"] + if !ok || raw == nil { + return "" + } + switch val := raw.(type) { + case string: + return strings.TrimSpace(val) + case []byte: + return strings.TrimSpace(string(val)) + default: + return "" + } +} + +func (m *Manager) AuthAffinity(key string) string { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return "" + } + m.affinityMu.RLock() + defer m.affinityMu.RUnlock() + return strings.TrimSpace(m.affinity[key]) +} + +func (m *Manager) SetAuthAffinity(key, authID string) { + key = strings.TrimSpace(key) + authID = strings.TrimSpace(authID) + if m == nil || key == "" || authID == "" { + return + } + m.affinityMu.Lock() + if m.affinity == nil { + m.affinity = make(map[string]string) + } + m.affinity[key] = authID + m.affinityMu.Unlock() +} + +func (m *Manager) ClearAuthAffinity(key string) { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return + } + m.affinityMu.Lock() + delete(m.affinity, key) + m.affinityMu.Unlock() +} + func (m *Manager) useSchedulerFastPath() bool { if m == nil || m.scheduler == nil { return false @@ -2305,6 +2378,18 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2419,6 +2504,18 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } diff --git a/sdk/cliproxy/auth/conductor_affinity_test.go b/sdk/cliproxy/auth/conductor_affinity_test.go new file mode 100644 index 00000000..e84f7c96 --- /dev/null +++ b/sdk/cliproxy/auth/conductor_affinity_test.go @@ -0,0 +1,85 @@ +package auth + +import ( + "context" + "net/http" + "testing" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +type affinityTestExecutor struct{ id string } + +func (e affinityTestExecutor) Identifier() string { return e.id } + +func (e affinityTestExecutor) Execute(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) ExecuteStream(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + ch := make(chan cliproxyexecutor.StreamChunk) + close(ch) + return &cliproxyexecutor.StreamResult{Chunks: ch}, nil +} + +func (e affinityTestExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { return auth, nil } + +func (e affinityTestExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { + return nil, nil +} + +func TestManagerPickNextMixedUsesAuthAffinity(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, &RoundRobinSelector{}, nil) + manager.executors["codex"] = affinityTestExecutor{id: "codex"} + reg := registry.GetGlobalRegistry() + reg.RegisterClient("codex-a", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + reg.RegisterClient("codex-b", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + t.Cleanup(func() { + reg.UnregisterClient("codex-a") + reg.UnregisterClient("codex-b") + }) + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-a", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-a) error = %v", errRegister) + } + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-b", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-b) error = %v", errRegister) + } + + manager.SetAuthAffinity("idem-1", "codex-b") + opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "idem-1"}} + + got, _, provider, errPick := manager.pickNextMixed(context.Background(), []string{"codex"}, "gpt-5.4", opts, map[string]struct{}{}) + if errPick != nil { + t.Fatalf("pickNextMixed() error = %v", errPick) + } + if provider != "codex" { + t.Fatalf("provider = %q, want %q", provider, "codex") + } + if got == nil || got.ID != "codex-b" { + t.Fatalf("auth.ID = %v, want codex-b", got) + } + if pinned := pinnedAuthIDFromMetadata(opts.Metadata); pinned != "codex-b" { + t.Fatalf("pinned auth metadata = %q, want %q", pinned, "codex-b") + } +} + +func TestManagerAuthAffinityRoundTrip(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, nil, nil) + manager.SetAuthAffinity("idem-2", "auth-1") + if got := manager.AuthAffinity("idem-2"); got != "auth-1" { + t.Fatalf("AuthAffinity = %q, want %q", got, "auth-1") + } + manager.ClearAuthAffinity("idem-2") + if got := manager.AuthAffinity("idem-2"); got != "" { + t.Fatalf("AuthAffinity after clear = %q, want empty", got) + } +}