mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-09 15:25:17 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee0c24628f | ||
|
|
ddcf1f279d | ||
|
|
7e6bb8fdc5 | ||
|
|
9cee8ef87b | ||
|
|
93fb841bcb | ||
|
|
0c05131aeb | ||
|
|
5ebc58fab4 | ||
|
|
2b609dd891 | ||
|
|
a8cbc68c3e | ||
|
|
dd44413ba5 | ||
|
|
10fa0f2062 | ||
|
|
30338ecec4 | ||
|
|
9a37defed3 | ||
|
|
c83a057996 | ||
|
|
b7588428c5 |
@@ -220,12 +220,27 @@ func GetGeminiModels() []*ModelInfo {
|
||||
Name: "models/gemini-3-flash-preview",
|
||||
Version: "3.0",
|
||||
DisplayName: "Gemini 3 Flash Preview",
|
||||
Description: "Gemini 3 Flash Preview",
|
||||
Description: "Our most intelligent model built for speed, combining frontier intelligence with superior search and grounding.",
|
||||
InputTokenLimit: 1048576,
|
||||
OutputTokenLimit: 65536,
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "low", "medium", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3.1-flash-lite-preview",
|
||||
Object: "model",
|
||||
Created: 1776288000,
|
||||
OwnedBy: "google",
|
||||
Type: "gemini",
|
||||
Name: "models/gemini-3.1-flash-lite-preview",
|
||||
Version: "3.1",
|
||||
DisplayName: "Gemini 3.1 Flash Lite Preview",
|
||||
Description: "Our smallest and most cost effective model, built for at scale usage.",
|
||||
InputTokenLimit: 1048576,
|
||||
OutputTokenLimit: 65536,
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3-pro-image-preview",
|
||||
Object: "model",
|
||||
@@ -336,6 +351,21 @@ func GetGeminiVertexModels() []*ModelInfo {
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"low", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3.1-flash-lite-preview",
|
||||
Object: "model",
|
||||
Created: 1776288000,
|
||||
OwnedBy: "google",
|
||||
Type: "gemini",
|
||||
Name: "models/gemini-3.1-flash-lite-preview",
|
||||
Version: "3.1",
|
||||
DisplayName: "Gemini 3.1 Flash Lite Preview",
|
||||
Description: "Our smallest and most cost effective model, built for at scale usage.",
|
||||
InputTokenLimit: 1048576,
|
||||
OutputTokenLimit: 65536,
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3-pro-image-preview",
|
||||
Object: "model",
|
||||
@@ -508,6 +538,21 @@ func GetGeminiCLIModels() []*ModelInfo {
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "low", "medium", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3.1-flash-lite-preview",
|
||||
Object: "model",
|
||||
Created: 1776288000,
|
||||
OwnedBy: "google",
|
||||
Type: "gemini",
|
||||
Name: "models/gemini-3.1-flash-lite-preview",
|
||||
Version: "3.1",
|
||||
DisplayName: "Gemini 3.1 Flash Lite Preview",
|
||||
Description: "Our smallest and most cost effective model, built for at scale usage.",
|
||||
InputTokenLimit: 1048576,
|
||||
OutputTokenLimit: 65536,
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -604,6 +649,21 @@ func GetAIStudioModels() []*ModelInfo {
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true},
|
||||
},
|
||||
{
|
||||
ID: "gemini-3.1-flash-lite-preview",
|
||||
Object: "model",
|
||||
Created: 1776288000,
|
||||
OwnedBy: "google",
|
||||
Type: "gemini",
|
||||
Name: "models/gemini-3.1-flash-lite-preview",
|
||||
Version: "3.1",
|
||||
DisplayName: "Gemini 3.1 Flash Lite Preview",
|
||||
Description: "Our smallest and most cost effective model, built for at scale usage.",
|
||||
InputTokenLimit: 1048576,
|
||||
OutputTokenLimit: 65536,
|
||||
SupportedGenerationMethods: []string{"generateContent", "countTokens", "createCachedContent", "batchGenerateContent"},
|
||||
Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}},
|
||||
},
|
||||
{
|
||||
ID: "gemini-pro-latest",
|
||||
Object: "model",
|
||||
@@ -980,6 +1040,7 @@ func GetAntigravityModelConfig() map[string]*AntigravityModelConfig {
|
||||
"gemini-3.1-pro-high": {Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"low", "high"}}},
|
||||
"gemini-3.1-pro-low": {Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"low", "high"}}},
|
||||
"gemini-3.1-flash-image": {Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}}},
|
||||
"gemini-3.1-flash-lite-preview": {Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "high"}}},
|
||||
"gemini-3-flash": {Thinking: &ThinkingSupport{Min: 128, Max: 32768, ZeroAllowed: false, DynamicAllowed: true, Levels: []string{"minimal", "low", "medium", "high"}}},
|
||||
"claude-opus-4-6-thinking": {Thinking: &ThinkingSupport{Min: 1024, Max: 64000, ZeroAllowed: true, DynamicAllowed: true}, MaxCompletionTokens: 64000},
|
||||
"claude-sonnet-4-6": {Thinking: &ThinkingSupport{Min: 1024, Max: 64000, ZeroAllowed: true, DynamicAllowed: true}, MaxCompletionTokens: 64000},
|
||||
|
||||
@@ -31,7 +31,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
codexResponsesWebsocketBetaHeaderValue = "responses_websockets=2026-02-04"
|
||||
codexResponsesWebsocketBetaHeaderValue = "responses_websockets=2026-02-06"
|
||||
codexResponsesWebsocketIdleTimeout = 5 * time.Minute
|
||||
codexResponsesWebsocketHandshakeTO = 30 * time.Second
|
||||
)
|
||||
@@ -57,11 +57,6 @@ type codexWebsocketSession struct {
|
||||
wsURL string
|
||||
authID string
|
||||
|
||||
// connCreateSent tracks whether a `response.create` message has been successfully sent
|
||||
// on the current websocket connection. The upstream expects the first message on each
|
||||
// connection to be `response.create`.
|
||||
connCreateSent bool
|
||||
|
||||
writeMu sync.Mutex
|
||||
|
||||
activeMu sync.Mutex
|
||||
@@ -212,13 +207,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
defer sess.reqMu.Unlock()
|
||||
}
|
||||
|
||||
allowAppend := true
|
||||
if sess != nil {
|
||||
sess.connMu.Lock()
|
||||
allowAppend = sess.connCreateSent
|
||||
sess.connMu.Unlock()
|
||||
}
|
||||
wsReqBody := buildCodexWebsocketRequestBody(body, allowAppend)
|
||||
wsReqBody := buildCodexWebsocketRequestBody(body)
|
||||
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
||||
URL: wsURL,
|
||||
Method: "WEBSOCKET",
|
||||
@@ -280,10 +269,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
// execution session.
|
||||
connRetry, _, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
|
||||
if errDialRetry == nil && connRetry != nil {
|
||||
sess.connMu.Lock()
|
||||
allowAppend = sess.connCreateSent
|
||||
sess.connMu.Unlock()
|
||||
wsReqBodyRetry := buildCodexWebsocketRequestBody(body, allowAppend)
|
||||
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
|
||||
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
||||
URL: wsURL,
|
||||
Method: "WEBSOCKET",
|
||||
@@ -312,7 +298,6 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
||||
return resp, errSend
|
||||
}
|
||||
}
|
||||
markCodexWebsocketCreateSent(sess, conn, wsReqBody)
|
||||
|
||||
for {
|
||||
if ctx != nil && ctx.Err() != nil {
|
||||
@@ -403,26 +388,20 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey)
|
||||
|
||||
var authID, authLabel, authType, authValue string
|
||||
if auth != nil {
|
||||
authID = auth.ID
|
||||
authLabel = auth.Label
|
||||
authType, authValue = auth.AccountInfo()
|
||||
}
|
||||
authID = auth.ID
|
||||
authLabel = auth.Label
|
||||
authType, authValue = auth.AccountInfo()
|
||||
|
||||
executionSessionID := executionSessionIDFromOptions(opts)
|
||||
var sess *codexWebsocketSession
|
||||
if executionSessionID != "" {
|
||||
sess = e.getOrCreateSession(executionSessionID)
|
||||
sess.reqMu.Lock()
|
||||
if sess != nil {
|
||||
sess.reqMu.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
allowAppend := true
|
||||
if sess != nil {
|
||||
sess.connMu.Lock()
|
||||
allowAppend = sess.connCreateSent
|
||||
sess.connMu.Unlock()
|
||||
}
|
||||
wsReqBody := buildCodexWebsocketRequestBody(body, allowAppend)
|
||||
wsReqBody := buildCodexWebsocketRequestBody(body)
|
||||
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
||||
URL: wsURL,
|
||||
Method: "WEBSOCKET",
|
||||
@@ -483,10 +462,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
sess.reqMu.Unlock()
|
||||
return nil, errDialRetry
|
||||
}
|
||||
sess.connMu.Lock()
|
||||
allowAppend = sess.connCreateSent
|
||||
sess.connMu.Unlock()
|
||||
wsReqBodyRetry := buildCodexWebsocketRequestBody(body, allowAppend)
|
||||
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
|
||||
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
|
||||
URL: wsURL,
|
||||
Method: "WEBSOCKET",
|
||||
@@ -515,7 +491,6 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
||||
return nil, errSend
|
||||
}
|
||||
}
|
||||
markCodexWebsocketCreateSent(sess, conn, wsReqBody)
|
||||
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
go func() {
|
||||
@@ -657,31 +632,14 @@ func writeCodexWebsocketMessage(sess *codexWebsocketSession, conn *websocket.Con
|
||||
return conn.WriteMessage(websocket.TextMessage, payload)
|
||||
}
|
||||
|
||||
func buildCodexWebsocketRequestBody(body []byte, allowAppend bool) []byte {
|
||||
func buildCodexWebsocketRequestBody(body []byte) []byte {
|
||||
if len(body) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Codex CLI websocket v2 uses `response.create` with `previous_response_id` for incremental turns.
|
||||
// The upstream ChatGPT Codex websocket currently rejects that with close 1008 (policy violation).
|
||||
// Fall back to v1 `response.append` semantics on the same websocket connection to keep the session alive.
|
||||
//
|
||||
// NOTE: The upstream expects the first websocket event on each connection to be `response.create`,
|
||||
// so we only use `response.append` after we have initialized the current connection.
|
||||
if allowAppend {
|
||||
if prev := strings.TrimSpace(gjson.GetBytes(body, "previous_response_id").String()); prev != "" {
|
||||
inputNode := gjson.GetBytes(body, "input")
|
||||
wsReqBody := []byte(`{}`)
|
||||
wsReqBody, _ = sjson.SetBytes(wsReqBody, "type", "response.append")
|
||||
if inputNode.Exists() && inputNode.IsArray() && strings.TrimSpace(inputNode.Raw) != "" {
|
||||
wsReqBody, _ = sjson.SetRawBytes(wsReqBody, "input", []byte(inputNode.Raw))
|
||||
return wsReqBody
|
||||
}
|
||||
wsReqBody, _ = sjson.SetRawBytes(wsReqBody, "input", []byte("[]"))
|
||||
return wsReqBody
|
||||
}
|
||||
}
|
||||
|
||||
// Match codex-rs websocket v2 semantics: every request is `response.create`.
|
||||
// Incremental follow-up turns continue on the same websocket using
|
||||
// `previous_response_id` + incremental `input`, not `response.append`.
|
||||
wsReqBody, errSet := sjson.SetBytes(bytes.Clone(body), "type", "response.create")
|
||||
if errSet == nil && len(wsReqBody) > 0 {
|
||||
return wsReqBody
|
||||
@@ -725,21 +683,6 @@ func readCodexWebsocketMessage(ctx context.Context, sess *codexWebsocketSession,
|
||||
}
|
||||
}
|
||||
|
||||
func markCodexWebsocketCreateSent(sess *codexWebsocketSession, conn *websocket.Conn, payload []byte) {
|
||||
if sess == nil || conn == nil || len(payload) == 0 {
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(gjson.GetBytes(payload, "type").String()) != "response.create" {
|
||||
return
|
||||
}
|
||||
|
||||
sess.connMu.Lock()
|
||||
if sess.conn == conn {
|
||||
sess.connCreateSent = true
|
||||
}
|
||||
sess.connMu.Unlock()
|
||||
}
|
||||
|
||||
func newProxyAwareWebsocketDialer(cfg *config.Config, auth *cliproxyauth.Auth) *websocket.Dialer {
|
||||
dialer := &websocket.Dialer{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
@@ -1017,36 +960,6 @@ func closeHTTPResponseBody(resp *http.Response, logPrefix string) {
|
||||
}
|
||||
}
|
||||
|
||||
func closeOnContextDone(ctx context.Context, conn *websocket.Conn) chan struct{} {
|
||||
done := make(chan struct{})
|
||||
if ctx == nil || conn == nil {
|
||||
return done
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
_ = conn.Close()
|
||||
}
|
||||
}()
|
||||
return done
|
||||
}
|
||||
|
||||
func cancelReadOnContextDone(ctx context.Context, conn *websocket.Conn) chan struct{} {
|
||||
done := make(chan struct{})
|
||||
if ctx == nil || conn == nil {
|
||||
return done
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
_ = conn.SetReadDeadline(time.Now())
|
||||
}
|
||||
}()
|
||||
return done
|
||||
}
|
||||
|
||||
func executionSessionIDFromOptions(opts cliproxyexecutor.Options) string {
|
||||
if len(opts.Metadata) == 0 {
|
||||
return ""
|
||||
@@ -1120,7 +1033,6 @@ func (e *CodexWebsocketsExecutor) ensureUpstreamConn(ctx context.Context, auth *
|
||||
sess.conn = conn
|
||||
sess.wsURL = wsURL
|
||||
sess.authID = authID
|
||||
sess.connCreateSent = false
|
||||
sess.readerConn = conn
|
||||
sess.connMu.Unlock()
|
||||
|
||||
@@ -1206,7 +1118,6 @@ func (e *CodexWebsocketsExecutor) invalidateUpstreamConn(sess *codexWebsocketSes
|
||||
return
|
||||
}
|
||||
sess.conn = nil
|
||||
sess.connCreateSent = false
|
||||
if sess.readerConn == conn {
|
||||
sess.readerConn = nil
|
||||
}
|
||||
@@ -1273,7 +1184,6 @@ func (e *CodexWebsocketsExecutor) closeExecutionSession(sess *codexWebsocketSess
|
||||
authID := sess.authID
|
||||
wsURL := sess.wsURL
|
||||
sess.conn = nil
|
||||
sess.connCreateSent = false
|
||||
if sess.readerConn == conn {
|
||||
sess.readerConn = nil
|
||||
}
|
||||
|
||||
36
internal/runtime/executor/codex_websockets_executor_test.go
Normal file
36
internal/runtime/executor/codex_websockets_executor_test.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func TestBuildCodexWebsocketRequestBodyPreservesPreviousResponseID(t *testing.T) {
|
||||
body := []byte(`{"model":"gpt-5-codex","previous_response_id":"resp-1","input":[{"type":"message","id":"msg-1"}]}`)
|
||||
|
||||
wsReqBody := buildCodexWebsocketRequestBody(body)
|
||||
|
||||
if got := gjson.GetBytes(wsReqBody, "type").String(); got != "response.create" {
|
||||
t.Fatalf("type = %s, want response.create", got)
|
||||
}
|
||||
if got := gjson.GetBytes(wsReqBody, "previous_response_id").String(); got != "resp-1" {
|
||||
t.Fatalf("previous_response_id = %s, want resp-1", got)
|
||||
}
|
||||
if gjson.GetBytes(wsReqBody, "input.0.id").String() != "msg-1" {
|
||||
t.Fatalf("input item id mismatch")
|
||||
}
|
||||
if got := gjson.GetBytes(wsReqBody, "type").String(); got == "response.append" {
|
||||
t.Fatalf("unexpected websocket request type: %s", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) {
|
||||
headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "")
|
||||
|
||||
if got := headers.Get("OpenAI-Beta"); got != codexResponsesWebsocketBetaHeaderValue {
|
||||
t.Fatalf("OpenAI-Beta = %s, want %s", got, codexResponsesWebsocketBetaHeaderValue)
|
||||
}
|
||||
}
|
||||
@@ -212,6 +212,33 @@ func ConvertOpenAIRequestToAntigravity(modelName string, inputRawJSON []byte, _
|
||||
} else {
|
||||
log.Warnf("Unknown file name extension '%s' in user message, skip", ext)
|
||||
}
|
||||
case "input_audio":
|
||||
audioData := item.Get("input_audio.data").String()
|
||||
audioFormat := item.Get("input_audio.format").String()
|
||||
if audioData != "" {
|
||||
audioMimeMap := map[string]string{
|
||||
"mp3": "audio/mpeg",
|
||||
"wav": "audio/wav",
|
||||
"ogg": "audio/ogg",
|
||||
"flac": "audio/flac",
|
||||
"aac": "audio/aac",
|
||||
"webm": "audio/webm",
|
||||
"pcm16": "audio/pcm",
|
||||
"g711_ulaw": "audio/basic",
|
||||
"g711_alaw": "audio/basic",
|
||||
}
|
||||
mimeType := "audio/wav"
|
||||
if audioFormat != "" {
|
||||
if mapped, ok := audioMimeMap[audioFormat]; ok {
|
||||
mimeType = mapped
|
||||
} else {
|
||||
mimeType = "audio/" + audioFormat
|
||||
}
|
||||
}
|
||||
node, _ = sjson.SetBytes(node, "parts."+itoa(p)+".inlineData.mime_type", mimeType)
|
||||
node, _ = sjson.SetBytes(node, "parts."+itoa(p)+".inlineData.data", audioData)
|
||||
p++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,46 +203,9 @@ func ConvertOpenAIRequestToClaude(modelName string, inputRawJSON []byte, stream
|
||||
msg, _ = sjson.SetRaw(msg, "content.-1", part)
|
||||
} else if contentResult.Exists() && contentResult.IsArray() {
|
||||
contentResult.ForEach(func(_, part gjson.Result) bool {
|
||||
partType := part.Get("type").String()
|
||||
|
||||
switch partType {
|
||||
case "text":
|
||||
textPart := `{"type":"text","text":""}`
|
||||
textPart, _ = sjson.Set(textPart, "text", part.Get("text").String())
|
||||
msg, _ = sjson.SetRaw(msg, "content.-1", textPart)
|
||||
|
||||
case "image_url":
|
||||
// Convert OpenAI image format to Claude Code format
|
||||
imageURL := part.Get("image_url.url").String()
|
||||
if strings.HasPrefix(imageURL, "data:") {
|
||||
// Extract base64 data and media type from data URL
|
||||
parts := strings.Split(imageURL, ",")
|
||||
if len(parts) == 2 {
|
||||
mediaTypePart := strings.Split(parts[0], ";")[0]
|
||||
mediaType := strings.TrimPrefix(mediaTypePart, "data:")
|
||||
data := parts[1]
|
||||
|
||||
imagePart := `{"type":"image","source":{"type":"base64","media_type":"","data":""}}`
|
||||
imagePart, _ = sjson.Set(imagePart, "source.media_type", mediaType)
|
||||
imagePart, _ = sjson.Set(imagePart, "source.data", data)
|
||||
msg, _ = sjson.SetRaw(msg, "content.-1", imagePart)
|
||||
}
|
||||
}
|
||||
|
||||
case "file":
|
||||
fileData := part.Get("file.file_data").String()
|
||||
if strings.HasPrefix(fileData, "data:") {
|
||||
semicolonIdx := strings.Index(fileData, ";")
|
||||
commaIdx := strings.Index(fileData, ",")
|
||||
if semicolonIdx != -1 && commaIdx != -1 && commaIdx > semicolonIdx {
|
||||
mediaType := strings.TrimPrefix(fileData[:semicolonIdx], "data:")
|
||||
data := fileData[commaIdx+1:]
|
||||
docPart := `{"type":"document","source":{"type":"base64","media_type":"","data":""}}`
|
||||
docPart, _ = sjson.Set(docPart, "source.media_type", mediaType)
|
||||
docPart, _ = sjson.Set(docPart, "source.data", data)
|
||||
msg, _ = sjson.SetRaw(msg, "content.-1", docPart)
|
||||
}
|
||||
}
|
||||
claudePart := convertOpenAIContentPartToClaudePart(part)
|
||||
if claudePart != "" {
|
||||
msg, _ = sjson.SetRaw(msg, "content.-1", claudePart)
|
||||
}
|
||||
return true
|
||||
})
|
||||
@@ -291,11 +254,16 @@ func ConvertOpenAIRequestToClaude(modelName string, inputRawJSON []byte, stream
|
||||
case "tool":
|
||||
// Handle tool result messages conversion
|
||||
toolCallID := message.Get("tool_call_id").String()
|
||||
content := message.Get("content").String()
|
||||
toolContentResult := message.Get("content")
|
||||
|
||||
msg := `{"role":"user","content":[{"type":"tool_result","tool_use_id":"","content":""}]}`
|
||||
msg, _ = sjson.Set(msg, "content.0.tool_use_id", toolCallID)
|
||||
msg, _ = sjson.Set(msg, "content.0.content", content)
|
||||
toolResultContent, toolResultContentRaw := convertOpenAIToolResultContent(toolContentResult)
|
||||
if toolResultContentRaw {
|
||||
msg, _ = sjson.SetRaw(msg, "content.0.content", toolResultContent)
|
||||
} else {
|
||||
msg, _ = sjson.Set(msg, "content.0.content", toolResultContent)
|
||||
}
|
||||
out, _ = sjson.SetRaw(out, "messages.-1", msg)
|
||||
messageIndex++
|
||||
}
|
||||
@@ -358,3 +326,110 @@ func ConvertOpenAIRequestToClaude(modelName string, inputRawJSON []byte, stream
|
||||
|
||||
return []byte(out)
|
||||
}
|
||||
|
||||
func convertOpenAIContentPartToClaudePart(part gjson.Result) string {
|
||||
switch part.Get("type").String() {
|
||||
case "text":
|
||||
textPart := `{"type":"text","text":""}`
|
||||
textPart, _ = sjson.Set(textPart, "text", part.Get("text").String())
|
||||
return textPart
|
||||
|
||||
case "image_url":
|
||||
return convertOpenAIImageURLToClaudePart(part.Get("image_url.url").String())
|
||||
|
||||
case "file":
|
||||
fileData := part.Get("file.file_data").String()
|
||||
if strings.HasPrefix(fileData, "data:") {
|
||||
semicolonIdx := strings.Index(fileData, ";")
|
||||
commaIdx := strings.Index(fileData, ",")
|
||||
if semicolonIdx != -1 && commaIdx != -1 && commaIdx > semicolonIdx {
|
||||
mediaType := strings.TrimPrefix(fileData[:semicolonIdx], "data:")
|
||||
data := fileData[commaIdx+1:]
|
||||
docPart := `{"type":"document","source":{"type":"base64","media_type":"","data":""}}`
|
||||
docPart, _ = sjson.Set(docPart, "source.media_type", mediaType)
|
||||
docPart, _ = sjson.Set(docPart, "source.data", data)
|
||||
return docPart
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func convertOpenAIImageURLToClaudePart(imageURL string) string {
|
||||
if imageURL == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
if strings.HasPrefix(imageURL, "data:") {
|
||||
parts := strings.SplitN(imageURL, ",", 2)
|
||||
if len(parts) != 2 {
|
||||
return ""
|
||||
}
|
||||
|
||||
mediaTypePart := strings.SplitN(parts[0], ";", 2)[0]
|
||||
mediaType := strings.TrimPrefix(mediaTypePart, "data:")
|
||||
if mediaType == "" {
|
||||
mediaType = "application/octet-stream"
|
||||
}
|
||||
|
||||
imagePart := `{"type":"image","source":{"type":"base64","media_type":"","data":""}}`
|
||||
imagePart, _ = sjson.Set(imagePart, "source.media_type", mediaType)
|
||||
imagePart, _ = sjson.Set(imagePart, "source.data", parts[1])
|
||||
return imagePart
|
||||
}
|
||||
|
||||
imagePart := `{"type":"image","source":{"type":"url","url":""}}`
|
||||
imagePart, _ = sjson.Set(imagePart, "source.url", imageURL)
|
||||
return imagePart
|
||||
}
|
||||
|
||||
func convertOpenAIToolResultContent(content gjson.Result) (string, bool) {
|
||||
if !content.Exists() {
|
||||
return "", false
|
||||
}
|
||||
|
||||
if content.Type == gjson.String {
|
||||
return content.String(), false
|
||||
}
|
||||
|
||||
if content.IsArray() {
|
||||
claudeContent := "[]"
|
||||
partCount := 0
|
||||
|
||||
content.ForEach(func(_, part gjson.Result) bool {
|
||||
if part.Type == gjson.String {
|
||||
textPart := `{"type":"text","text":""}`
|
||||
textPart, _ = sjson.Set(textPart, "text", part.String())
|
||||
claudeContent, _ = sjson.SetRaw(claudeContent, "-1", textPart)
|
||||
partCount++
|
||||
return true
|
||||
}
|
||||
|
||||
claudePart := convertOpenAIContentPartToClaudePart(part)
|
||||
if claudePart != "" {
|
||||
claudeContent, _ = sjson.SetRaw(claudeContent, "-1", claudePart)
|
||||
partCount++
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if partCount > 0 || len(content.Array()) == 0 {
|
||||
return claudeContent, true
|
||||
}
|
||||
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
if content.IsObject() {
|
||||
claudePart := convertOpenAIContentPartToClaudePart(content)
|
||||
if claudePart != "" {
|
||||
claudeContent := "[]"
|
||||
claudeContent, _ = sjson.SetRaw(claudeContent, "-1", claudePart)
|
||||
return claudeContent, true
|
||||
}
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
package chat_completions
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func TestConvertOpenAIRequestToClaude_ToolResultTextAndBase64Image(t *testing.T) {
|
||||
inputJSON := `{
|
||||
"model": "gpt-4.1",
|
||||
"messages": [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "call_1",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "do_work",
|
||||
"arguments": "{\"a\":1}"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"role": "tool",
|
||||
"tool_call_id": "call_1",
|
||||
"content": [
|
||||
{"type": "text", "text": "tool ok"},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUg=="
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
result := ConvertOpenAIRequestToClaude("claude-sonnet-4-5", []byte(inputJSON), false)
|
||||
resultJSON := gjson.ParseBytes(result)
|
||||
messages := resultJSON.Get("messages").Array()
|
||||
|
||||
if len(messages) != 2 {
|
||||
t.Fatalf("Expected 2 messages, got %d. Messages: %s", len(messages), resultJSON.Get("messages").Raw)
|
||||
}
|
||||
|
||||
toolResult := messages[1].Get("content.0")
|
||||
if got := toolResult.Get("type").String(); got != "tool_result" {
|
||||
t.Fatalf("Expected content[0].type %q, got %q", "tool_result", got)
|
||||
}
|
||||
if got := toolResult.Get("tool_use_id").String(); got != "call_1" {
|
||||
t.Fatalf("Expected tool_use_id %q, got %q", "call_1", got)
|
||||
}
|
||||
|
||||
toolContent := toolResult.Get("content")
|
||||
if !toolContent.IsArray() {
|
||||
t.Fatalf("Expected tool_result content array, got %s", toolContent.Raw)
|
||||
}
|
||||
if got := toolContent.Get("0.type").String(); got != "text" {
|
||||
t.Fatalf("Expected first tool_result part type %q, got %q", "text", got)
|
||||
}
|
||||
if got := toolContent.Get("0.text").String(); got != "tool ok" {
|
||||
t.Fatalf("Expected first tool_result part text %q, got %q", "tool ok", got)
|
||||
}
|
||||
if got := toolContent.Get("1.type").String(); got != "image" {
|
||||
t.Fatalf("Expected second tool_result part type %q, got %q", "image", got)
|
||||
}
|
||||
if got := toolContent.Get("1.source.type").String(); got != "base64" {
|
||||
t.Fatalf("Expected image source type %q, got %q", "base64", got)
|
||||
}
|
||||
if got := toolContent.Get("1.source.media_type").String(); got != "image/png" {
|
||||
t.Fatalf("Expected image media type %q, got %q", "image/png", got)
|
||||
}
|
||||
if got := toolContent.Get("1.source.data").String(); got != "iVBORw0KGgoAAAANSUhEUg==" {
|
||||
t.Fatalf("Unexpected base64 image data: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertOpenAIRequestToClaude_ToolResultURLImageOnly(t *testing.T) {
|
||||
inputJSON := `{
|
||||
"model": "gpt-4.1",
|
||||
"messages": [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "call_1",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "do_work",
|
||||
"arguments": "{\"a\":1}"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"role": "tool",
|
||||
"tool_call_id": "call_1",
|
||||
"content": [
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": "https://example.com/tool.png"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
result := ConvertOpenAIRequestToClaude("claude-sonnet-4-5", []byte(inputJSON), false)
|
||||
resultJSON := gjson.ParseBytes(result)
|
||||
messages := resultJSON.Get("messages").Array()
|
||||
|
||||
if len(messages) != 2 {
|
||||
t.Fatalf("Expected 2 messages, got %d. Messages: %s", len(messages), resultJSON.Get("messages").Raw)
|
||||
}
|
||||
|
||||
toolContent := messages[1].Get("content.0.content")
|
||||
if !toolContent.IsArray() {
|
||||
t.Fatalf("Expected tool_result content array, got %s", toolContent.Raw)
|
||||
}
|
||||
if got := toolContent.Get("0.type").String(); got != "image" {
|
||||
t.Fatalf("Expected tool_result part type %q, got %q", "image", got)
|
||||
}
|
||||
if got := toolContent.Get("0.source.type").String(); got != "url" {
|
||||
t.Fatalf("Expected image source type %q, got %q", "url", got)
|
||||
}
|
||||
if got := toolContent.Get("0.source.url").String(); got != "https://example.com/tool.png" {
|
||||
t.Fatalf("Unexpected image URL: %q", got)
|
||||
}
|
||||
}
|
||||
@@ -237,6 +237,33 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte
|
||||
partJSON, _ = sjson.Set(partJSON, "inline_data.data", data)
|
||||
}
|
||||
}
|
||||
case "input_audio":
|
||||
audioData := contentItem.Get("data").String()
|
||||
audioFormat := contentItem.Get("format").String()
|
||||
if audioData != "" {
|
||||
audioMimeMap := map[string]string{
|
||||
"mp3": "audio/mpeg",
|
||||
"wav": "audio/wav",
|
||||
"ogg": "audio/ogg",
|
||||
"flac": "audio/flac",
|
||||
"aac": "audio/aac",
|
||||
"webm": "audio/webm",
|
||||
"pcm16": "audio/pcm",
|
||||
"g711_ulaw": "audio/basic",
|
||||
"g711_alaw": "audio/basic",
|
||||
}
|
||||
mimeType := "audio/wav"
|
||||
if audioFormat != "" {
|
||||
if mapped, ok := audioMimeMap[audioFormat]; ok {
|
||||
mimeType = mapped
|
||||
} else {
|
||||
mimeType = "audio/" + audioFormat
|
||||
}
|
||||
}
|
||||
partJSON = `{"inline_data":{"mime_type":"","data":""}}`
|
||||
partJSON, _ = sjson.Set(partJSON, "inline_data.mime_type", mimeType)
|
||||
partJSON, _ = sjson.Set(partJSON, "inline_data.data", audioData)
|
||||
}
|
||||
}
|
||||
|
||||
if partJSON != "" {
|
||||
|
||||
@@ -183,7 +183,12 @@ func ConvertClaudeRequestToOpenAI(modelName string, inputRawJSON []byte, stream
|
||||
// Collect tool_result to emit after the main message (ensures tool results follow tool_calls)
|
||||
toolResultJSON := `{"role":"tool","tool_call_id":"","content":""}`
|
||||
toolResultJSON, _ = sjson.Set(toolResultJSON, "tool_call_id", part.Get("tool_use_id").String())
|
||||
toolResultJSON, _ = sjson.Set(toolResultJSON, "content", convertClaudeToolResultContentToString(part.Get("content")))
|
||||
toolResultContent, toolResultContentRaw := convertClaudeToolResultContent(part.Get("content"))
|
||||
if toolResultContentRaw {
|
||||
toolResultJSON, _ = sjson.SetRaw(toolResultJSON, "content", toolResultContent)
|
||||
} else {
|
||||
toolResultJSON, _ = sjson.Set(toolResultJSON, "content", toolResultContent)
|
||||
}
|
||||
toolResults = append(toolResults, toolResultJSON)
|
||||
}
|
||||
return true
|
||||
@@ -374,21 +379,41 @@ func convertClaudeContentPart(part gjson.Result) (string, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func convertClaudeToolResultContentToString(content gjson.Result) string {
|
||||
func convertClaudeToolResultContent(content gjson.Result) (string, bool) {
|
||||
if !content.Exists() {
|
||||
return ""
|
||||
return "", false
|
||||
}
|
||||
|
||||
if content.Type == gjson.String {
|
||||
return content.String()
|
||||
return content.String(), false
|
||||
}
|
||||
|
||||
if content.IsArray() {
|
||||
var parts []string
|
||||
contentJSON := "[]"
|
||||
hasImagePart := false
|
||||
content.ForEach(func(_, item gjson.Result) bool {
|
||||
switch {
|
||||
case item.Type == gjson.String:
|
||||
parts = append(parts, item.String())
|
||||
text := item.String()
|
||||
parts = append(parts, text)
|
||||
textContent := `{"type":"text","text":""}`
|
||||
textContent, _ = sjson.Set(textContent, "text", text)
|
||||
contentJSON, _ = sjson.SetRaw(contentJSON, "-1", textContent)
|
||||
case item.IsObject() && item.Get("type").String() == "text":
|
||||
text := item.Get("text").String()
|
||||
parts = append(parts, text)
|
||||
textContent := `{"type":"text","text":""}`
|
||||
textContent, _ = sjson.Set(textContent, "text", text)
|
||||
contentJSON, _ = sjson.SetRaw(contentJSON, "-1", textContent)
|
||||
case item.IsObject() && item.Get("type").String() == "image":
|
||||
contentItem, ok := convertClaudeContentPart(item)
|
||||
if ok {
|
||||
contentJSON, _ = sjson.SetRaw(contentJSON, "-1", contentItem)
|
||||
hasImagePart = true
|
||||
} else {
|
||||
parts = append(parts, item.Raw)
|
||||
}
|
||||
case item.IsObject() && item.Get("text").Exists() && item.Get("text").Type == gjson.String:
|
||||
parts = append(parts, item.Get("text").String())
|
||||
default:
|
||||
@@ -397,19 +422,31 @@ func convertClaudeToolResultContentToString(content gjson.Result) string {
|
||||
return true
|
||||
})
|
||||
|
||||
if hasImagePart {
|
||||
return contentJSON, true
|
||||
}
|
||||
|
||||
joined := strings.Join(parts, "\n\n")
|
||||
if strings.TrimSpace(joined) != "" {
|
||||
return joined
|
||||
return joined, false
|
||||
}
|
||||
return content.Raw
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
if content.IsObject() {
|
||||
if text := content.Get("text"); text.Exists() && text.Type == gjson.String {
|
||||
return text.String()
|
||||
if content.Get("type").String() == "image" {
|
||||
contentItem, ok := convertClaudeContentPart(content)
|
||||
if ok {
|
||||
contentJSON := "[]"
|
||||
contentJSON, _ = sjson.SetRaw(contentJSON, "-1", contentItem)
|
||||
return contentJSON, true
|
||||
}
|
||||
}
|
||||
return content.Raw
|
||||
if text := content.Get("text"); text.Exists() && text.Type == gjson.String {
|
||||
return text.String(), false
|
||||
}
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
return content.Raw
|
||||
return content.Raw, false
|
||||
}
|
||||
|
||||
@@ -488,6 +488,114 @@ func TestConvertClaudeRequestToOpenAI_ToolResultObjectContent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertClaudeRequestToOpenAI_ToolResultTextAndImageContent(t *testing.T) {
|
||||
inputJSON := `{
|
||||
"model": "claude-3-opus",
|
||||
"messages": [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "tool_use", "id": "call_1", "name": "do_work", "input": {"a": 1}}
|
||||
]
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "call_1",
|
||||
"content": [
|
||||
{"type": "text", "text": "tool ok"},
|
||||
{
|
||||
"type": "image",
|
||||
"source": {
|
||||
"type": "base64",
|
||||
"media_type": "image/png",
|
||||
"data": "iVBORw0KGgoAAAANSUhEUg=="
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
result := ConvertClaudeRequestToOpenAI("test-model", []byte(inputJSON), false)
|
||||
resultJSON := gjson.ParseBytes(result)
|
||||
messages := resultJSON.Get("messages").Array()
|
||||
|
||||
if len(messages) != 2 {
|
||||
t.Fatalf("Expected 2 messages, got %d. Messages: %s", len(messages), resultJSON.Get("messages").Raw)
|
||||
}
|
||||
|
||||
toolContent := messages[1].Get("content")
|
||||
if !toolContent.IsArray() {
|
||||
t.Fatalf("Expected tool content array, got %s", toolContent.Raw)
|
||||
}
|
||||
if got := toolContent.Get("0.type").String(); got != "text" {
|
||||
t.Fatalf("Expected first tool content type %q, got %q", "text", got)
|
||||
}
|
||||
if got := toolContent.Get("0.text").String(); got != "tool ok" {
|
||||
t.Fatalf("Expected first tool content text %q, got %q", "tool ok", got)
|
||||
}
|
||||
if got := toolContent.Get("1.type").String(); got != "image_url" {
|
||||
t.Fatalf("Expected second tool content type %q, got %q", "image_url", got)
|
||||
}
|
||||
if got := toolContent.Get("1.image_url.url").String(); got != "data:image/png;base64,iVBORw0KGgoAAAANSUhEUg==" {
|
||||
t.Fatalf("Unexpected image_url: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertClaudeRequestToOpenAI_ToolResultURLImageOnly(t *testing.T) {
|
||||
inputJSON := `{
|
||||
"model": "claude-3-opus",
|
||||
"messages": [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "tool_use", "id": "call_1", "name": "do_work", "input": {"a": 1}}
|
||||
]
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "call_1",
|
||||
"content": {
|
||||
"type": "image",
|
||||
"source": {
|
||||
"type": "url",
|
||||
"url": "https://example.com/tool.png"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
result := ConvertClaudeRequestToOpenAI("test-model", []byte(inputJSON), false)
|
||||
resultJSON := gjson.ParseBytes(result)
|
||||
messages := resultJSON.Get("messages").Array()
|
||||
|
||||
if len(messages) != 2 {
|
||||
t.Fatalf("Expected 2 messages, got %d. Messages: %s", len(messages), resultJSON.Get("messages").Raw)
|
||||
}
|
||||
|
||||
toolContent := messages[1].Get("content")
|
||||
if !toolContent.IsArray() {
|
||||
t.Fatalf("Expected tool content array, got %s", toolContent.Raw)
|
||||
}
|
||||
if got := toolContent.Get("0.type").String(); got != "image_url" {
|
||||
t.Fatalf("Expected tool content type %q, got %q", "image_url", got)
|
||||
}
|
||||
if got := toolContent.Get("0.image_url.url").String(); got != "https://example.com/tool.png" {
|
||||
t.Fatalf("Unexpected image_url: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConvertClaudeRequestToOpenAI_AssistantTextToolUseTextOrder(t *testing.T) {
|
||||
inputJSON := `{
|
||||
"model": "claude-3-opus",
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer"
|
||||
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -75,6 +76,7 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
|
||||
|
||||
w.lastAuthHashes = make(map[string]string)
|
||||
w.lastAuthContents = make(map[string]*coreauth.Auth)
|
||||
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
|
||||
if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil {
|
||||
log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir)
|
||||
} else if resolvedAuthDir != "" {
|
||||
@@ -92,6 +94,17 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
|
||||
if errParse := json.Unmarshal(data, &auth); errParse == nil {
|
||||
w.lastAuthContents[normalizedPath] = &auth
|
||||
}
|
||||
ctx := &synthesizer.SynthesisContext{
|
||||
Config: cfg,
|
||||
AuthDir: resolvedAuthDir,
|
||||
Now: time.Now(),
|
||||
IDGenerator: synthesizer.NewStableIDGenerator(),
|
||||
}
|
||||
if generated := synthesizer.SynthesizeAuthFile(ctx, path, data); len(generated) > 0 {
|
||||
if pathAuths := authSliceToMap(generated); len(pathAuths) > 0 {
|
||||
w.fileAuthsByPath[normalizedPath] = pathAuths
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -143,13 +156,14 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
||||
}
|
||||
|
||||
w.clientsMutex.Lock()
|
||||
|
||||
cfg := w.config
|
||||
if cfg == nil {
|
||||
if w.config == nil {
|
||||
log.Error("config is nil, cannot add or update client")
|
||||
w.clientsMutex.Unlock()
|
||||
return
|
||||
}
|
||||
if w.fileAuthsByPath == nil {
|
||||
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
|
||||
}
|
||||
if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash {
|
||||
log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path))
|
||||
w.clientsMutex.Unlock()
|
||||
@@ -177,34 +191,86 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
||||
}
|
||||
w.lastAuthContents[normalized] = &newAuth
|
||||
|
||||
w.clientsMutex.Unlock() // Unlock before the callback
|
||||
|
||||
w.refreshAuthState(false)
|
||||
|
||||
if w.reloadCallback != nil {
|
||||
log.Debugf("triggering server update callback after add/update")
|
||||
w.triggerServerUpdate(cfg)
|
||||
oldByID := make(map[string]*coreauth.Auth, len(w.fileAuthsByPath[normalized]))
|
||||
for id, a := range w.fileAuthsByPath[normalized] {
|
||||
oldByID[id] = a
|
||||
}
|
||||
|
||||
// Build synthesized auth entries for this single file only.
|
||||
sctx := &synthesizer.SynthesisContext{
|
||||
Config: w.config,
|
||||
AuthDir: w.authDir,
|
||||
Now: time.Now(),
|
||||
IDGenerator: synthesizer.NewStableIDGenerator(),
|
||||
}
|
||||
generated := synthesizer.SynthesizeAuthFile(sctx, path, data)
|
||||
newByID := authSliceToMap(generated)
|
||||
if len(newByID) > 0 {
|
||||
w.fileAuthsByPath[normalized] = newByID
|
||||
} else {
|
||||
delete(w.fileAuthsByPath, normalized)
|
||||
}
|
||||
updates := w.computePerPathUpdatesLocked(oldByID, newByID)
|
||||
w.clientsMutex.Unlock()
|
||||
|
||||
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||
w.dispatchAuthUpdates(updates)
|
||||
}
|
||||
|
||||
func (w *Watcher) removeClient(path string) {
|
||||
normalized := w.normalizeAuthPath(path)
|
||||
w.clientsMutex.Lock()
|
||||
|
||||
cfg := w.config
|
||||
oldByID := make(map[string]*coreauth.Auth, len(w.fileAuthsByPath[normalized]))
|
||||
for id, a := range w.fileAuthsByPath[normalized] {
|
||||
oldByID[id] = a
|
||||
}
|
||||
delete(w.lastAuthHashes, normalized)
|
||||
delete(w.lastAuthContents, normalized)
|
||||
delete(w.fileAuthsByPath, normalized)
|
||||
|
||||
w.clientsMutex.Unlock() // Release the lock before the callback
|
||||
updates := w.computePerPathUpdatesLocked(oldByID, map[string]*coreauth.Auth{})
|
||||
w.clientsMutex.Unlock()
|
||||
|
||||
w.refreshAuthState(false)
|
||||
|
||||
if w.reloadCallback != nil {
|
||||
log.Debugf("triggering server update callback after removal")
|
||||
w.triggerServerUpdate(cfg)
|
||||
}
|
||||
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||
w.dispatchAuthUpdates(updates)
|
||||
}
|
||||
|
||||
func (w *Watcher) computePerPathUpdatesLocked(oldByID, newByID map[string]*coreauth.Auth) []AuthUpdate {
|
||||
if w.currentAuths == nil {
|
||||
w.currentAuths = make(map[string]*coreauth.Auth)
|
||||
}
|
||||
updates := make([]AuthUpdate, 0, len(oldByID)+len(newByID))
|
||||
for id, newAuth := range newByID {
|
||||
existing, ok := w.currentAuths[id]
|
||||
if !ok {
|
||||
w.currentAuths[id] = newAuth.Clone()
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: newAuth.Clone()})
|
||||
continue
|
||||
}
|
||||
if !authEqual(existing, newAuth) {
|
||||
w.currentAuths[id] = newAuth.Clone()
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: newAuth.Clone()})
|
||||
}
|
||||
}
|
||||
for id := range oldByID {
|
||||
if _, stillExists := newByID[id]; stillExists {
|
||||
continue
|
||||
}
|
||||
delete(w.currentAuths, id)
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
|
||||
}
|
||||
return updates
|
||||
}
|
||||
|
||||
func authSliceToMap(auths []*coreauth.Auth) map[string]*coreauth.Auth {
|
||||
byID := make(map[string]*coreauth.Auth, len(auths))
|
||||
for _, a := range auths {
|
||||
if a == nil || strings.TrimSpace(a.ID) == "" {
|
||||
continue
|
||||
}
|
||||
byID[a.ID] = a
|
||||
}
|
||||
return byID
|
||||
}
|
||||
|
||||
func (w *Watcher) loadFileClients(cfg *config.Config) int {
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
)
|
||||
|
||||
var snapshotCoreAuthsFunc = snapshotCoreAuths
|
||||
|
||||
func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) {
|
||||
w.clientsMutex.Lock()
|
||||
defer w.clientsMutex.Unlock()
|
||||
@@ -76,7 +78,11 @@ func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool {
|
||||
}
|
||||
|
||||
func (w *Watcher) refreshAuthState(force bool) {
|
||||
auths := w.SnapshotCoreAuths()
|
||||
w.clientsMutex.RLock()
|
||||
cfg := w.config
|
||||
authDir := w.authDir
|
||||
w.clientsMutex.RUnlock()
|
||||
auths := snapshotCoreAuthsFunc(cfg, authDir)
|
||||
w.clientsMutex.Lock()
|
||||
if len(w.runtimeAuths) > 0 {
|
||||
for _, a := range w.runtimeAuths {
|
||||
|
||||
@@ -36,9 +36,6 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e
|
||||
return out, nil
|
||||
}
|
||||
|
||||
now := ctx.Now
|
||||
cfg := ctx.Config
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
continue
|
||||
@@ -52,99 +49,120 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e
|
||||
if errRead != nil || len(data) == 0 {
|
||||
continue
|
||||
}
|
||||
var metadata map[string]any
|
||||
if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil {
|
||||
auths := synthesizeFileAuths(ctx, full, data)
|
||||
if len(auths) == 0 {
|
||||
continue
|
||||
}
|
||||
t, _ := metadata["type"].(string)
|
||||
if t == "" {
|
||||
continue
|
||||
}
|
||||
provider := strings.ToLower(t)
|
||||
if provider == "gemini" {
|
||||
provider = "gemini-cli"
|
||||
}
|
||||
label := provider
|
||||
if email, _ := metadata["email"].(string); email != "" {
|
||||
label = email
|
||||
}
|
||||
// Use relative path under authDir as ID to stay consistent with the file-based token store
|
||||
id := full
|
||||
if rel, errRel := filepath.Rel(ctx.AuthDir, full); errRel == nil && rel != "" {
|
||||
id = rel
|
||||
}
|
||||
// On Windows, normalize ID casing to avoid duplicate auth entries caused by case-insensitive paths.
|
||||
if runtime.GOOS == "windows" {
|
||||
id = strings.ToLower(id)
|
||||
}
|
||||
|
||||
proxyURL := ""
|
||||
if p, ok := metadata["proxy_url"].(string); ok {
|
||||
proxyURL = p
|
||||
}
|
||||
|
||||
prefix := ""
|
||||
if rawPrefix, ok := metadata["prefix"].(string); ok {
|
||||
trimmed := strings.TrimSpace(rawPrefix)
|
||||
trimmed = strings.Trim(trimmed, "/")
|
||||
if trimmed != "" && !strings.Contains(trimmed, "/") {
|
||||
prefix = trimmed
|
||||
}
|
||||
}
|
||||
|
||||
disabled, _ := metadata["disabled"].(bool)
|
||||
status := coreauth.StatusActive
|
||||
if disabled {
|
||||
status = coreauth.StatusDisabled
|
||||
}
|
||||
|
||||
// Read per-account excluded models from the OAuth JSON file
|
||||
perAccountExcluded := extractExcludedModelsFromMetadata(metadata)
|
||||
|
||||
a := &coreauth.Auth{
|
||||
ID: id,
|
||||
Provider: provider,
|
||||
Label: label,
|
||||
Prefix: prefix,
|
||||
Status: status,
|
||||
Disabled: disabled,
|
||||
Attributes: map[string]string{
|
||||
"source": full,
|
||||
"path": full,
|
||||
},
|
||||
ProxyURL: proxyURL,
|
||||
Metadata: metadata,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
// Read priority from auth file
|
||||
if rawPriority, ok := metadata["priority"]; ok {
|
||||
switch v := rawPriority.(type) {
|
||||
case float64:
|
||||
a.Attributes["priority"] = strconv.Itoa(int(v))
|
||||
case string:
|
||||
priority := strings.TrimSpace(v)
|
||||
if _, errAtoi := strconv.Atoi(priority); errAtoi == nil {
|
||||
a.Attributes["priority"] = priority
|
||||
}
|
||||
}
|
||||
}
|
||||
ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth")
|
||||
if provider == "gemini-cli" {
|
||||
if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 {
|
||||
for _, v := range virtuals {
|
||||
ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth")
|
||||
}
|
||||
out = append(out, a)
|
||||
out = append(out, virtuals...)
|
||||
continue
|
||||
}
|
||||
}
|
||||
out = append(out, a)
|
||||
out = append(out, auths...)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// SynthesizeAuthFile generates Auth entries for one auth JSON file payload.
|
||||
// It shares exactly the same mapping behavior as FileSynthesizer.Synthesize.
|
||||
func SynthesizeAuthFile(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth {
|
||||
return synthesizeFileAuths(ctx, fullPath, data)
|
||||
}
|
||||
|
||||
func synthesizeFileAuths(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth {
|
||||
if ctx == nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
now := ctx.Now
|
||||
cfg := ctx.Config
|
||||
var metadata map[string]any
|
||||
if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil {
|
||||
return nil
|
||||
}
|
||||
t, _ := metadata["type"].(string)
|
||||
if t == "" {
|
||||
return nil
|
||||
}
|
||||
provider := strings.ToLower(t)
|
||||
if provider == "gemini" {
|
||||
provider = "gemini-cli"
|
||||
}
|
||||
label := provider
|
||||
if email, _ := metadata["email"].(string); email != "" {
|
||||
label = email
|
||||
}
|
||||
// Use relative path under authDir as ID to stay consistent with the file-based token store.
|
||||
id := fullPath
|
||||
if strings.TrimSpace(ctx.AuthDir) != "" {
|
||||
if rel, errRel := filepath.Rel(ctx.AuthDir, fullPath); errRel == nil && rel != "" {
|
||||
id = rel
|
||||
}
|
||||
}
|
||||
if runtime.GOOS == "windows" {
|
||||
id = strings.ToLower(id)
|
||||
}
|
||||
|
||||
proxyURL := ""
|
||||
if p, ok := metadata["proxy_url"].(string); ok {
|
||||
proxyURL = p
|
||||
}
|
||||
|
||||
prefix := ""
|
||||
if rawPrefix, ok := metadata["prefix"].(string); ok {
|
||||
trimmed := strings.TrimSpace(rawPrefix)
|
||||
trimmed = strings.Trim(trimmed, "/")
|
||||
if trimmed != "" && !strings.Contains(trimmed, "/") {
|
||||
prefix = trimmed
|
||||
}
|
||||
}
|
||||
|
||||
disabled, _ := metadata["disabled"].(bool)
|
||||
status := coreauth.StatusActive
|
||||
if disabled {
|
||||
status = coreauth.StatusDisabled
|
||||
}
|
||||
|
||||
// Read per-account excluded models from the OAuth JSON file.
|
||||
perAccountExcluded := extractExcludedModelsFromMetadata(metadata)
|
||||
|
||||
a := &coreauth.Auth{
|
||||
ID: id,
|
||||
Provider: provider,
|
||||
Label: label,
|
||||
Prefix: prefix,
|
||||
Status: status,
|
||||
Disabled: disabled,
|
||||
Attributes: map[string]string{
|
||||
"source": fullPath,
|
||||
"path": fullPath,
|
||||
},
|
||||
ProxyURL: proxyURL,
|
||||
Metadata: metadata,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
// Read priority from auth file.
|
||||
if rawPriority, ok := metadata["priority"]; ok {
|
||||
switch v := rawPriority.(type) {
|
||||
case float64:
|
||||
a.Attributes["priority"] = strconv.Itoa(int(v))
|
||||
case string:
|
||||
priority := strings.TrimSpace(v)
|
||||
if _, errAtoi := strconv.Atoi(priority); errAtoi == nil {
|
||||
a.Attributes["priority"] = priority
|
||||
}
|
||||
}
|
||||
}
|
||||
ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth")
|
||||
if provider == "gemini-cli" {
|
||||
if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 {
|
||||
for _, v := range virtuals {
|
||||
ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth")
|
||||
}
|
||||
out := make([]*coreauth.Auth, 0, 1+len(virtuals))
|
||||
out = append(out, a)
|
||||
out = append(out, virtuals...)
|
||||
return out
|
||||
}
|
||||
}
|
||||
return []*coreauth.Auth{a}
|
||||
}
|
||||
|
||||
// SynthesizeGeminiVirtualAuths creates virtual Auth entries for multi-project Gemini credentials.
|
||||
// It disables the primary auth and creates one virtual auth per project.
|
||||
func SynthesizeGeminiVirtualAuths(primary *coreauth.Auth, metadata map[string]any, now time.Time) []*coreauth.Auth {
|
||||
|
||||
@@ -45,6 +45,7 @@ type Watcher struct {
|
||||
watcher *fsnotify.Watcher
|
||||
lastAuthHashes map[string]string
|
||||
lastAuthContents map[string]*coreauth.Auth
|
||||
fileAuthsByPath map[string]map[string]*coreauth.Auth
|
||||
lastRemoveTimes map[string]time.Time
|
||||
lastConfigHash string
|
||||
authQueue chan<- AuthUpdate
|
||||
@@ -92,11 +93,12 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config))
|
||||
return nil, errNewWatcher
|
||||
}
|
||||
w := &Watcher{
|
||||
configPath: configPath,
|
||||
authDir: authDir,
|
||||
reloadCallback: reloadCallback,
|
||||
watcher: watcher,
|
||||
lastAuthHashes: make(map[string]string),
|
||||
configPath: configPath,
|
||||
authDir: authDir,
|
||||
reloadCallback: reloadCallback,
|
||||
watcher: watcher,
|
||||
lastAuthHashes: make(map[string]string),
|
||||
fileAuthsByPath: make(map[string]map[string]*coreauth.Auth),
|
||||
}
|
||||
w.dispatchCond = sync.NewCond(&w.dispatchMu)
|
||||
if store := sdkAuth.GetTokenStore(); store != nil {
|
||||
|
||||
@@ -406,8 +406,8 @@ func TestAddOrUpdateClientTriggersReloadAndHash(t *testing.T) {
|
||||
|
||||
w.addOrUpdateClient(authFile)
|
||||
|
||||
if got := atomic.LoadInt32(&reloads); got != 1 {
|
||||
t.Fatalf("expected reload callback once, got %d", got)
|
||||
if got := atomic.LoadInt32(&reloads); got != 0 {
|
||||
t.Fatalf("expected no reload callback for auth update, got %d", got)
|
||||
}
|
||||
// Use normalizeAuthPath to match how addOrUpdateClient stores the key
|
||||
normalized := w.normalizeAuthPath(authFile)
|
||||
@@ -436,8 +436,110 @@ func TestRemoveClientRemovesHash(t *testing.T) {
|
||||
if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok {
|
||||
t.Fatal("expected hash to be removed after deletion")
|
||||
}
|
||||
if got := atomic.LoadInt32(&reloads); got != 1 {
|
||||
t.Fatalf("expected reload callback once, got %d", got)
|
||||
if got := atomic.LoadInt32(&reloads); got != 0 {
|
||||
t.Fatalf("expected no reload callback for auth removal, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuthFileEventsDoNotInvokeSnapshotCoreAuths(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
authFile := filepath.Join(tmpDir, "sample.json")
|
||||
if err := os.WriteFile(authFile, []byte(`{"type":"codex","email":"u@example.com"}`), 0o644); err != nil {
|
||||
t.Fatalf("failed to create auth file: %v", err)
|
||||
}
|
||||
|
||||
origSnapshot := snapshotCoreAuthsFunc
|
||||
var snapshotCalls int32
|
||||
snapshotCoreAuthsFunc = func(cfg *config.Config, authDir string) []*coreauth.Auth {
|
||||
atomic.AddInt32(&snapshotCalls, 1)
|
||||
return origSnapshot(cfg, authDir)
|
||||
}
|
||||
defer func() { snapshotCoreAuthsFunc = origSnapshot }()
|
||||
|
||||
w := &Watcher{
|
||||
authDir: tmpDir,
|
||||
lastAuthHashes: make(map[string]string),
|
||||
lastAuthContents: make(map[string]*coreauth.Auth),
|
||||
fileAuthsByPath: make(map[string]map[string]*coreauth.Auth),
|
||||
}
|
||||
w.SetConfig(&config.Config{AuthDir: tmpDir})
|
||||
|
||||
w.addOrUpdateClient(authFile)
|
||||
w.removeClient(authFile)
|
||||
|
||||
if got := atomic.LoadInt32(&snapshotCalls); got != 0 {
|
||||
t.Fatalf("expected auth file events to avoid full snapshot, got %d calls", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuthSliceToMap(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
valid1 := &coreauth.Auth{ID: "a"}
|
||||
valid2 := &coreauth.Auth{ID: "b"}
|
||||
dupOld := &coreauth.Auth{ID: "dup", Label: "old"}
|
||||
dupNew := &coreauth.Auth{ID: "dup", Label: "new"}
|
||||
empty := &coreauth.Auth{ID: " "}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
in []*coreauth.Auth
|
||||
want map[string]*coreauth.Auth
|
||||
}{
|
||||
{
|
||||
name: "nil input",
|
||||
in: nil,
|
||||
want: map[string]*coreauth.Auth{},
|
||||
},
|
||||
{
|
||||
name: "empty input",
|
||||
in: []*coreauth.Auth{},
|
||||
want: map[string]*coreauth.Auth{},
|
||||
},
|
||||
{
|
||||
name: "filters invalid auths",
|
||||
in: []*coreauth.Auth{nil, empty},
|
||||
want: map[string]*coreauth.Auth{},
|
||||
},
|
||||
{
|
||||
name: "keeps valid auths",
|
||||
in: []*coreauth.Auth{valid1, nil, valid2},
|
||||
want: map[string]*coreauth.Auth{"a": valid1, "b": valid2},
|
||||
},
|
||||
{
|
||||
name: "last duplicate wins",
|
||||
in: []*coreauth.Auth{dupOld, dupNew},
|
||||
want: map[string]*coreauth.Auth{"dup": dupNew},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
got := authSliceToMap(tc.in)
|
||||
if len(tc.want) == 0 {
|
||||
if got == nil {
|
||||
t.Fatal("expected empty map, got nil")
|
||||
}
|
||||
if len(got) != 0 {
|
||||
t.Fatalf("expected empty map, got %#v", got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(got) != len(tc.want) {
|
||||
t.Fatalf("unexpected map length: got %d, want %d", len(got), len(tc.want))
|
||||
}
|
||||
for id, wantAuth := range tc.want {
|
||||
gotAuth, ok := got[id]
|
||||
if !ok {
|
||||
t.Fatalf("missing id %q in result map", id)
|
||||
}
|
||||
if !authEqual(gotAuth, wantAuth) {
|
||||
t.Fatalf("unexpected auth for id %q: got %#v, want %#v", id, gotAuth, wantAuth)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -695,8 +797,8 @@ func TestHandleEventRemovesAuthFile(t *testing.T) {
|
||||
|
||||
w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Remove})
|
||||
|
||||
if atomic.LoadInt32(&reloads) != 1 {
|
||||
t.Fatalf("expected reload callback once, got %d", reloads)
|
||||
if atomic.LoadInt32(&reloads) != 0 {
|
||||
t.Fatalf("expected no reload callback for auth removal, got %d", reloads)
|
||||
}
|
||||
if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok {
|
||||
t.Fatal("expected hash entry to be removed")
|
||||
@@ -893,8 +995,8 @@ func TestHandleEventAuthWriteTriggersUpdate(t *testing.T) {
|
||||
w.SetConfig(&config.Config{AuthDir: authDir})
|
||||
|
||||
w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Write})
|
||||
if atomic.LoadInt32(&reloads) != 1 {
|
||||
t.Fatalf("expected auth write to trigger reload callback, got %d", reloads)
|
||||
if atomic.LoadInt32(&reloads) != 0 {
|
||||
t.Fatalf("expected auth write to avoid global reload callback, got %d", reloads)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -990,8 +1092,8 @@ func TestHandleEventAtomicReplaceChangedTriggersUpdate(t *testing.T) {
|
||||
w.lastAuthHashes[w.normalizeAuthPath(authFile)] = hexString(oldSum[:])
|
||||
|
||||
w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Rename})
|
||||
if atomic.LoadInt32(&reloads) != 1 {
|
||||
t.Fatalf("expected changed atomic replace to trigger update, got %d", reloads)
|
||||
if atomic.LoadInt32(&reloads) != 0 {
|
||||
t.Fatalf("expected changed atomic replace to avoid global reload, got %d", reloads)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1045,8 +1147,8 @@ func TestHandleEventRemoveKnownFileDeletes(t *testing.T) {
|
||||
w.lastAuthHashes[w.normalizeAuthPath(authFile)] = "hash"
|
||||
|
||||
w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Remove})
|
||||
if atomic.LoadInt32(&reloads) != 1 {
|
||||
t.Fatalf("expected known remove to trigger reload, got %d", reloads)
|
||||
if atomic.LoadInt32(&reloads) != 0 {
|
||||
t.Fatalf("expected known remove to avoid global reload, got %d", reloads)
|
||||
}
|
||||
if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok {
|
||||
t.Fatal("expected known auth hash to be deleted")
|
||||
|
||||
@@ -14,7 +14,11 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/thinking"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers"
|
||||
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
@@ -26,7 +30,6 @@ const (
|
||||
wsRequestTypeAppend = "response.append"
|
||||
wsEventTypeError = "error"
|
||||
wsEventTypeCompleted = "response.completed"
|
||||
wsEventTypeDone = "response.done"
|
||||
wsDoneMarker = "[DONE]"
|
||||
wsTurnStateHeader = "x-codex-turn-state"
|
||||
wsRequestBodyKey = "REQUEST_BODY_OVERRIDE"
|
||||
@@ -101,11 +104,17 @@ func (h *OpenAIResponsesAPIHandler) ResponsesWebsocket(c *gin.Context) {
|
||||
// )
|
||||
appendWebsocketEvent(&wsBodyLog, "request", payload)
|
||||
|
||||
allowIncrementalInputWithPreviousResponseID := websocketUpstreamSupportsIncrementalInput(nil, nil)
|
||||
allowIncrementalInputWithPreviousResponseID := false
|
||||
if pinnedAuthID != "" && h != nil && h.AuthManager != nil {
|
||||
if pinnedAuth, ok := h.AuthManager.GetByID(pinnedAuthID); ok && pinnedAuth != nil {
|
||||
allowIncrementalInputWithPreviousResponseID = websocketUpstreamSupportsIncrementalInput(pinnedAuth.Attributes, pinnedAuth.Metadata)
|
||||
}
|
||||
} else {
|
||||
requestModelName := strings.TrimSpace(gjson.GetBytes(payload, "model").String())
|
||||
if requestModelName == "" {
|
||||
requestModelName = strings.TrimSpace(gjson.GetBytes(lastRequest, "model").String())
|
||||
}
|
||||
allowIncrementalInputWithPreviousResponseID = h.websocketUpstreamSupportsIncrementalInputForModel(requestModelName)
|
||||
}
|
||||
|
||||
var requestJSON []byte
|
||||
@@ -140,6 +149,22 @@ func (h *OpenAIResponsesAPIHandler) ResponsesWebsocket(c *gin.Context) {
|
||||
}
|
||||
continue
|
||||
}
|
||||
if shouldHandleResponsesWebsocketPrewarmLocally(payload, lastRequest, allowIncrementalInputWithPreviousResponseID) {
|
||||
if updated, errDelete := sjson.DeleteBytes(requestJSON, "generate"); errDelete == nil {
|
||||
requestJSON = updated
|
||||
}
|
||||
if updated, errDelete := sjson.DeleteBytes(updatedLastRequest, "generate"); errDelete == nil {
|
||||
updatedLastRequest = updated
|
||||
}
|
||||
lastRequest = updatedLastRequest
|
||||
lastResponseOutput = []byte("[]")
|
||||
if errWrite := writeResponsesWebsocketSyntheticPrewarm(c, conn, requestJSON, &wsBodyLog, passthroughSessionID); errWrite != nil {
|
||||
wsTerminateErr = errWrite
|
||||
appendWebsocketEvent(&wsBodyLog, "disconnect", []byte(errWrite.Error()))
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
lastRequest = updatedLastRequest
|
||||
|
||||
modelName := gjson.GetBytes(requestJSON, "model").String()
|
||||
@@ -340,6 +365,192 @@ func websocketUpstreamSupportsIncrementalInput(attributes map[string]string, met
|
||||
return false
|
||||
}
|
||||
|
||||
func (h *OpenAIResponsesAPIHandler) websocketUpstreamSupportsIncrementalInputForModel(modelName string) bool {
|
||||
if h == nil || h.AuthManager == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
resolvedModelName := modelName
|
||||
initialSuffix := thinking.ParseSuffix(modelName)
|
||||
if initialSuffix.ModelName == "auto" {
|
||||
resolvedBase := util.ResolveAutoModel(initialSuffix.ModelName)
|
||||
if initialSuffix.HasSuffix {
|
||||
resolvedModelName = fmt.Sprintf("%s(%s)", resolvedBase, initialSuffix.RawSuffix)
|
||||
} else {
|
||||
resolvedModelName = resolvedBase
|
||||
}
|
||||
} else {
|
||||
resolvedModelName = util.ResolveAutoModel(modelName)
|
||||
}
|
||||
|
||||
parsed := thinking.ParseSuffix(resolvedModelName)
|
||||
baseModel := strings.TrimSpace(parsed.ModelName)
|
||||
providers := util.GetProviderName(baseModel)
|
||||
if len(providers) == 0 && baseModel != resolvedModelName {
|
||||
providers = util.GetProviderName(resolvedModelName)
|
||||
}
|
||||
if len(providers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
providerSet := make(map[string]struct{}, len(providers))
|
||||
for i := 0; i < len(providers); i++ {
|
||||
providerKey := strings.TrimSpace(strings.ToLower(providers[i]))
|
||||
if providerKey == "" {
|
||||
continue
|
||||
}
|
||||
providerSet[providerKey] = struct{}{}
|
||||
}
|
||||
if len(providerSet) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
modelKey := baseModel
|
||||
if modelKey == "" {
|
||||
modelKey = strings.TrimSpace(resolvedModelName)
|
||||
}
|
||||
registryRef := registry.GetGlobalRegistry()
|
||||
now := time.Now()
|
||||
auths := h.AuthManager.List()
|
||||
for i := 0; i < len(auths); i++ {
|
||||
auth := auths[i]
|
||||
if auth == nil {
|
||||
continue
|
||||
}
|
||||
providerKey := strings.TrimSpace(strings.ToLower(auth.Provider))
|
||||
if _, ok := providerSet[providerKey]; !ok {
|
||||
continue
|
||||
}
|
||||
if modelKey != "" && registryRef != nil && !registryRef.ClientSupportsModel(auth.ID, modelKey) {
|
||||
continue
|
||||
}
|
||||
if !responsesWebsocketAuthAvailableForModel(auth, modelKey, now) {
|
||||
continue
|
||||
}
|
||||
if websocketUpstreamSupportsIncrementalInput(auth.Attributes, auth.Metadata) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func responsesWebsocketAuthAvailableForModel(auth *coreauth.Auth, modelName string, now time.Time) bool {
|
||||
if auth == nil {
|
||||
return false
|
||||
}
|
||||
if auth.Disabled || auth.Status == coreauth.StatusDisabled {
|
||||
return false
|
||||
}
|
||||
if modelName != "" && len(auth.ModelStates) > 0 {
|
||||
state, ok := auth.ModelStates[modelName]
|
||||
if (!ok || state == nil) && modelName != "" {
|
||||
baseModel := strings.TrimSpace(thinking.ParseSuffix(modelName).ModelName)
|
||||
if baseModel != "" && baseModel != modelName {
|
||||
state, ok = auth.ModelStates[baseModel]
|
||||
}
|
||||
}
|
||||
if ok && state != nil {
|
||||
if state.Status == coreauth.StatusDisabled {
|
||||
return false
|
||||
}
|
||||
if state.Unavailable && !state.NextRetryAfter.IsZero() && state.NextRetryAfter.After(now) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
if auth.Unavailable && !auth.NextRetryAfter.IsZero() && auth.NextRetryAfter.After(now) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func shouldHandleResponsesWebsocketPrewarmLocally(rawJSON []byte, lastRequest []byte, allowIncrementalInputWithPreviousResponseID bool) bool {
|
||||
if allowIncrementalInputWithPreviousResponseID || len(lastRequest) != 0 {
|
||||
return false
|
||||
}
|
||||
if strings.TrimSpace(gjson.GetBytes(rawJSON, "type").String()) != wsRequestTypeCreate {
|
||||
return false
|
||||
}
|
||||
generateResult := gjson.GetBytes(rawJSON, "generate")
|
||||
return generateResult.Exists() && !generateResult.Bool()
|
||||
}
|
||||
|
||||
func writeResponsesWebsocketSyntheticPrewarm(
|
||||
c *gin.Context,
|
||||
conn *websocket.Conn,
|
||||
requestJSON []byte,
|
||||
wsBodyLog *strings.Builder,
|
||||
sessionID string,
|
||||
) error {
|
||||
payloads, errPayloads := syntheticResponsesWebsocketPrewarmPayloads(requestJSON)
|
||||
if errPayloads != nil {
|
||||
return errPayloads
|
||||
}
|
||||
for i := 0; i < len(payloads); i++ {
|
||||
markAPIResponseTimestamp(c)
|
||||
appendWebsocketEvent(wsBodyLog, "response", payloads[i])
|
||||
// log.Infof(
|
||||
// "responses websocket: downstream_out id=%s type=%d event=%s payload=%s",
|
||||
// sessionID,
|
||||
// websocket.TextMessage,
|
||||
// websocketPayloadEventType(payloads[i]),
|
||||
// websocketPayloadPreview(payloads[i]),
|
||||
// )
|
||||
if errWrite := conn.WriteMessage(websocket.TextMessage, payloads[i]); errWrite != nil {
|
||||
log.Warnf(
|
||||
"responses websocket: downstream_out write failed id=%s event=%s error=%v",
|
||||
sessionID,
|
||||
websocketPayloadEventType(payloads[i]),
|
||||
errWrite,
|
||||
)
|
||||
return errWrite
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func syntheticResponsesWebsocketPrewarmPayloads(requestJSON []byte) ([][]byte, error) {
|
||||
responseID := "resp_prewarm_" + uuid.NewString()
|
||||
createdAt := time.Now().Unix()
|
||||
modelName := strings.TrimSpace(gjson.GetBytes(requestJSON, "model").String())
|
||||
|
||||
createdPayload := []byte(`{"type":"response.created","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress","background":false,"error":null,"output":[]}}`)
|
||||
var errSet error
|
||||
createdPayload, errSet = sjson.SetBytes(createdPayload, "response.id", responseID)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
createdPayload, errSet = sjson.SetBytes(createdPayload, "response.created_at", createdAt)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
if modelName != "" {
|
||||
createdPayload, errSet = sjson.SetBytes(createdPayload, "response.model", modelName)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
}
|
||||
|
||||
completedPayload := []byte(`{"type":"response.completed","sequence_number":1,"response":{"id":"","object":"response","created_at":0,"status":"completed","background":false,"error":null,"output":[],"usage":{"input_tokens":0,"output_tokens":0,"total_tokens":0}}}`)
|
||||
completedPayload, errSet = sjson.SetBytes(completedPayload, "response.id", responseID)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
completedPayload, errSet = sjson.SetBytes(completedPayload, "response.created_at", createdAt)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
if modelName != "" {
|
||||
completedPayload, errSet = sjson.SetBytes(completedPayload, "response.model", modelName)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
}
|
||||
|
||||
return [][]byte{createdPayload, completedPayload}, nil
|
||||
}
|
||||
|
||||
func mergeJSONArrayRaw(existingRaw, appendRaw string) (string, error) {
|
||||
existingRaw = strings.TrimSpace(existingRaw)
|
||||
appendRaw = strings.TrimSpace(appendRaw)
|
||||
@@ -469,9 +680,6 @@ func (h *OpenAIResponsesAPIHandler) forwardResponsesWebsocket(
|
||||
for i := range payloads {
|
||||
eventType := gjson.GetBytes(payloads[i], "type").String()
|
||||
if eventType == wsEventTypeCompleted {
|
||||
// log.Infof("replace %s with %s", wsEventTypeCompleted, wsEventTypeDone)
|
||||
payloads[i], _ = sjson.SetBytes(payloads[i], "type", wsEventTypeDone)
|
||||
|
||||
completed = true
|
||||
completedOutput = responseCompletedOutputFromPayload(payloads[i])
|
||||
}
|
||||
@@ -554,47 +762,63 @@ func writeResponsesWebsocketError(conn *websocket.Conn, errMsg *interfaces.Error
|
||||
}
|
||||
|
||||
body := handlers.BuildErrorResponseBody(status, errText)
|
||||
payload := map[string]any{
|
||||
"type": wsEventTypeError,
|
||||
"status": status,
|
||||
payload := []byte(`{}`)
|
||||
var errSet error
|
||||
payload, errSet = sjson.SetBytes(payload, "type", wsEventTypeError)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
payload, errSet = sjson.SetBytes(payload, "status", status)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
|
||||
if errMsg != nil && errMsg.Addon != nil {
|
||||
headers := map[string]any{}
|
||||
headers := []byte(`{}`)
|
||||
hasHeaders := false
|
||||
for key, values := range errMsg.Addon {
|
||||
if len(values) == 0 {
|
||||
continue
|
||||
}
|
||||
headers[key] = values[0]
|
||||
headerPath := strings.ReplaceAll(strings.ReplaceAll(key, `\\`, `\\\\`), ".", `\\.`)
|
||||
headers, errSet = sjson.SetBytes(headers, headerPath, values[0])
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
hasHeaders = true
|
||||
}
|
||||
if len(headers) > 0 {
|
||||
payload["headers"] = headers
|
||||
}
|
||||
}
|
||||
|
||||
if len(body) > 0 && json.Valid(body) {
|
||||
var decoded map[string]any
|
||||
if errDecode := json.Unmarshal(body, &decoded); errDecode == nil {
|
||||
if inner, ok := decoded["error"]; ok {
|
||||
payload["error"] = inner
|
||||
} else {
|
||||
payload["error"] = decoded
|
||||
if hasHeaders {
|
||||
payload, errSet = sjson.SetRawBytes(payload, "headers", headers)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := payload["error"]; !ok {
|
||||
payload["error"] = map[string]any{
|
||||
"type": "server_error",
|
||||
"message": errText,
|
||||
if len(body) > 0 && json.Valid(body) {
|
||||
errorNode := gjson.GetBytes(body, "error")
|
||||
if errorNode.Exists() {
|
||||
payload, errSet = sjson.SetRawBytes(payload, "error", []byte(errorNode.Raw))
|
||||
} else {
|
||||
payload, errSet = sjson.SetRawBytes(payload, "error", body)
|
||||
}
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
}
|
||||
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if !gjson.GetBytes(payload, "error").Exists() {
|
||||
payload, errSet = sjson.SetBytes(payload, "error.type", "server_error")
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
payload, errSet = sjson.SetBytes(payload, "error.message", errText)
|
||||
if errSet != nil {
|
||||
return nil, errSet
|
||||
}
|
||||
}
|
||||
return data, conn.WriteMessage(websocket.TextMessage, data)
|
||||
|
||||
return payload, conn.WriteMessage(websocket.TextMessage, payload)
|
||||
}
|
||||
|
||||
func appendWebsocketEvent(builder *strings.Builder, eventType string, payload []byte) {
|
||||
|
||||
@@ -2,15 +2,57 @@ package openai
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers"
|
||||
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
coreexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
|
||||
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type websocketCaptureExecutor struct {
|
||||
streamCalls int
|
||||
payloads [][]byte
|
||||
}
|
||||
|
||||
func (e *websocketCaptureExecutor) Identifier() string { return "test-provider" }
|
||||
|
||||
func (e *websocketCaptureExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
|
||||
return coreexecutor.Response{}, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (e *websocketCaptureExecutor) ExecuteStream(_ context.Context, _ *coreauth.Auth, req coreexecutor.Request, _ coreexecutor.Options) (*coreexecutor.StreamResult, error) {
|
||||
e.streamCalls++
|
||||
e.payloads = append(e.payloads, bytes.Clone(req.Payload))
|
||||
chunks := make(chan coreexecutor.StreamChunk, 1)
|
||||
chunks <- coreexecutor.StreamChunk{Payload: []byte(`{"type":"response.completed","response":{"id":"resp-upstream","output":[{"type":"message","id":"out-1"}]}}`)}
|
||||
close(chunks)
|
||||
return &coreexecutor.StreamResult{Chunks: chunks}, nil
|
||||
}
|
||||
|
||||
func (e *websocketCaptureExecutor) Refresh(_ context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
|
||||
return auth, nil
|
||||
}
|
||||
|
||||
func (e *websocketCaptureExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
|
||||
return coreexecutor.Response{}, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (e *websocketCaptureExecutor) HttpRequest(context.Context, *coreauth.Auth, *http.Request) (*http.Response, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func TestNormalizeResponsesWebsocketRequestCreate(t *testing.T) {
|
||||
raw := []byte(`{"type":"response.create","model":"test-model","stream":false,"input":[{"type":"message","id":"msg-1"}]}`)
|
||||
|
||||
@@ -247,3 +289,206 @@ func TestSetWebsocketRequestBody(t *testing.T) {
|
||||
t.Fatalf("request body = %q, want %q", string(bodyBytes), "event body")
|
||||
}
|
||||
}
|
||||
|
||||
func TestForwardResponsesWebsocketPreservesCompletedEvent(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
serverErrCh := make(chan error, 1)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := responsesWebsocketUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
serverErrCh <- err
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
errClose := conn.Close()
|
||||
if errClose != nil {
|
||||
serverErrCh <- errClose
|
||||
}
|
||||
}()
|
||||
|
||||
ctx, _ := gin.CreateTestContext(httptest.NewRecorder())
|
||||
ctx.Request = r
|
||||
|
||||
data := make(chan []byte, 1)
|
||||
errCh := make(chan *interfaces.ErrorMessage)
|
||||
data <- []byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[{\"type\":\"message\",\"id\":\"out-1\"}]}}\n\n")
|
||||
close(data)
|
||||
close(errCh)
|
||||
|
||||
var bodyLog strings.Builder
|
||||
completedOutput, err := (*OpenAIResponsesAPIHandler)(nil).forwardResponsesWebsocket(
|
||||
ctx,
|
||||
conn,
|
||||
func(...interface{}) {},
|
||||
data,
|
||||
errCh,
|
||||
&bodyLog,
|
||||
"session-1",
|
||||
)
|
||||
if err != nil {
|
||||
serverErrCh <- err
|
||||
return
|
||||
}
|
||||
if gjson.GetBytes(completedOutput, "0.id").String() != "out-1" {
|
||||
serverErrCh <- errors.New("completed output not captured")
|
||||
return
|
||||
}
|
||||
serverErrCh <- nil
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial websocket: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
errClose := conn.Close()
|
||||
if errClose != nil {
|
||||
t.Fatalf("close websocket: %v", errClose)
|
||||
}
|
||||
}()
|
||||
|
||||
_, payload, errReadMessage := conn.ReadMessage()
|
||||
if errReadMessage != nil {
|
||||
t.Fatalf("read websocket message: %v", errReadMessage)
|
||||
}
|
||||
if gjson.GetBytes(payload, "type").String() != wsEventTypeCompleted {
|
||||
t.Fatalf("payload type = %s, want %s", gjson.GetBytes(payload, "type").String(), wsEventTypeCompleted)
|
||||
}
|
||||
if strings.Contains(string(payload), "response.done") {
|
||||
t.Fatalf("payload unexpectedly rewrote completed event: %s", payload)
|
||||
}
|
||||
|
||||
if errServer := <-serverErrCh; errServer != nil {
|
||||
t.Fatalf("server error: %v", errServer)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWebsocketUpstreamSupportsIncrementalInputForModel(t *testing.T) {
|
||||
manager := coreauth.NewManager(nil, nil, nil)
|
||||
auth := &coreauth.Auth{
|
||||
ID: "auth-ws",
|
||||
Provider: "test-provider",
|
||||
Status: coreauth.StatusActive,
|
||||
Attributes: map[string]string{"websockets": "true"},
|
||||
}
|
||||
if _, err := manager.Register(context.Background(), auth); err != nil {
|
||||
t.Fatalf("Register auth: %v", err)
|
||||
}
|
||||
registry.GetGlobalRegistry().RegisterClient(auth.ID, auth.Provider, []*registry.ModelInfo{{ID: "test-model"}})
|
||||
t.Cleanup(func() {
|
||||
registry.GetGlobalRegistry().UnregisterClient(auth.ID)
|
||||
})
|
||||
|
||||
base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, manager)
|
||||
h := NewOpenAIResponsesAPIHandler(base)
|
||||
if !h.websocketUpstreamSupportsIncrementalInputForModel("test-model") {
|
||||
t.Fatalf("expected websocket-capable upstream for test-model")
|
||||
}
|
||||
}
|
||||
|
||||
func TestResponsesWebsocketPrewarmHandledLocallyForSSEUpstream(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
executor := &websocketCaptureExecutor{}
|
||||
manager := coreauth.NewManager(nil, nil, nil)
|
||||
manager.RegisterExecutor(executor)
|
||||
auth := &coreauth.Auth{ID: "auth-sse", Provider: executor.Identifier(), Status: coreauth.StatusActive}
|
||||
if _, err := manager.Register(context.Background(), auth); err != nil {
|
||||
t.Fatalf("Register auth: %v", err)
|
||||
}
|
||||
registry.GetGlobalRegistry().RegisterClient(auth.ID, auth.Provider, []*registry.ModelInfo{{ID: "test-model"}})
|
||||
t.Cleanup(func() {
|
||||
registry.GetGlobalRegistry().UnregisterClient(auth.ID)
|
||||
})
|
||||
|
||||
base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, manager)
|
||||
h := NewOpenAIResponsesAPIHandler(base)
|
||||
router := gin.New()
|
||||
router.GET("/v1/responses/ws", h.ResponsesWebsocket)
|
||||
|
||||
server := httptest.NewServer(router)
|
||||
defer server.Close()
|
||||
|
||||
wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/responses/ws"
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial websocket: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
errClose := conn.Close()
|
||||
if errClose != nil {
|
||||
t.Fatalf("close websocket: %v", errClose)
|
||||
}
|
||||
}()
|
||||
|
||||
errWrite := conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"response.create","model":"test-model","generate":false}`))
|
||||
if errWrite != nil {
|
||||
t.Fatalf("write prewarm websocket message: %v", errWrite)
|
||||
}
|
||||
|
||||
_, createdPayload, errReadMessage := conn.ReadMessage()
|
||||
if errReadMessage != nil {
|
||||
t.Fatalf("read prewarm created message: %v", errReadMessage)
|
||||
}
|
||||
if gjson.GetBytes(createdPayload, "type").String() != "response.created" {
|
||||
t.Fatalf("created payload type = %s, want response.created", gjson.GetBytes(createdPayload, "type").String())
|
||||
}
|
||||
prewarmResponseID := gjson.GetBytes(createdPayload, "response.id").String()
|
||||
if prewarmResponseID == "" {
|
||||
t.Fatalf("prewarm response id is empty")
|
||||
}
|
||||
if executor.streamCalls != 0 {
|
||||
t.Fatalf("stream calls after prewarm = %d, want 0", executor.streamCalls)
|
||||
}
|
||||
|
||||
_, completedPayload, errReadMessage := conn.ReadMessage()
|
||||
if errReadMessage != nil {
|
||||
t.Fatalf("read prewarm completed message: %v", errReadMessage)
|
||||
}
|
||||
if gjson.GetBytes(completedPayload, "type").String() != wsEventTypeCompleted {
|
||||
t.Fatalf("completed payload type = %s, want %s", gjson.GetBytes(completedPayload, "type").String(), wsEventTypeCompleted)
|
||||
}
|
||||
if gjson.GetBytes(completedPayload, "response.id").String() != prewarmResponseID {
|
||||
t.Fatalf("completed response id = %s, want %s", gjson.GetBytes(completedPayload, "response.id").String(), prewarmResponseID)
|
||||
}
|
||||
if gjson.GetBytes(completedPayload, "response.usage.total_tokens").Int() != 0 {
|
||||
t.Fatalf("prewarm total tokens = %d, want 0", gjson.GetBytes(completedPayload, "response.usage.total_tokens").Int())
|
||||
}
|
||||
|
||||
secondRequest := fmt.Sprintf(`{"type":"response.create","previous_response_id":%q,"input":[{"type":"message","id":"msg-1"}]}`, prewarmResponseID)
|
||||
errWrite = conn.WriteMessage(websocket.TextMessage, []byte(secondRequest))
|
||||
if errWrite != nil {
|
||||
t.Fatalf("write follow-up websocket message: %v", errWrite)
|
||||
}
|
||||
|
||||
_, upstreamPayload, errReadMessage := conn.ReadMessage()
|
||||
if errReadMessage != nil {
|
||||
t.Fatalf("read upstream completed message: %v", errReadMessage)
|
||||
}
|
||||
if gjson.GetBytes(upstreamPayload, "type").String() != wsEventTypeCompleted {
|
||||
t.Fatalf("upstream payload type = %s, want %s", gjson.GetBytes(upstreamPayload, "type").String(), wsEventTypeCompleted)
|
||||
}
|
||||
if executor.streamCalls != 1 {
|
||||
t.Fatalf("stream calls after follow-up = %d, want 1", executor.streamCalls)
|
||||
}
|
||||
if len(executor.payloads) != 1 {
|
||||
t.Fatalf("captured upstream payloads = %d, want 1", len(executor.payloads))
|
||||
}
|
||||
forwarded := executor.payloads[0]
|
||||
if gjson.GetBytes(forwarded, "previous_response_id").Exists() {
|
||||
t.Fatalf("previous_response_id leaked upstream: %s", forwarded)
|
||||
}
|
||||
if gjson.GetBytes(forwarded, "generate").Exists() {
|
||||
t.Fatalf("generate leaked upstream: %s", forwarded)
|
||||
}
|
||||
if gjson.GetBytes(forwarded, "model").String() != "test-model" {
|
||||
t.Fatalf("forwarded model = %s, want test-model", gjson.GetBytes(forwarded, "model").String())
|
||||
}
|
||||
input := gjson.GetBytes(forwarded, "input").Array()
|
||||
if len(input) != 1 || input[0].Get("id").String() != "msg-1" {
|
||||
t.Fatalf("unexpected forwarded input: %s", forwarded)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user