mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-04-09 07:42:42 +00:00
Reconcile registry model states on auth changes
Add Manager.ReconcileRegistryModelStates to clear stale per-model runtime failures for models currently registered in the global model registry. The method finds models supported for an auth, resets non-clean ModelState entries, updates aggregated availability, persists changes, and pushes a snapshot to the scheduler. Introduce modelStateIsClean helper to determine when a model state needs resetting. Call ReconcileRegistryModelStates from Service paths that register/refresh models (applyCoreAuthAddOrUpdate and refreshModelRegistrationForAuth) to keep the scheduler and global registry aligned after model re-registration.
This commit is contained in:
@@ -233,6 +233,81 @@ func (m *Manager) RefreshSchedulerEntry(authID string) {
|
||||
m.scheduler.upsertAuth(snapshot)
|
||||
}
|
||||
|
||||
// ReconcileRegistryModelStates clears stale per-model runtime failures for
|
||||
// models that are currently registered for the auth in the global model registry.
|
||||
//
|
||||
// This keeps the scheduler and the global registry aligned after model
|
||||
// re-registration. Without this reconciliation, a model can reappear in
|
||||
// /v1/models after registry refresh while the scheduler still blocks it because
|
||||
// auth.ModelStates retained an older failure such as not_found or quota.
|
||||
func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID string) {
|
||||
if m == nil || authID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
supportedModels := registry.GetGlobalRegistry().GetModelsForClient(authID)
|
||||
if len(supportedModels) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
supported := make(map[string]struct{}, len(supportedModels))
|
||||
for _, model := range supportedModels {
|
||||
if model == nil {
|
||||
continue
|
||||
}
|
||||
modelKey := canonicalModelKey(model.ID)
|
||||
if modelKey == "" {
|
||||
continue
|
||||
}
|
||||
supported[modelKey] = struct{}{}
|
||||
}
|
||||
if len(supported) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var snapshot *Auth
|
||||
now := time.Now()
|
||||
|
||||
m.mu.Lock()
|
||||
auth, ok := m.auths[authID]
|
||||
if ok && auth != nil && len(auth.ModelStates) > 0 {
|
||||
changed := false
|
||||
for modelKey, state := range auth.ModelStates {
|
||||
if state == nil {
|
||||
continue
|
||||
}
|
||||
baseModel := canonicalModelKey(modelKey)
|
||||
if baseModel == "" {
|
||||
baseModel = strings.TrimSpace(modelKey)
|
||||
}
|
||||
if _, supportedModel := supported[baseModel]; !supportedModel {
|
||||
continue
|
||||
}
|
||||
if modelStateIsClean(state) {
|
||||
continue
|
||||
}
|
||||
resetModelState(state, now)
|
||||
changed = true
|
||||
}
|
||||
if changed {
|
||||
updateAggregatedAvailability(auth, now)
|
||||
if !hasModelError(auth, now) {
|
||||
auth.LastError = nil
|
||||
auth.StatusMessage = ""
|
||||
auth.Status = StatusActive
|
||||
}
|
||||
auth.UpdatedAt = now
|
||||
_ = m.persist(ctx, auth)
|
||||
snapshot = auth.Clone()
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
if m.scheduler != nil && snapshot != nil {
|
||||
m.scheduler.upsertAuth(snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) SetSelector(selector Selector) {
|
||||
if m == nil {
|
||||
return
|
||||
@@ -1735,6 +1810,22 @@ func resetModelState(state *ModelState, now time.Time) {
|
||||
state.UpdatedAt = now
|
||||
}
|
||||
|
||||
func modelStateIsClean(state *ModelState) bool {
|
||||
if state == nil {
|
||||
return true
|
||||
}
|
||||
if state.Status != StatusActive {
|
||||
return false
|
||||
}
|
||||
if state.Unavailable || state.StatusMessage != "" || !state.NextRetryAfter.IsZero() || state.LastError != nil {
|
||||
return false
|
||||
}
|
||||
if state.Quota.Exceeded || state.Quota.Reason != "" || !state.Quota.NextRecoverAt.IsZero() || state.Quota.BackoffLevel != 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||
if auth == nil || len(auth.ModelStates) == 0 {
|
||||
return
|
||||
|
||||
@@ -310,6 +310,7 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A
|
||||
// This operation may block on network calls, but the auth configuration
|
||||
// is already effective at this point.
|
||||
s.registerModelsForAuth(auth)
|
||||
s.coreManager.ReconcileRegistryModelStates(ctx, auth.ID)
|
||||
|
||||
// Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt
|
||||
// from the now-populated global model registry. Without this, newly added auths
|
||||
@@ -1019,6 +1020,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
|
||||
s.ensureExecutorsForAuth(current)
|
||||
}
|
||||
s.registerModelsForAuth(current)
|
||||
s.coreManager.ReconcileRegistryModelStates(context.Background(), current.ID)
|
||||
|
||||
latest, ok := s.latestAuthForModelRegistration(current.ID)
|
||||
if !ok || latest.Disabled {
|
||||
@@ -1032,6 +1034,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
|
||||
// no auth fields changed, but keeps the refresh path simple and correct.
|
||||
s.ensureExecutorsForAuth(latest)
|
||||
s.registerModelsForAuth(latest)
|
||||
s.coreManager.ReconcileRegistryModelStates(context.Background(), latest.ID)
|
||||
s.coreManager.RefreshSchedulerEntry(current.ID)
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user