mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-09 15:25:17 +00:00
@@ -147,21 +147,21 @@ func ConvertOpenAIRequestToGemini(modelName string, inputRawJSON []byte, _ bool)
|
||||
content := m.Get("content")
|
||||
|
||||
if (role == "system" || role == "developer") && len(arr) > 1 {
|
||||
// system -> system_instruction as a user message style
|
||||
// system -> systemInstruction as a user message style
|
||||
if content.Type == gjson.String {
|
||||
out, _ = sjson.SetBytes(out, "system_instruction.role", "user")
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), content.String())
|
||||
out, _ = sjson.SetBytes(out, "systemInstruction.role", "user")
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), content.String())
|
||||
systemPartIndex++
|
||||
} else if content.IsObject() && content.Get("type").String() == "text" {
|
||||
out, _ = sjson.SetBytes(out, "system_instruction.role", "user")
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), content.Get("text").String())
|
||||
out, _ = sjson.SetBytes(out, "systemInstruction.role", "user")
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), content.Get("text").String())
|
||||
systemPartIndex++
|
||||
} else if content.IsArray() {
|
||||
contents := content.Array()
|
||||
if len(contents) > 0 {
|
||||
out, _ = sjson.SetBytes(out, "system_instruction.role", "user")
|
||||
out, _ = sjson.SetBytes(out, "systemInstruction.role", "user")
|
||||
for j := 0; j < len(contents); j++ {
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), contents[j].Get("text").String())
|
||||
out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), contents[j].Get("text").String())
|
||||
systemPartIndex++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte
|
||||
if instructions := root.Get("instructions"); instructions.Exists() {
|
||||
systemInstr := `{"parts":[{"text":""}]}`
|
||||
systemInstr, _ = sjson.Set(systemInstr, "parts.0.text", instructions.String())
|
||||
out, _ = sjson.SetRaw(out, "system_instruction", systemInstr)
|
||||
out, _ = sjson.SetRaw(out, "systemInstruction", systemInstr)
|
||||
}
|
||||
|
||||
// Convert input messages to Gemini contents format
|
||||
@@ -119,7 +119,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte
|
||||
if strings.EqualFold(itemRole, "system") {
|
||||
if contentArray := item.Get("content"); contentArray.Exists() {
|
||||
systemInstr := ""
|
||||
if systemInstructionResult := gjson.Get(out, "system_instruction"); systemInstructionResult.Exists() {
|
||||
if systemInstructionResult := gjson.Get(out, "systemInstruction"); systemInstructionResult.Exists() {
|
||||
systemInstr = systemInstructionResult.Raw
|
||||
} else {
|
||||
systemInstr = `{"parts":[]}`
|
||||
@@ -140,7 +140,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte
|
||||
}
|
||||
|
||||
if systemInstr != `{"parts":[]}` {
|
||||
out, _ = sjson.SetRaw(out, "system_instruction", systemInstr)
|
||||
out, _ = sjson.SetRaw(out, "systemInstruction", systemInstr)
|
||||
}
|
||||
}
|
||||
continue
|
||||
|
||||
@@ -213,6 +213,26 @@ func (m *Manager) syncScheduler() {
|
||||
m.syncSchedulerFromSnapshot(m.snapshotAuths())
|
||||
}
|
||||
|
||||
// RefreshSchedulerEntry re-upserts a single auth into the scheduler so that its
|
||||
// supportedModelSet is rebuilt from the current global model registry state.
|
||||
// This must be called after models have been registered for a newly added auth,
|
||||
// because the initial scheduler.upsertAuth during Register/Update runs before
|
||||
// registerModelsForAuth and therefore snapshots an empty model set.
|
||||
func (m *Manager) RefreshSchedulerEntry(authID string) {
|
||||
if m == nil || m.scheduler == nil || authID == "" {
|
||||
return
|
||||
}
|
||||
m.mu.RLock()
|
||||
auth, ok := m.auths[authID]
|
||||
if !ok || auth == nil {
|
||||
m.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
snapshot := auth.Clone()
|
||||
m.mu.RUnlock()
|
||||
m.scheduler.upsertAuth(snapshot)
|
||||
}
|
||||
|
||||
func (m *Manager) SetSelector(selector Selector) {
|
||||
if m == nil {
|
||||
return
|
||||
@@ -2038,6 +2058,10 @@ func shouldRetrySchedulerPick(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
var cooldownErr *modelCooldownError
|
||||
if errors.As(err, &cooldownErr) {
|
||||
return true
|
||||
}
|
||||
var authErr *Error
|
||||
if !errors.As(err, &authErr) || authErr == nil {
|
||||
return false
|
||||
|
||||
163
sdk/cliproxy/auth/conductor_scheduler_refresh_test.go
Normal file
163
sdk/cliproxy/auth/conductor_scheduler_refresh_test.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
|
||||
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
|
||||
)
|
||||
|
||||
type schedulerProviderTestExecutor struct {
|
||||
provider string
|
||||
}
|
||||
|
||||
func (e schedulerProviderTestExecutor) Identifier() string { return e.provider }
|
||||
|
||||
func (e schedulerProviderTestExecutor) Execute(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||
return cliproxyexecutor.Response{}, nil
|
||||
}
|
||||
|
||||
func (e schedulerProviderTestExecutor) ExecuteStream(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (e schedulerProviderTestExecutor) Refresh(ctx context.Context, auth *Auth) (*Auth, error) {
|
||||
return auth, nil
|
||||
}
|
||||
|
||||
func (e schedulerProviderTestExecutor) CountTokens(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||
return cliproxyexecutor.Response{}, nil
|
||||
}
|
||||
|
||||
func (e schedulerProviderTestExecutor) HttpRequest(ctx context.Context, auth *Auth, req *http.Request) (*http.Response, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestManager_RefreshSchedulerEntry_RebuildsSupportedModelSetAfterModelRegistration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
prime func(*Manager, *Auth) error
|
||||
}{
|
||||
{
|
||||
name: "register",
|
||||
prime: func(manager *Manager, auth *Auth) error {
|
||||
_, errRegister := manager.Register(ctx, auth)
|
||||
return errRegister
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "update",
|
||||
prime: func(manager *Manager, auth *Auth) error {
|
||||
_, errRegister := manager.Register(ctx, auth)
|
||||
if errRegister != nil {
|
||||
return errRegister
|
||||
}
|
||||
updated := auth.Clone()
|
||||
updated.Metadata = map[string]any{"updated": true}
|
||||
_, errUpdate := manager.Update(ctx, updated)
|
||||
return errUpdate
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
testCase := testCase
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
manager := NewManager(nil, &RoundRobinSelector{}, nil)
|
||||
auth := &Auth{
|
||||
ID: "refresh-entry-" + testCase.name,
|
||||
Provider: "gemini",
|
||||
}
|
||||
if errPrime := testCase.prime(manager, auth); errPrime != nil {
|
||||
t.Fatalf("prime auth %s: %v", testCase.name, errPrime)
|
||||
}
|
||||
|
||||
registerSchedulerModels(t, "gemini", "scheduler-refresh-model", auth.ID)
|
||||
|
||||
got, errPick := manager.scheduler.pickSingle(ctx, "gemini", "scheduler-refresh-model", cliproxyexecutor.Options{}, nil)
|
||||
var authErr *Error
|
||||
if !errors.As(errPick, &authErr) || authErr == nil {
|
||||
t.Fatalf("pickSingle() before refresh error = %v, want auth_not_found", errPick)
|
||||
}
|
||||
if authErr.Code != "auth_not_found" {
|
||||
t.Fatalf("pickSingle() before refresh code = %q, want %q", authErr.Code, "auth_not_found")
|
||||
}
|
||||
if got != nil {
|
||||
t.Fatalf("pickSingle() before refresh auth = %v, want nil", got)
|
||||
}
|
||||
|
||||
manager.RefreshSchedulerEntry(auth.ID)
|
||||
|
||||
got, errPick = manager.scheduler.pickSingle(ctx, "gemini", "scheduler-refresh-model", cliproxyexecutor.Options{}, nil)
|
||||
if errPick != nil {
|
||||
t.Fatalf("pickSingle() after refresh error = %v", errPick)
|
||||
}
|
||||
if got == nil || got.ID != auth.ID {
|
||||
t.Fatalf("pickSingle() after refresh auth = %v, want %q", got, auth.ID)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_PickNext_RebuildsSchedulerAfterModelCooldownError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
manager := NewManager(nil, &RoundRobinSelector{}, nil)
|
||||
manager.RegisterExecutor(schedulerProviderTestExecutor{provider: "gemini"})
|
||||
|
||||
registerSchedulerModels(t, "gemini", "scheduler-cooldown-rebuild-model", "cooldown-stale-old")
|
||||
|
||||
oldAuth := &Auth{
|
||||
ID: "cooldown-stale-old",
|
||||
Provider: "gemini",
|
||||
}
|
||||
if _, errRegister := manager.Register(ctx, oldAuth); errRegister != nil {
|
||||
t.Fatalf("register old auth: %v", errRegister)
|
||||
}
|
||||
|
||||
manager.MarkResult(ctx, Result{
|
||||
AuthID: oldAuth.ID,
|
||||
Provider: "gemini",
|
||||
Model: "scheduler-cooldown-rebuild-model",
|
||||
Success: false,
|
||||
Error: &Error{HTTPStatus: http.StatusTooManyRequests, Message: "quota"},
|
||||
})
|
||||
|
||||
newAuth := &Auth{
|
||||
ID: "cooldown-stale-new",
|
||||
Provider: "gemini",
|
||||
}
|
||||
if _, errRegister := manager.Register(ctx, newAuth); errRegister != nil {
|
||||
t.Fatalf("register new auth: %v", errRegister)
|
||||
}
|
||||
|
||||
reg := registry.GetGlobalRegistry()
|
||||
reg.RegisterClient(newAuth.ID, "gemini", []*registry.ModelInfo{{ID: "scheduler-cooldown-rebuild-model"}})
|
||||
t.Cleanup(func() {
|
||||
reg.UnregisterClient(newAuth.ID)
|
||||
})
|
||||
|
||||
got, errPick := manager.scheduler.pickSingle(ctx, "gemini", "scheduler-cooldown-rebuild-model", cliproxyexecutor.Options{}, nil)
|
||||
var cooldownErr *modelCooldownError
|
||||
if !errors.As(errPick, &cooldownErr) {
|
||||
t.Fatalf("pickSingle() before sync error = %v, want modelCooldownError", errPick)
|
||||
}
|
||||
if got != nil {
|
||||
t.Fatalf("pickSingle() before sync auth = %v, want nil", got)
|
||||
}
|
||||
|
||||
got, executor, errPick := manager.pickNext(ctx, "gemini", "scheduler-cooldown-rebuild-model", cliproxyexecutor.Options{}, nil)
|
||||
if errPick != nil {
|
||||
t.Fatalf("pickNext() error = %v", errPick)
|
||||
}
|
||||
if executor == nil {
|
||||
t.Fatal("pickNext() executor = nil")
|
||||
}
|
||||
if got == nil || got.ID != newAuth.ID {
|
||||
t.Fatalf("pickNext() auth = %v, want %q", got, newAuth.ID)
|
||||
}
|
||||
}
|
||||
@@ -323,6 +323,12 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A
|
||||
// This operation may block on network calls, but the auth configuration
|
||||
// is already effective at this point.
|
||||
s.registerModelsForAuth(auth)
|
||||
|
||||
// Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt
|
||||
// from the now-populated global model registry. Without this, newly added auths
|
||||
// have an empty supportedModelSet (because Register/Update upserts into the
|
||||
// scheduler before registerModelsForAuth runs) and are invisible to the scheduler.
|
||||
s.coreManager.RefreshSchedulerEntry(auth.ID)
|
||||
}
|
||||
|
||||
func (s *Service) applyCoreAuthRemoval(ctx context.Context, id string) {
|
||||
|
||||
Reference in New Issue
Block a user