mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-04-07 13:27:21 +00:00
Prompt caching on Codex was not reliably reusable through the proxy because repeated chat-completions requests could reach the upstream without the same continuity envelope. In practice this showed up most clearly with OpenCode, where cache reads worked in the reference client but not through CLIProxyAPI, although the root cause is broader than OpenCode itself. The proxy was breaking continuity in several ways: executor-layer Codex request preparation stripped prompt_cache_retention, chat-completions translation did not preserve that field, continuity headers used a different shape than the working client behavior, and OpenAI-style Codex requests could be sent without a stable prompt_cache_key. When that happened, session_id fell back to a fresh random value per request, so upstream Codex treated repeated requests as unrelated turns instead of as part of the same cacheable context. This change fixes that by preserving caller-provided prompt_cache_retention on Codex execution paths, preserving prompt_cache_retention when translating OpenAI chat-completions requests to Codex, aligning Codex continuity headers to session_id, and introducing an explicit Codex continuity policy that derives a stable continuity key from the best available signal. The resolution order prefers an explicit prompt_cache_key, then execution session metadata, then an explicit idempotency key, then stable request-affinity metadata, then a stable client-principal hash, and finally a stable auth-ID hash when no better continuity signal exists. The same continuity key is applied to both prompt_cache_key in the request body and session_id in the request headers so repeated requests reuse the same upstream cache/session identity. The auth manager also keeps auth selection sticky for repeated request sequences, preventing otherwise-equivalent Codex requests from drifting across different upstream auth contexts and accidentally breaking cache reuse. 
To keep the implementation maintainable, the continuity resolution and diagnostics are centralized in a dedicated Codex continuity helper instead of being scattered across executor flow code. Regression coverage now verifies retention preservation, continuity-key precedence, stable auth-ID fallback, websocket parity, translator preservation, and auth-affinity behavior. Manual validation confirmed prompt cache reads now occur through CLIProxyAPI when using Codex via OpenCode, and the fix should also benefit other clients that rely on stable repeated Codex request continuity.
767 lines
25 KiB
Go
767 lines
25 KiB
Go
package executor
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
codexauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/codex"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/thinking"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
|
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
|
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
|
|
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/tidwall/gjson"
|
|
"github.com/tidwall/sjson"
|
|
"github.com/tiktoken-go/tokenizer"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// codexUserAgent is the User-Agent string handed to
// ensureHeaderWithConfigPrecedence as its last argument in applyCodexHeaders
// (presumably the fallback when neither config nor caller supplies one).
const codexUserAgent = "codex_cli_rs/0.116.0 (Mac OS 26.0.1; arm64) Apple_Terminal/464"

// codexOriginator is the Originator header value applyCodexHeaders sets for
// non-API-key auths when the inbound request does not carry its own.
const codexOriginator = "codex_cli_rs"

// dataTag is the prefix that marks a payload line in the upstream event stream.
var dataTag = []byte("data:")
|
|
|
|
// CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint).
|
|
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
|
|
type CodexExecutor struct {
|
|
cfg *config.Config
|
|
}
|
|
|
|
// NewCodexExecutor returns a CodexExecutor bound to cfg.
func NewCodexExecutor(cfg *config.Config) *CodexExecutor {
	executor := CodexExecutor{cfg: cfg}
	return &executor
}
|
|
|
|
// Identifier returns the provider name of this executor, "codex".
func (e *CodexExecutor) Identifier() string {
	const providerName = "codex"
	return providerName
}
|
|
|
|
// PrepareRequest injects Codex credentials into the outgoing HTTP request.
|
|
func (e *CodexExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
|
|
if req == nil {
|
|
return nil
|
|
}
|
|
apiKey, _ := codexCreds(auth)
|
|
if strings.TrimSpace(apiKey) != "" {
|
|
req.Header.Set("Authorization", "Bearer "+apiKey)
|
|
}
|
|
var attrs map[string]string
|
|
if auth != nil {
|
|
attrs = auth.Attributes
|
|
}
|
|
util.ApplyCustomHeadersFromAttrs(req, attrs)
|
|
return nil
|
|
}
|
|
|
|
// HttpRequest injects Codex credentials into the request and executes it.
|
|
func (e *CodexExecutor) HttpRequest(ctx context.Context, auth *cliproxyauth.Auth, req *http.Request) (*http.Response, error) {
|
|
if req == nil {
|
|
return nil, fmt.Errorf("codex executor: request is nil")
|
|
}
|
|
if ctx == nil {
|
|
ctx = req.Context()
|
|
}
|
|
httpReq := req.WithContext(ctx)
|
|
if err := e.PrepareRequest(httpReq, auth); err != nil {
|
|
return nil, err
|
|
}
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
return httpClient.Do(httpReq)
|
|
}
|
|
|
|
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
|
if opts.Alt == "responses/compact" {
|
|
return e.executeCompact(ctx, auth, req, opts)
|
|
}
|
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
|
|
|
apiKey, baseURL := codexCreds(auth)
|
|
if baseURL == "" {
|
|
baseURL = "https://chatgpt.com/backend-api/codex"
|
|
}
|
|
|
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
|
defer reporter.trackFailure(ctx, &err)
|
|
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("codex")
|
|
originalPayloadSource := req.Payload
|
|
if len(opts.OriginalRequest) > 0 {
|
|
originalPayloadSource = opts.OriginalRequest
|
|
}
|
|
originalPayload := originalPayloadSource
|
|
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, false)
|
|
body := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, false)
|
|
|
|
body, err = thinking.ApplyThinking(body, req.Model, from.String(), to.String(), e.Identifier())
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
requestedModel := payloadRequestedModel(opts, req.Model)
|
|
body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel)
|
|
body, _ = sjson.SetBytes(body, "model", baseModel)
|
|
body, _ = sjson.SetBytes(body, "stream", true)
|
|
body, _ = sjson.DeleteBytes(body, "previous_response_id")
|
|
body, _ = sjson.DeleteBytes(body, "safety_identifier")
|
|
if !gjson.GetBytes(body, "instructions").Exists() {
|
|
body, _ = sjson.SetBytes(body, "instructions", "")
|
|
}
|
|
|
|
url := strings.TrimSuffix(baseURL, "/") + "/responses"
|
|
httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg)
|
|
logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity)
|
|
var authID, authLabel, authType, authValue string
|
|
if auth != nil {
|
|
authID = auth.ID
|
|
authLabel = auth.Label
|
|
authType, authValue = auth.AccountInfo()
|
|
}
|
|
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
|
URL: url,
|
|
Method: http.MethodPost,
|
|
Headers: httpReq.Header.Clone(),
|
|
Body: body,
|
|
Provider: e.Identifier(),
|
|
AuthID: authID,
|
|
AuthLabel: authLabel,
|
|
AuthType: authType,
|
|
AuthValue: authValue,
|
|
})
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
httpResp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
recordAPIResponseError(ctx, e.cfg, err)
|
|
return resp, err
|
|
}
|
|
defer func() {
|
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
|
}
|
|
}()
|
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
|
b, _ := io.ReadAll(httpResp.Body)
|
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
|
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
|
err = newCodexStatusErr(httpResp.StatusCode, b)
|
|
return resp, err
|
|
}
|
|
data, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
recordAPIResponseError(ctx, e.cfg, err)
|
|
return resp, err
|
|
}
|
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
|
|
|
lines := bytes.Split(data, []byte("\n"))
|
|
for _, line := range lines {
|
|
if !bytes.HasPrefix(line, dataTag) {
|
|
continue
|
|
}
|
|
|
|
line = bytes.TrimSpace(line[5:])
|
|
if gjson.GetBytes(line, "type").String() != "response.completed" {
|
|
continue
|
|
}
|
|
|
|
if detail, ok := parseCodexUsage(line); ok {
|
|
reporter.publish(ctx, detail)
|
|
}
|
|
|
|
var param any
|
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, line, ¶m)
|
|
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
|
|
return resp, nil
|
|
}
|
|
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
|
|
return resp, err
|
|
}
|
|
|
|
func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
|
|
|
apiKey, baseURL := codexCreds(auth)
|
|
if baseURL == "" {
|
|
baseURL = "https://chatgpt.com/backend-api/codex"
|
|
}
|
|
|
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
|
defer reporter.trackFailure(ctx, &err)
|
|
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("openai-response")
|
|
originalPayloadSource := req.Payload
|
|
if len(opts.OriginalRequest) > 0 {
|
|
originalPayloadSource = opts.OriginalRequest
|
|
}
|
|
originalPayload := originalPayloadSource
|
|
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, false)
|
|
body := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, false)
|
|
|
|
body, err = thinking.ApplyThinking(body, req.Model, from.String(), to.String(), e.Identifier())
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
requestedModel := payloadRequestedModel(opts, req.Model)
|
|
body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel)
|
|
body, _ = sjson.SetBytes(body, "model", baseModel)
|
|
body, _ = sjson.DeleteBytes(body, "stream")
|
|
|
|
url := strings.TrimSuffix(baseURL, "/") + "/responses/compact"
|
|
httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
applyCodexHeaders(httpReq, auth, apiKey, false, e.cfg)
|
|
logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity)
|
|
var authID, authLabel, authType, authValue string
|
|
if auth != nil {
|
|
authID = auth.ID
|
|
authLabel = auth.Label
|
|
authType, authValue = auth.AccountInfo()
|
|
}
|
|
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
|
URL: url,
|
|
Method: http.MethodPost,
|
|
Headers: httpReq.Header.Clone(),
|
|
Body: body,
|
|
Provider: e.Identifier(),
|
|
AuthID: authID,
|
|
AuthLabel: authLabel,
|
|
AuthType: authType,
|
|
AuthValue: authValue,
|
|
})
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
httpResp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
recordAPIResponseError(ctx, e.cfg, err)
|
|
return resp, err
|
|
}
|
|
defer func() {
|
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
|
}
|
|
}()
|
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
|
b, _ := io.ReadAll(httpResp.Body)
|
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
|
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
|
err = newCodexStatusErr(httpResp.StatusCode, b)
|
|
return resp, err
|
|
}
|
|
data, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
recordAPIResponseError(ctx, e.cfg, err)
|
|
return resp, err
|
|
}
|
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
|
reporter.publish(ctx, parseOpenAIUsage(data))
|
|
reporter.ensurePublished(ctx)
|
|
var param any
|
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, data, ¶m)
|
|
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
|
|
return resp, nil
|
|
}
|
|
|
|
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
|
if opts.Alt == "responses/compact" {
|
|
return nil, statusErr{code: http.StatusBadRequest, msg: "streaming not supported for /responses/compact"}
|
|
}
|
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
|
|
|
apiKey, baseURL := codexCreds(auth)
|
|
if baseURL == "" {
|
|
baseURL = "https://chatgpt.com/backend-api/codex"
|
|
}
|
|
|
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
|
defer reporter.trackFailure(ctx, &err)
|
|
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("codex")
|
|
originalPayloadSource := req.Payload
|
|
if len(opts.OriginalRequest) > 0 {
|
|
originalPayloadSource = opts.OriginalRequest
|
|
}
|
|
originalPayload := originalPayloadSource
|
|
originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, true)
|
|
body := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true)
|
|
|
|
body, err = thinking.ApplyThinking(body, req.Model, from.String(), to.String(), e.Identifier())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestedModel := payloadRequestedModel(opts, req.Model)
|
|
body = applyPayloadConfigWithRoot(e.cfg, baseModel, to.String(), "", body, originalTranslated, requestedModel)
|
|
body, _ = sjson.DeleteBytes(body, "previous_response_id")
|
|
body, _ = sjson.DeleteBytes(body, "safety_identifier")
|
|
body, _ = sjson.SetBytes(body, "model", baseModel)
|
|
if !gjson.GetBytes(body, "instructions").Exists() {
|
|
body, _ = sjson.SetBytes(body, "instructions", "")
|
|
}
|
|
|
|
url := strings.TrimSuffix(baseURL, "/") + "/responses"
|
|
httpReq, continuity, err := e.cacheHelper(ctx, auth, from, url, req, opts, body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
applyCodexHeaders(httpReq, auth, apiKey, true, e.cfg)
|
|
logCodexRequestDiagnostics(ctx, auth, req, opts, httpReq.Header, body, continuity)
|
|
var authID, authLabel, authType, authValue string
|
|
if auth != nil {
|
|
authID = auth.ID
|
|
authLabel = auth.Label
|
|
authType, authValue = auth.AccountInfo()
|
|
}
|
|
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
|
URL: url,
|
|
Method: http.MethodPost,
|
|
Headers: httpReq.Header.Clone(),
|
|
Body: body,
|
|
Provider: e.Identifier(),
|
|
AuthID: authID,
|
|
AuthLabel: authLabel,
|
|
AuthType: authType,
|
|
AuthValue: authValue,
|
|
})
|
|
|
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
|
httpResp, err := httpClient.Do(httpReq)
|
|
if err != nil {
|
|
recordAPIResponseError(ctx, e.cfg, err)
|
|
return nil, err
|
|
}
|
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
|
data, readErr := io.ReadAll(httpResp.Body)
|
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
|
}
|
|
if readErr != nil {
|
|
recordAPIResponseError(ctx, e.cfg, readErr)
|
|
return nil, readErr
|
|
}
|
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
|
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
|
err = newCodexStatusErr(httpResp.StatusCode, data)
|
|
return nil, err
|
|
}
|
|
out := make(chan cliproxyexecutor.StreamChunk)
|
|
go func() {
|
|
defer close(out)
|
|
defer func() {
|
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
|
}
|
|
}()
|
|
scanner := bufio.NewScanner(httpResp.Body)
|
|
scanner.Buffer(nil, 52_428_800) // 50MB
|
|
var param any
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
|
|
|
if bytes.HasPrefix(line, dataTag) {
|
|
data := bytes.TrimSpace(line[5:])
|
|
if gjson.GetBytes(data, "type").String() == "response.completed" {
|
|
if detail, ok := parseCodexUsage(data); ok {
|
|
reporter.publish(ctx, detail)
|
|
}
|
|
}
|
|
}
|
|
|
|
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, bytes.Clone(line), ¶m)
|
|
for i := range chunks {
|
|
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
|
|
}
|
|
}
|
|
if errScan := scanner.Err(); errScan != nil {
|
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
|
reporter.publishFailure(ctx)
|
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
|
}
|
|
}()
|
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
|
}
|
|
|
|
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
|
|
|
from := opts.SourceFormat
|
|
to := sdktranslator.FromString("codex")
|
|
body := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, false)
|
|
|
|
body, err := thinking.ApplyThinking(body, req.Model, from.String(), to.String(), e.Identifier())
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, err
|
|
}
|
|
|
|
body, _ = sjson.SetBytes(body, "model", baseModel)
|
|
body, _ = sjson.DeleteBytes(body, "previous_response_id")
|
|
body, _ = sjson.DeleteBytes(body, "prompt_cache_retention")
|
|
body, _ = sjson.DeleteBytes(body, "safety_identifier")
|
|
body, _ = sjson.SetBytes(body, "stream", false)
|
|
if !gjson.GetBytes(body, "instructions").Exists() {
|
|
body, _ = sjson.SetBytes(body, "instructions", "")
|
|
}
|
|
|
|
enc, err := tokenizerForCodexModel(baseModel)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, fmt.Errorf("codex executor: tokenizer init failed: %w", err)
|
|
}
|
|
|
|
count, err := countCodexInputTokens(enc, body)
|
|
if err != nil {
|
|
return cliproxyexecutor.Response{}, fmt.Errorf("codex executor: token counting failed: %w", err)
|
|
}
|
|
|
|
usageJSON := fmt.Sprintf(`{"response":{"usage":{"input_tokens":%d,"output_tokens":0,"total_tokens":%d}}}`, count, count)
|
|
translated := sdktranslator.TranslateTokenCount(ctx, to, from, count, []byte(usageJSON))
|
|
return cliproxyexecutor.Response{Payload: translated}, nil
|
|
}
|
|
|
|
func tokenizerForCodexModel(model string) (tokenizer.Codec, error) {
|
|
sanitized := strings.ToLower(strings.TrimSpace(model))
|
|
switch {
|
|
case sanitized == "":
|
|
return tokenizer.Get(tokenizer.Cl100kBase)
|
|
case strings.HasPrefix(sanitized, "gpt-5"):
|
|
return tokenizer.ForModel(tokenizer.GPT5)
|
|
case strings.HasPrefix(sanitized, "gpt-4.1"):
|
|
return tokenizer.ForModel(tokenizer.GPT41)
|
|
case strings.HasPrefix(sanitized, "gpt-4o"):
|
|
return tokenizer.ForModel(tokenizer.GPT4o)
|
|
case strings.HasPrefix(sanitized, "gpt-4"):
|
|
return tokenizer.ForModel(tokenizer.GPT4)
|
|
case strings.HasPrefix(sanitized, "gpt-3.5"), strings.HasPrefix(sanitized, "gpt-3"):
|
|
return tokenizer.ForModel(tokenizer.GPT35Turbo)
|
|
default:
|
|
return tokenizer.Get(tokenizer.Cl100kBase)
|
|
}
|
|
}
|
|
|
|
func countCodexInputTokens(enc tokenizer.Codec, body []byte) (int64, error) {
|
|
if enc == nil {
|
|
return 0, fmt.Errorf("encoder is nil")
|
|
}
|
|
if len(body) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
root := gjson.ParseBytes(body)
|
|
var segments []string
|
|
|
|
if inst := strings.TrimSpace(root.Get("instructions").String()); inst != "" {
|
|
segments = append(segments, inst)
|
|
}
|
|
|
|
inputItems := root.Get("input")
|
|
if inputItems.IsArray() {
|
|
arr := inputItems.Array()
|
|
for i := range arr {
|
|
item := arr[i]
|
|
switch item.Get("type").String() {
|
|
case "message":
|
|
content := item.Get("content")
|
|
if content.IsArray() {
|
|
parts := content.Array()
|
|
for j := range parts {
|
|
part := parts[j]
|
|
if text := strings.TrimSpace(part.Get("text").String()); text != "" {
|
|
segments = append(segments, text)
|
|
}
|
|
}
|
|
}
|
|
case "function_call":
|
|
if name := strings.TrimSpace(item.Get("name").String()); name != "" {
|
|
segments = append(segments, name)
|
|
}
|
|
if args := strings.TrimSpace(item.Get("arguments").String()); args != "" {
|
|
segments = append(segments, args)
|
|
}
|
|
case "function_call_output":
|
|
if out := strings.TrimSpace(item.Get("output").String()); out != "" {
|
|
segments = append(segments, out)
|
|
}
|
|
default:
|
|
if text := strings.TrimSpace(item.Get("text").String()); text != "" {
|
|
segments = append(segments, text)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
tools := root.Get("tools")
|
|
if tools.IsArray() {
|
|
tarr := tools.Array()
|
|
for i := range tarr {
|
|
tool := tarr[i]
|
|
if name := strings.TrimSpace(tool.Get("name").String()); name != "" {
|
|
segments = append(segments, name)
|
|
}
|
|
if desc := strings.TrimSpace(tool.Get("description").String()); desc != "" {
|
|
segments = append(segments, desc)
|
|
}
|
|
if params := tool.Get("parameters"); params.Exists() {
|
|
val := params.Raw
|
|
if params.Type == gjson.String {
|
|
val = params.String()
|
|
}
|
|
if trimmed := strings.TrimSpace(val); trimmed != "" {
|
|
segments = append(segments, trimmed)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
textFormat := root.Get("text.format")
|
|
if textFormat.Exists() {
|
|
if name := strings.TrimSpace(textFormat.Get("name").String()); name != "" {
|
|
segments = append(segments, name)
|
|
}
|
|
if schema := textFormat.Get("schema"); schema.Exists() {
|
|
val := schema.Raw
|
|
if schema.Type == gjson.String {
|
|
val = schema.String()
|
|
}
|
|
if trimmed := strings.TrimSpace(val); trimmed != "" {
|
|
segments = append(segments, trimmed)
|
|
}
|
|
}
|
|
}
|
|
|
|
text := strings.Join(segments, "\n")
|
|
if text == "" {
|
|
return 0, nil
|
|
}
|
|
|
|
count, err := enc.Count(text)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return int64(count), nil
|
|
}
|
|
|
|
func (e *CodexExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
|
|
log.Debugf("codex executor: refresh called")
|
|
if auth == nil {
|
|
return nil, statusErr{code: 500, msg: "codex executor: auth is nil"}
|
|
}
|
|
var refreshToken string
|
|
if auth.Metadata != nil {
|
|
if v, ok := auth.Metadata["refresh_token"].(string); ok && v != "" {
|
|
refreshToken = v
|
|
}
|
|
}
|
|
if refreshToken == "" {
|
|
return auth, nil
|
|
}
|
|
svc := codexauth.NewCodexAuth(e.cfg)
|
|
td, err := svc.RefreshTokensWithRetry(ctx, refreshToken, 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if auth.Metadata == nil {
|
|
auth.Metadata = make(map[string]any)
|
|
}
|
|
auth.Metadata["id_token"] = td.IDToken
|
|
auth.Metadata["access_token"] = td.AccessToken
|
|
if td.RefreshToken != "" {
|
|
auth.Metadata["refresh_token"] = td.RefreshToken
|
|
}
|
|
if td.AccountID != "" {
|
|
auth.Metadata["account_id"] = td.AccountID
|
|
}
|
|
auth.Metadata["email"] = td.Email
|
|
// Use unified key in files
|
|
auth.Metadata["expired"] = td.Expire
|
|
auth.Metadata["type"] = "codex"
|
|
now := time.Now().Format(time.RFC3339)
|
|
auth.Metadata["last_refresh"] = now
|
|
return auth, nil
|
|
}
|
|
|
|
func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, url string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) (*http.Request, codexContinuity, error) {
|
|
var cache codexCache
|
|
continuity := codexContinuity{}
|
|
if from == "claude" {
|
|
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
|
|
if userIDResult.Exists() {
|
|
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
|
|
var ok bool
|
|
if cache, ok = getCodexCache(key); !ok {
|
|
cache = codexCache{
|
|
ID: uuid.New().String(),
|
|
Expire: time.Now().Add(1 * time.Hour),
|
|
}
|
|
setCodexCache(key, cache)
|
|
}
|
|
}
|
|
} else if from == "openai-response" {
|
|
promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key")
|
|
if promptCacheKey.Exists() {
|
|
cache.ID = promptCacheKey.String()
|
|
continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"}
|
|
}
|
|
} else if from == "openai" {
|
|
continuity = resolveCodexContinuity(ctx, auth, req, opts)
|
|
cache.ID = continuity.Key
|
|
}
|
|
|
|
rawJSON = applyCodexContinuityBody(rawJSON, continuity)
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(rawJSON))
|
|
if err != nil {
|
|
return nil, continuity, err
|
|
}
|
|
applyCodexContinuityHeaders(httpReq.Header, continuity)
|
|
return httpReq, continuity, nil
|
|
}
|
|
|
|
func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, stream bool, cfg *config.Config) {
|
|
r.Header.Set("Content-Type", "application/json")
|
|
r.Header.Set("Authorization", "Bearer "+token)
|
|
|
|
var ginHeaders http.Header
|
|
if ginCtx, ok := r.Context().Value("gin").(*gin.Context); ok && ginCtx != nil && ginCtx.Request != nil {
|
|
ginHeaders = ginCtx.Request.Header
|
|
}
|
|
|
|
misc.EnsureHeader(r.Header, ginHeaders, "Version", "")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "session_id", uuid.NewString())
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Codex-Turn-Metadata", "")
|
|
misc.EnsureHeader(r.Header, ginHeaders, "X-Client-Request-Id", "")
|
|
cfgUserAgent, _ := codexHeaderDefaults(cfg, auth)
|
|
ensureHeaderWithConfigPrecedence(r.Header, ginHeaders, "User-Agent", cfgUserAgent, codexUserAgent)
|
|
|
|
if stream {
|
|
r.Header.Set("Accept", "text/event-stream")
|
|
} else {
|
|
r.Header.Set("Accept", "application/json")
|
|
}
|
|
r.Header.Set("Connection", "Keep-Alive")
|
|
|
|
isAPIKey := false
|
|
if auth != nil && auth.Attributes != nil {
|
|
if v := strings.TrimSpace(auth.Attributes["api_key"]); v != "" {
|
|
isAPIKey = true
|
|
}
|
|
}
|
|
if originator := strings.TrimSpace(ginHeaders.Get("Originator")); originator != "" {
|
|
r.Header.Set("Originator", originator)
|
|
} else if !isAPIKey {
|
|
r.Header.Set("Originator", codexOriginator)
|
|
}
|
|
if !isAPIKey {
|
|
if auth != nil && auth.Metadata != nil {
|
|
if accountID, ok := auth.Metadata["account_id"].(string); ok {
|
|
r.Header.Set("Chatgpt-Account-Id", accountID)
|
|
}
|
|
}
|
|
}
|
|
var attrs map[string]string
|
|
if auth != nil {
|
|
attrs = auth.Attributes
|
|
}
|
|
util.ApplyCustomHeadersFromAttrs(r, attrs)
|
|
}
|
|
|
|
func newCodexStatusErr(statusCode int, body []byte) statusErr {
|
|
err := statusErr{code: statusCode, msg: string(body)}
|
|
if retryAfter := parseCodexRetryAfter(statusCode, body, time.Now()); retryAfter != nil {
|
|
err.retryAfter = retryAfter
|
|
}
|
|
return err
|
|
}
|
|
|
|
func parseCodexRetryAfter(statusCode int, errorBody []byte, now time.Time) *time.Duration {
|
|
if statusCode != http.StatusTooManyRequests || len(errorBody) == 0 {
|
|
return nil
|
|
}
|
|
if strings.TrimSpace(gjson.GetBytes(errorBody, "error.type").String()) != "usage_limit_reached" {
|
|
return nil
|
|
}
|
|
if resetsAt := gjson.GetBytes(errorBody, "error.resets_at").Int(); resetsAt > 0 {
|
|
resetAtTime := time.Unix(resetsAt, 0)
|
|
if resetAtTime.After(now) {
|
|
retryAfter := resetAtTime.Sub(now)
|
|
return &retryAfter
|
|
}
|
|
}
|
|
if resetsInSeconds := gjson.GetBytes(errorBody, "error.resets_in_seconds").Int(); resetsInSeconds > 0 {
|
|
retryAfter := time.Duration(resetsInSeconds) * time.Second
|
|
return &retryAfter
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func codexCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) {
|
|
if a == nil {
|
|
return "", ""
|
|
}
|
|
if a.Attributes != nil {
|
|
apiKey = a.Attributes["api_key"]
|
|
baseURL = a.Attributes["base_url"]
|
|
}
|
|
if apiKey == "" && a.Metadata != nil {
|
|
if v, ok := a.Metadata["access_token"].(string); ok {
|
|
apiKey = v
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (e *CodexExecutor) resolveCodexConfig(auth *cliproxyauth.Auth) *config.CodexKey {
|
|
if auth == nil || e.cfg == nil {
|
|
return nil
|
|
}
|
|
var attrKey, attrBase string
|
|
if auth.Attributes != nil {
|
|
attrKey = strings.TrimSpace(auth.Attributes["api_key"])
|
|
attrBase = strings.TrimSpace(auth.Attributes["base_url"])
|
|
}
|
|
for i := range e.cfg.CodexKey {
|
|
entry := &e.cfg.CodexKey[i]
|
|
cfgKey := strings.TrimSpace(entry.APIKey)
|
|
cfgBase := strings.TrimSpace(entry.BaseURL)
|
|
if attrKey != "" && attrBase != "" {
|
|
if strings.EqualFold(cfgKey, attrKey) && strings.EqualFold(cfgBase, attrBase) {
|
|
return entry
|
|
}
|
|
continue
|
|
}
|
|
if attrKey != "" && strings.EqualFold(cfgKey, attrKey) {
|
|
if cfgBase == "" || strings.EqualFold(cfgBase, attrBase) {
|
|
return entry
|
|
}
|
|
}
|
|
if attrKey == "" && attrBase != "" && strings.EqualFold(cfgBase, attrBase) {
|
|
return entry
|
|
}
|
|
}
|
|
if attrKey != "" {
|
|
for i := range e.cfg.CodexKey {
|
|
entry := &e.cfg.CodexKey[i]
|
|
if strings.EqualFold(strings.TrimSpace(entry.APIKey), attrKey) {
|
|
return entry
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|