From 07b7c1a1e01f14d06c6d33abe382013cadaf20f5 Mon Sep 17 00:00:00 2001 From: MonsterQiu <72pgstan@gmail.com> Date: Tue, 31 Mar 2026 14:27:14 +0800 Subject: [PATCH] fix(auth): resolve oauth aliases before suspension checks --- sdk/cliproxy/auth/conductor.go | 176 ++++++++++++++++-- .../conductor_oauth_alias_suspension_test.go | 111 +++++++++++ 2 files changed, 275 insertions(+), 12 deletions(-) create mode 100644 sdk/cliproxy/auth/conductor_oauth_alias_suspension_test.go diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 61f32278..82037c51 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -437,6 +438,27 @@ func (m *Manager) executionModelCandidates(auth *Auth, routeModel string) []stri return []string{resolved} } +func (m *Manager) selectionModelForAuth(auth *Auth, routeModel string) string { + requestedModel := rewriteModelForAuth(routeModel, auth) + if strings.TrimSpace(requestedModel) == "" { + requestedModel = strings.TrimSpace(routeModel) + } + resolvedModel := m.applyOAuthModelAlias(auth, requestedModel) + if strings.TrimSpace(resolvedModel) == "" { + resolvedModel = requestedModel + } + return resolvedModel +} + +func (m *Manager) stateModelForExecution(auth *Auth, routeModel, upstreamModel string, pooled bool) string { + stateModel := executionResultModel(routeModel, upstreamModel, pooled) + selectionModel := m.selectionModelForAuth(auth, routeModel) + if canonicalModelKey(selectionModel) == canonicalModelKey(upstreamModel) && strings.TrimSpace(selectionModel) != "" { + return strings.TrimSpace(upstreamModel) + } + return stateModel +} + func executionResultModel(routeModel, upstreamModel string, pooled bool) string { if pooled { if resolved := strings.TrimSpace(upstreamModel); resolved != "" { @@ -449,14 +471,14 @@ func executionResultModel(routeModel, upstreamModel string, pooled bool) string return strings.TrimSpace(upstreamModel) } -func filterExecutionModels(auth *Auth, routeModel string, candidates []string, pooled bool) []string { +func (m *Manager) filterExecutionModels(auth *Auth, routeModel string, candidates []string, pooled bool) []string { if len(candidates) == 0 { return nil } now := time.Now() out := make([]string, 0, len(candidates)) for _, upstreamModel := range candidates { - stateModel := executionResultModel(routeModel, upstreamModel, pooled) + stateModel := m.stateModelForExecution(auth, routeModel, upstreamModel, pooled) blocked, _, _ := isAuthBlockedForModel(auth, stateModel, now) if blocked { continue @@ -469,7 +491,7 @@ func filterExecutionModels(auth *Auth, routeModel string, candidates []string, p func (m *Manager) preparedExecutionModels(auth *Auth, routeModel string) ([]string, bool) { candidates := m.executionModelCandidates(auth, routeModel) pooled := len(candidates) > 1 - return filterExecutionModels(auth, routeModel, candidates, pooled), pooled + return m.filterExecutionModels(auth, routeModel, candidates, pooled), pooled } func (m *Manager) prepareExecutionModels(auth *Auth, routeModel string) []string { @@ -477,6 +499,62 @@ func (m *Manager) prepareExecutionModels(auth *Auth, routeModel string) []string return models } +func (m *Manager) availableAuthsForRouteModel(auths []*Auth, provider, routeModel string, now time.Time) ([]*Auth, error) { + if len(auths) == 0 { + return nil, &Error{Code: "auth_not_found", Message: "no auth candidates"} + } + + availableByPriority := make(map[int][]*Auth) + cooldownCount := 0 + var earliest time.Time + for i := 0; i < len(auths); i++ { + candidate := auths[i] + checkModel := m.selectionModelForAuth(candidate, routeModel) + blocked, reason, next := isAuthBlockedForModel(candidate, checkModel, now) + if !blocked { + priority := authPriority(candidate) + availableByPriority[priority] = append(availableByPriority[priority], candidate) + continue + } + if reason == blockReasonCooldown { + cooldownCount++ + if !next.IsZero() && (earliest.IsZero() || next.Before(earliest)) { + earliest = next + } + } + } + + if len(availableByPriority) == 0 { + if cooldownCount == len(auths) && !earliest.IsZero() { + providerForError := provider + if providerForError == "mixed" { + providerForError = "" + } + resetIn := earliest.Sub(now) + if resetIn < 0 { + resetIn = 0 + } + return nil, newModelCooldownError(routeModel, providerForError, resetIn) + } + return nil, &Error{Code: "auth_unavailable", Message: "no auth available"} + } + + bestPriority := 0 + found := false + for priority := range availableByPriority { + if !found || priority > bestPriority { + bestPriority = priority + found = true + } + } + + available := availableByPriority[bestPriority] + if len(available) > 1 { + sort.Slice(available, func(i, j int) bool { return available[i].ID < available[j].ID }) + } + return available, nil +} + func discardStreamChunks(ch <-chan cliproxyexecutor.StreamChunk) { if ch == nil { return @@ -627,7 +705,7 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi } var lastErr error for idx, execModel := range execModels { - resultModel := executionResultModel(routeModel, execModel, pooled) + resultModel := m.stateModelForExecution(auth, routeModel, execModel, pooled) execReq := req execReq.Model = execModel streamResult, errStream := executor.ExecuteStream(ctx, auth, execReq, opts) @@ -1107,7 +1185,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req attempted[auth.ID] = struct{}{} var authErr error for _, upstreamModel := range models { - resultModel := executionResultModel(routeModel, upstreamModel, pooled) + resultModel := m.stateModelForExecution(auth, routeModel, upstreamModel, pooled) execReq := req execReq.Model = upstreamModel resp, errExec := executor.Execute(execCtx, auth, execReq, opts) @@ -1185,7 +1263,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, attempted[auth.ID] = struct{}{} var authErr error for _, upstreamModel := range models { - resultModel := executionResultModel(routeModel, upstreamModel, pooled) + resultModel := m.stateModelForExecution(auth, routeModel, upstreamModel, pooled) execReq := req execReq.Model = upstreamModel resp, errExec := executor.CountTokens(execCtx, auth, execReq, opts) @@ -2271,6 +2349,13 @@ func shouldRetrySchedulerPick(err error) bool { return authErr.Code == "auth_not_found" || authErr.Code == "auth_unavailable" } +func (m *Manager) routeAwareSelectionRequired(auth *Auth, routeModel string) bool { + if auth == nil || strings.TrimSpace(routeModel) == "" { + return false + } + return canonicalModelKey(m.selectionModelForAuth(auth, routeModel)) != canonicalModelKey(routeModel) +} + func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata) @@ -2300,8 +2385,17 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op if _, used := tried[candidate.ID]; used { continue } - if modelKey != "" && registryRef != nil && !registryRef.ClientSupportsModel(candidate.ID, modelKey) { - continue + if modelKey != "" && registryRef != nil { + supportsModel := registryRef.ClientSupportsModel(candidate.ID, modelKey) + if !supportsModel { + selectionKey := canonicalModelKey(m.selectionModelForAuth(candidate, model)) + if selectionKey != "" && selectionKey != modelKey { + supportsModel = registryRef.ClientSupportsModel(candidate.ID, selectionKey) + } + } + if !supportsModel { + continue + } } candidates = append(candidates, candidate) } @@ -2309,7 +2403,12 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op m.mu.RUnlock() return nil, nil, &Error{Code: "auth_not_found", Message: "no auth available"} } - selected, errPick := m.selector.Pick(ctx, provider, model, opts, candidates) + available, errAvailable := m.availableAuthsForRouteModel(candidates, provider, model, time.Now()) + if errAvailable != nil { + m.mu.RUnlock() + return nil, nil, errAvailable + } + selected, errPick := m.selector.Pick(ctx, provider, "", opts, available) if errPick != nil { m.mu.RUnlock() return nil, nil, errPick @@ -2335,6 +2434,22 @@ func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cli if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } + if strings.TrimSpace(model) != "" { + m.mu.RLock() + for _, candidate := range m.auths { + if candidate == nil || candidate.Provider != provider || candidate.Disabled { + continue + } + if _, used := tried[candidate.ID]; used { + continue + } + if m.routeAwareSelectionRequired(candidate, model) { + m.mu.RUnlock() + return m.pickNextLegacy(ctx, provider, model, opts, tried) + } + } + m.mu.RUnlock() + } executor, okExecutor := m.Executor(provider) if !okExecutor { return nil, nil, &Error{Code: "executor_not_found", Message: "executor not registered"} @@ -2408,8 +2523,17 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m if _, ok := m.executors[providerKey]; !ok { continue } - if modelKey != "" && registryRef != nil && !registryRef.ClientSupportsModel(candidate.ID, modelKey) { - continue + if modelKey != "" && registryRef != nil { + supportsModel := registryRef.ClientSupportsModel(candidate.ID, modelKey) + if !supportsModel { + selectionKey := canonicalModelKey(m.selectionModelForAuth(candidate, model)) + if selectionKey != "" && selectionKey != modelKey { + supportsModel = registryRef.ClientSupportsModel(candidate.ID, selectionKey) + } + } + if !supportsModel { + continue + } } candidates = append(candidates, candidate) } @@ -2417,7 +2541,12 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m m.mu.RUnlock() return nil, nil, "", &Error{Code: "auth_not_found", Message: "no auth available"} } - selected, errPick := m.selector.Pick(ctx, "mixed", model, opts, candidates) + available, errAvailable := m.availableAuthsForRouteModel(candidates, "mixed", model, time.Now()) + if errAvailable != nil { + m.mu.RUnlock() + return nil, nil, "", errAvailable + } + selected, errPick := m.selector.Pick(ctx, "mixed", "", opts, available) if errPick != nil { m.mu.RUnlock() return nil, nil, "", errPick @@ -2469,6 +2598,29 @@ func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model s if len(eligibleProviders) == 0 { return nil, nil, "", &Error{Code: "auth_not_found", Message: "no auth available"} } + if strings.TrimSpace(model) != "" { + providerSet := make(map[string]struct{}, len(eligibleProviders)) + for _, providerKey := range eligibleProviders { + providerSet[providerKey] = struct{}{} + } + m.mu.RLock() + for _, candidate := range m.auths { + if candidate == nil || candidate.Disabled { + continue + } + if _, ok := providerSet[strings.TrimSpace(strings.ToLower(candidate.Provider))]; !ok { + continue + } + if _, used := tried[candidate.ID]; used { + continue + } + if m.routeAwareSelectionRequired(candidate, model) { + m.mu.RUnlock() + return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) + } + } + m.mu.RUnlock() + } selected, providerKey, errPick := m.scheduler.pickMixed(ctx, eligibleProviders, model, opts, tried) if errPick != nil && model != "" && shouldRetrySchedulerPick(errPick) { diff --git a/sdk/cliproxy/auth/conductor_oauth_alias_suspension_test.go b/sdk/cliproxy/auth/conductor_oauth_alias_suspension_test.go new file mode 100644 index 00000000..8bc779e5 --- /dev/null +++ b/sdk/cliproxy/auth/conductor_oauth_alias_suspension_test.go @@ -0,0 +1,111 @@ +package auth + +import ( + "context" + "net/http" + "sync" + "testing" + "time" + + internalconfig "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +type aliasRoutingExecutor struct { + id string + + mu sync.Mutex + executeModels []string +} + +func (e *aliasRoutingExecutor) Identifier() string { return e.id } + +func (e *aliasRoutingExecutor) Execute(_ context.Context, _ *Auth, req cliproxyexecutor.Request, _ cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + e.mu.Lock() + e.executeModels = append(e.executeModels, req.Model) + e.mu.Unlock() + return cliproxyexecutor.Response{Payload: []byte(req.Model)}, nil +} + +func (e *aliasRoutingExecutor) ExecuteStream(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + return nil, &Error{HTTPStatus: http.StatusNotImplemented, Message: "ExecuteStream not implemented"} +} + +func (e *aliasRoutingExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { + return auth, nil +} + +func (e *aliasRoutingExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, &Error{HTTPStatus: http.StatusNotImplemented, Message: "CountTokens not implemented"} +} + +func (e *aliasRoutingExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { + return nil, &Error{HTTPStatus: http.StatusNotImplemented, Message: "HttpRequest not implemented"} +} + +func (e *aliasRoutingExecutor) ExecuteModels() []string { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]string, len(e.executeModels)) + copy(out, e.executeModels) + return out +} + +func TestManagerExecute_OAuthAliasBypassesBlockedRouteModel(t *testing.T) { + const ( + provider = "antigravity" + routeModel = "claude-opus-4-6" + targetModel = "claude-opus-4-6-thinking" + ) + + manager := NewManager(nil, nil, nil) + executor := &aliasRoutingExecutor{id: provider} + manager.RegisterExecutor(executor) + manager.SetOAuthModelAlias(map[string][]internalconfig.OAuthModelAlias{ + provider: {{ + Name: targetModel, + Alias: routeModel, + Fork: true, + }}, + }) + + auth := &Auth{ + ID: "oauth-alias-auth", + Provider: provider, + Status: StatusActive, + ModelStates: map[string]*ModelState{ + routeModel: { + Unavailable: true, + Status: StatusError, + NextRetryAfter: time.Now().Add(1 * time.Hour), + }, + }, + } + if _, errRegister := manager.Register(context.Background(), auth); errRegister != nil { + t.Fatalf("register auth: %v", errRegister) + } + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(auth.ID, provider, []*registry.ModelInfo{{ID: routeModel}, {ID: targetModel}}) + t.Cleanup(func() { + reg.UnregisterClient(auth.ID) + }) + manager.RefreshSchedulerEntry(auth.ID) + + resp, errExecute := manager.Execute(context.Background(), []string{provider}, cliproxyexecutor.Request{Model: routeModel}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("execute error = %v, want success", errExecute) + } + if string(resp.Payload) != targetModel { + t.Fatalf("execute payload = %q, want %q", string(resp.Payload), targetModel) + } + + gotModels := executor.ExecuteModels() + if len(gotModels) != 1 { + t.Fatalf("execute models len = %d, want 1", len(gotModels)) + } + if gotModels[0] != targetModel { + t.Fatalf("execute model = %q, want %q", gotModels[0], targetModel) + } +}