diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go new file mode 100644 index 00000000..e7d4508f --- /dev/null +++ b/internal/runtime/executor/codex_continuity.go @@ -0,0 +1,153 @@ +package executor + +import ( + "context" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const codexAuthAffinityMetadataKey = "auth_affinity_key" + +type codexContinuity struct { + Key string + Source string +} + +func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) codexContinuity { + if promptCacheKey := strings.TrimSpace(gjson.GetBytes(req.Payload, "prompt_cache_key").String()); promptCacheKey != "" { + return codexContinuity{Key: promptCacheKey, Source: "prompt_cache_key"} + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "execution_session"} + } + } + } + } + if ginCtx := ginContextFrom(ctx); ginCtx != nil { + if ginCtx.Request != nil { + if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" { + return codexContinuity{Key: v, Source: "idempotency_key"} + } + } + if v, exists := ginCtx.Get("apiKey"); exists && v != nil { + switch value := v.(type) { + case string: + if trimmed := strings.TrimSpace(value); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + case fmt.Stringer: + if trimmed := strings.TrimSpace(value.String()); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + default: + trimmed := strings.TrimSpace(fmt.Sprintf("%v", value)) + if trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} + } + } + } + } + if opts.Metadata != nil { + if raw, ok := opts.Metadata[codexAuthAffinityMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + if trimmed := strings.TrimSpace(v); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + case []byte: + if trimmed := strings.TrimSpace(string(v)); trimmed != "" { + return codexContinuity{Key: trimmed, Source: "auth_affinity"} + } + } + } + } + if auth != nil { + if authID := strings.TrimSpace(auth.ID); authID != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:"+authID)).String(), Source: "auth_id"} + } + } + return codexContinuity{} +} + +func applyCodexContinuityBody(rawJSON []byte, continuity codexContinuity) []byte { + if continuity.Key == "" { + return rawJSON + } + rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", continuity.Key) + return rawJSON +} + +func applyCodexContinuityHeaders(headers http.Header, continuity codexContinuity) { + if headers == nil || continuity.Key == "" { + return + } + headers.Set("session_id", continuity.Key) +} + +func logCodexRequestDiagnostics(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, headers http.Header, body []byte, continuity codexContinuity) { + if !log.IsLevelEnabled(log.DebugLevel) { + return + } + entry := logWithRequestID(ctx) + authID := "" + authFile := "" + if auth != nil { + authID = strings.TrimSpace(auth.ID) + authFile = strings.TrimSpace(auth.FileName) + } + selectedAuthID := "" + executionSessionID := "" + if opts.Metadata != nil { + if raw, ok := opts.Metadata[cliproxyexecutor.SelectedAuthMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + selectedAuthID = strings.TrimSpace(v) + case []byte: + selectedAuthID = strings.TrimSpace(string(v)) + } + } + if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { + switch v := raw.(type) { + case string: + executionSessionID = strings.TrimSpace(v) + case []byte: + executionSessionID = strings.TrimSpace(string(v)) + } + } + } + entry.Debugf( + "codex request diagnostics auth_id=%s selected_auth_id=%s auth_file=%s exec_session=%s continuity_source=%s session_id=%s prompt_cache_key=%s prompt_cache_retention=%s store=%t has_instructions=%t reasoning_effort=%s reasoning_summary=%s chatgpt_account_id=%t originator=%s model=%s source_format=%s", + authID, + selectedAuthID, + authFile, + executionSessionID, + continuity.Source, + strings.TrimSpace(headers.Get("session_id")), + gjson.GetBytes(body, "prompt_cache_key").String(), + gjson.GetBytes(body, "prompt_cache_retention").String(), + gjson.GetBytes(body, "store").Bool(), + gjson.GetBytes(body, "instructions").Exists(), + gjson.GetBytes(body, "reasoning.effort").String(), + gjson.GetBytes(body, "reasoning.summary").String(), + strings.TrimSpace(headers.Get("Chatgpt-Account-Id")) != "", + strings.TrimSpace(headers.Get("Originator")), + req.Model, + opts.SourceFormat.String(), + ) +} diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index 7e4163b8..766a081a 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -111,18 +111,18 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -222,11 +222,12 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A body, _ = sjson.DeleteBytes(body, "stream") url := strings.TrimSuffix(baseURL, "/") + "/responses/compact" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return resp, err } applyCodexHeaders(httpReq, auth, apiKey, false, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -309,7 +310,6 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au requestedModel := payloadRequestedModel(opts, req.Model) body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") body, _ = sjson.SetBytes(body, "model", baseModel) if !gjson.GetBytes(body, "instructions").Exists() { @@ -317,11 +317,12 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } url := strings.TrimSuffix(baseURL, "/") + "/responses" - httpReq, err := e.cacheHelper(ctx, from, url, req, body) + httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body) if err != nil { return nil, err } applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg) + logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity) var authID, authLabel, authType, authValue string if auth != nil { authID = auth.ID @@ -596,8 +597,9 @@ func (e *CodexExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (* return auth, nil } -func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Format, url string, req cliproxyexecutor.Request, rawJSON []byte) (*http.Request, error) { +func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, url string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) (*http.Request, codexContinuity, error) { var cache codexCache + continuity := codexContinuity{} if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { @@ -615,25 +617,20 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, from sdktranslator.Form promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") if promptCacheKey.Exists() { cache.ID = promptCacheKey.String() + continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"} } } else if from == "openai" { - if apiKey := strings.TrimSpace(apiKeyFromContext(ctx)); apiKey != "" { - cache.ID = uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+apiKey)).String() - } + continuity = resolveCodexContinuity(ctx, auth, req, opts) + cache.ID = continuity.Key } - if cache.ID != "" { - rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - } + rawJSON = applyCodexContinuityBody(rawJSON, continuity) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(rawJSON)) if err != nil { - return nil, err + return nil, continuity, err } - if cache.ID != "" { - httpReq.Header.Set("Conversation_id", cache.ID) - httpReq.Header.Set("Session_id", cache.ID) - } - return httpReq, nil + applyCodexContinuityHeaders(httpReq.Header, continuity) + return httpReq, continuity, nil } func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, stream bool, cfg *config.Config) { @@ -646,7 +643,7 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s } misc.EnsureHeader(r.Header, ginHeaders, "Version", "") - misc.EnsureHeader(r.Header, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(r.Header, ginHeaders, "session_id", uuid.NewString()) misc.EnsureHeader(r.Header, ginHeaders, "X-Codex-Turn-Metadata", "") misc.EnsureHeader(r.Header, ginHeaders, "X-Client-Request-Id", "") cfgUserAgent, _ := codexHeaderDefaults(cfg, auth) diff --git a/internal/runtime/executor/codex_executor_cache_test.go b/internal/runtime/executor/codex_executor_cache_test.go index d6dca031..116b06ff 100644 --- a/internal/runtime/executor/codex_executor_cache_test.go +++ b/internal/runtime/executor/codex_executor_cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" @@ -27,7 +28,7 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom } url := "https://example.com/responses" - httpReq, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error: %v", err) } @@ -42,14 +43,14 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom if gotKey != expectedKey { t.Fatalf("prompt_cache_key = %q, want %q", gotKey, expectedKey) } - if gotConversation := httpReq.Header.Get("Conversation_id"); gotConversation != expectedKey { - t.Fatalf("Conversation_id = %q, want %q", gotConversation, expectedKey) + if gotSession := httpReq.Header.Get("session_id"); gotSession != expectedKey { + t.Fatalf("session_id = %q, want %q", gotSession, expectedKey) } - if gotSession := httpReq.Header.Get("Session_id"); gotSession != expectedKey { - t.Fatalf("Session_id = %q, want %q", gotSession, expectedKey) + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) } - httpReq2, err := executor.cacheHelper(ctx, sdktranslator.FromString("openai"), url, req, rawJSON) + httpReq2, _, err := executor.cacheHelper(ctx, nil, sdktranslator.FromString("openai"), url, req, cliproxyexecutor.Options{}, rawJSON) if err != nil { t.Fatalf("cacheHelper error (second call): %v", err) } @@ -62,3 +63,91 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_StablePromptCacheKeyFrom t.Fatalf("prompt_cache_key (second call) = %q, want %q", gotKey2, expectedKey) } } + +func TestCodexExecutorCacheHelper_OpenAIResponses_PreservesPromptCacheRetention(t *testing.T) { + executor := &CodexExecutor{} + url := "https://example.com/responses" + req := cliproxyexecutor.Request{ + Model: "gpt-5.3-codex", + Payload: []byte(`{"model":"gpt-5.3-codex","prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + rawJSON := []byte(`{"model":"gpt-5.3-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai-response"), url, req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(body, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := httpReq.Header.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := httpReq.Header.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_UsesExecutionSessionForContinuity(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + opts := cliproxyexecutor.Options{Metadata: map[string]any{cliproxyexecutor.ExecutionSessionMetadataKey: "exec-session-1"}} + + httpReq, _, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("openai"), "https://example.com/responses", req, opts, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != "exec-session-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "exec-session-1") + } + if got := httpReq.Header.Get("session_id"); got != "exec-session-1" { + t.Fatalf("session_id = %q, want %q", got, "exec-session-1") + } +} + +func TestCodexExecutorCacheHelper_OpenAIChatCompletions_FallsBackToStableAuthID(t *testing.T) { + executor := &CodexExecutor{} + rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`) + req := cliproxyexecutor.Request{ + Model: "gpt-5.4", + Payload: []byte(`{"model":"gpt-5.4"}`), + } + auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"} + + httpReq, _, err := executor.cacheHelper(context.Background(), auth, sdktranslator.FromString("openai"), "https://example.com/responses", req, cliproxyexecutor.Options{}, rawJSON) + if err != nil { + t.Fatalf("cacheHelper error: %v", err) + } + + body, err := io.ReadAll(httpReq.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + + expected := uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:auth:codex-auth-1")).String() + if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != expected { + t.Fatalf("prompt_cache_key = %q, want %q", got, expected) + } + if got := httpReq.Header.Get("session_id"); got != expected { + t.Fatalf("session_id = %q, want %q", got, expected) + } +} diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index fca82fe7..b8ae11ae 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -178,7 +178,6 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut body, _ = sjson.SetBytes(body, "model", baseModel) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") - body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.DeleteBytes(body, "safety_identifier") if !gjson.GetBytes(body, "instructions").Exists() { body, _ = sjson.SetBytes(body, "instructions", "") @@ -191,6 +190,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -209,6 +209,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -386,6 +387,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) + continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -403,6 +405,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr } wsReqBody := buildCodexWebsocketRequestBody(body) + logCodexRequestDiagnostics(ctx, auth, req, opts, wsHeaders, body, continuity) recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ URL: wsURL, Method: "WEBSOCKET", @@ -790,8 +793,7 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto if cache.ID != "" { rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - headers.Set("Conversation_id", cache.ID) - headers.Set("Session_id", cache.ID) + headers.Set("session_id", cache.ID) } return rawJSON, headers @@ -826,7 +828,7 @@ func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth * betaHeader = codexResponsesWebsocketBetaHeaderValue } headers.Set("OpenAI-Beta", betaHeader) - misc.EnsureHeader(headers, ginHeaders, "Session_id", uuid.NewString()) + misc.EnsureHeader(headers, ginHeaders, "session_id", uuid.NewString()) ensureHeaderWithConfigPrecedence(headers, ginHeaders, "User-Agent", cfgUserAgent, codexUserAgent) isAPIKey := false diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index d34e7c39..733318a3 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -9,7 +9,9 @@ import ( "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/config" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" ) @@ -32,6 +34,29 @@ func TestBuildCodexWebsocketRequestBodyPreservesPreviousResponseID(t *testing.T) } } +func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T) { + req := cliproxyexecutor.Request{ + Model: "gpt-5-codex", + Payload: []byte(`{"prompt_cache_key":"cache-key-1","prompt_cache_retention":"persistent"}`), + } + body := []byte(`{"model":"gpt-5-codex","stream":true,"prompt_cache_retention":"persistent"}`) + + updatedBody, headers := applyCodexPromptCacheHeaders(sdktranslator.FromString("openai-response"), req, body) + + if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != "cache-key-1" { + t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") + } + if got := gjson.GetBytes(updatedBody, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } + if got := headers.Get("session_id"); got != "cache-key-1" { + t.Fatalf("session_id = %q, want %q", got, "cache-key-1") + } + if got := headers.Get("Conversation_id"); got != "" { + t.Fatalf("Conversation_id = %q, want empty", got) + } +} + func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) { headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil) diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request.go b/internal/translator/codex/openai/chat-completions/codex_openai_request.go index 6cc701e7..7d24d60e 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request.go @@ -65,6 +65,9 @@ func ConvertOpenAIRequestToCodex(modelName string, inputRawJSON []byte, stream b // Model out, _ = sjson.SetBytes(out, "model", modelName) + if v := gjson.GetBytes(rawJSON, "prompt_cache_retention"); v.Exists() { + out, _ = sjson.SetBytes(out, "prompt_cache_retention", v.Value()) + } // Build tool name shortening map from original tools (if any) originalToolNameMap := map[string]string{} diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go index 84c8dad2..1202980f 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_request_test.go @@ -633,3 +633,19 @@ func TestToolsDefinitionTranslated(t *testing.T) { t.Errorf("tool 'search' not found in output tools: %s", gjson.Get(result, "tools").Raw) } } + +func TestPromptCacheRetentionPreserved(t *testing.T) { + input := []byte(`{ + "model": "gpt-4o", + "prompt_cache_retention": "persistent", + "messages": [ + {"role": "user", "content": "Hello"} + ] + }`) + + out := ConvertOpenAIRequestToCodex("gpt-4o", input, true) + + if got := gjson.GetBytes(out, "prompt_cache_retention").String(); got != "persistent" { + t.Fatalf("prompt_cache_retention = %q, want %q", got, "persistent") + } +} diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 28ab970d..8679f1a1 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -46,6 +46,7 @@ type ErrorDetail struct { } const idempotencyKeyMetadataKey = "idempotency_key" +const authAffinityMetadataKey = "auth_affinity_key" const ( defaultStreamingKeepAliveSeconds = 0 @@ -189,9 +190,11 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { // Idempotency-Key is an optional client-supplied header used to correlate retries. // It is forwarded as execution metadata; when absent we generate a UUID. key := "" + explicitIdempotencyKey := "" if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil { - key = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + explicitIdempotencyKey = strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")) + key = explicitIdempotencyKey } } if key == "" { @@ -207,6 +210,15 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } if executionSessionID := executionSessionIDFromContext(ctx); executionSessionID != "" { meta[coreexecutor.ExecutionSessionMetadataKey] = executionSessionID + meta[authAffinityMetadataKey] = executionSessionID + } else if explicitIdempotencyKey != "" { + meta[authAffinityMetadataKey] = explicitIdempotencyKey + } else if ctx != nil { + if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { + if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { + meta[authAffinityMetadataKey] = fmt.Sprintf("principal:%v", apiKey) + } + } } return meta } diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 9f46c7cf..7a62f852 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -128,13 +128,15 @@ func (NoopHook) OnResult(context.Context, Result) {} // Manager orchestrates auth lifecycle, selection, execution, and persistence. type Manager struct { - store Store - executors map[string]ProviderExecutor - selector Selector - hook Hook - mu sync.RWMutex - auths map[string]*Auth - scheduler *authScheduler + store Store + executors map[string]ProviderExecutor + selector Selector + hook Hook + mu sync.RWMutex + auths map[string]*Auth + scheduler *authScheduler + affinityMu sync.RWMutex + affinity map[string]string // providerOffsets tracks per-model provider rotation state for multi-provider routing. providerOffsets map[string]int @@ -179,6 +181,7 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { selector: selector, hook: hook, auths: make(map[string]*Auth), + affinity: make(map[string]string), providerOffsets: make(map[string]int), modelPoolOffsets: make(map[string]int), refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), @@ -1090,6 +1093,12 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1168,6 +1177,12 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -1254,6 +1269,12 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + m.SetAuthAffinity(affinityKey, auth.ID) + if log.IsLevelEnabled(log.DebugLevel) { + entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model) + } + } tried[auth.ID] = struct{}{} execCtx := ctx @@ -2222,6 +2243,58 @@ func (m *Manager) CloseExecutionSession(sessionID string) { } } +func authAffinityKeyFromMetadata(meta map[string]any) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta["auth_affinity_key"] + if !ok || raw == nil { + return "" + } + switch val := raw.(type) { + case string: + return strings.TrimSpace(val) + case []byte: + return strings.TrimSpace(string(val)) + default: + return "" + } +} + +func (m *Manager) AuthAffinity(key string) string { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return "" + } + m.affinityMu.RLock() + defer m.affinityMu.RUnlock() + return strings.TrimSpace(m.affinity[key]) +} + +func (m *Manager) SetAuthAffinity(key, authID string) { + key = strings.TrimSpace(key) + authID = strings.TrimSpace(authID) + if m == nil || key == "" || authID == "" { + return + } + m.affinityMu.Lock() + if m.affinity == nil { + m.affinity = make(map[string]string) + } + m.affinity[key] = authID + m.affinityMu.Unlock() +} + +func (m *Manager) ClearAuthAffinity(key string) { + key = strings.TrimSpace(key) + if m == nil || key == "" { + return + } + m.affinityMu.Lock() + delete(m.affinity, key) + m.affinityMu.Unlock() +} + func (m *Manager) useSchedulerFastPath() bool { if m == nil || m.scheduler == nil { return false @@ -2305,6 +2378,18 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2419,6 +2504,18 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { + if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + meta := opts.Metadata + if meta == nil { + meta = make(map[string]any) + opts.Metadata = meta + } + meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } + } if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) } diff --git a/sdk/cliproxy/auth/conductor_affinity_test.go b/sdk/cliproxy/auth/conductor_affinity_test.go new file mode 100644 index 00000000..e84f7c96 --- /dev/null +++ b/sdk/cliproxy/auth/conductor_affinity_test.go @@ -0,0 +1,85 @@ +package auth + +import ( + "context" + "net/http" + "testing" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +type affinityTestExecutor struct{ id string } + +func (e affinityTestExecutor) Identifier() string { return e.id } + +func (e affinityTestExecutor) Execute(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) ExecuteStream(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + ch := make(chan cliproxyexecutor.StreamChunk) + close(ch) + return &cliproxyexecutor.StreamResult{Chunks: ch}, nil +} + +func (e affinityTestExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { return auth, nil } + +func (e affinityTestExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e affinityTestExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { + return nil, nil +} + +func TestManagerPickNextMixedUsesAuthAffinity(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, &RoundRobinSelector{}, nil) + manager.executors["codex"] = affinityTestExecutor{id: "codex"} + reg := registry.GetGlobalRegistry() + reg.RegisterClient("codex-a", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + reg.RegisterClient("codex-b", "codex", []*registry.ModelInfo{{ID: "gpt-5.4"}}) + t.Cleanup(func() { + reg.UnregisterClient("codex-a") + reg.UnregisterClient("codex-b") + }) + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-a", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-a) error = %v", errRegister) + } + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "codex-b", Provider: "codex"}); errRegister != nil { + t.Fatalf("Register(codex-b) error = %v", errRegister) + } + + manager.SetAuthAffinity("idem-1", "codex-b") + opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "idem-1"}} + + got, _, provider, errPick := manager.pickNextMixed(context.Background(), []string{"codex"}, "gpt-5.4", opts, map[string]struct{}{}) + if errPick != nil { + t.Fatalf("pickNextMixed() error = %v", errPick) + } + if provider != "codex" { + t.Fatalf("provider = %q, want %q", provider, "codex") + } + if got == nil || got.ID != "codex-b" { + t.Fatalf("auth.ID = %v, want codex-b", got) + } + if pinned := pinnedAuthIDFromMetadata(opts.Metadata); pinned != "codex-b" { + t.Fatalf("pinned auth metadata = %q, want %q", pinned, "codex-b") + } +} + +func TestManagerAuthAffinityRoundTrip(t *testing.T) { + t.Parallel() + + manager := NewManager(nil, nil, nil) + manager.SetAuthAffinity("idem-2", "auth-1") + if got := manager.AuthAffinity("idem-2"); got != "auth-1" { + t.Fatalf("AuthAffinity = %q, want %q", got, "auth-1") + } + manager.ClearAuthAffinity("idem-2") + if got := manager.AuthAffinity("idem-2"); got != "" { + t.Fatalf("AuthAffinity after clear = %q, want empty", got) + } +}