diff --git a/internal/watcher/clients.go b/internal/watcher/clients.go index cf0ed076..ae11967b 100644 --- a/internal/watcher/clients.go +++ b/internal/watcher/clients.go @@ -17,6 +17,7 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/util" "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff" + "github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer" coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" log "github.com/sirupsen/logrus" ) @@ -75,6 +76,7 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string w.lastAuthHashes = make(map[string]string) w.lastAuthContents = make(map[string]*coreauth.Auth) + w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth) if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil { log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir) } else if resolvedAuthDir != "" { @@ -92,6 +94,24 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string if errParse := json.Unmarshal(data, &auth); errParse == nil { w.lastAuthContents[normalizedPath] = &auth } + ctx := &synthesizer.SynthesisContext{ + Config: cfg, + AuthDir: resolvedAuthDir, + Now: time.Now(), + IDGenerator: synthesizer.NewStableIDGenerator(), + } + if generated := synthesizer.SynthesizeAuthFile(ctx, path, data); len(generated) > 0 { + pathAuths := make(map[string]*coreauth.Auth, len(generated)) + for _, a := range generated { + if a == nil || strings.TrimSpace(a.ID) == "" { + continue + } + pathAuths[a.ID] = a.Clone() + } + if len(pathAuths) > 0 { + w.fileAuthsByPath[normalizedPath] = pathAuths + } + } } } return nil @@ -143,13 +163,14 @@ func (w *Watcher) addOrUpdateClient(path string) { } w.clientsMutex.Lock() - - cfg := w.config - if cfg == nil { + if w.config == nil { log.Error("config is nil, cannot add or update client") w.clientsMutex.Unlock() return } + if w.fileAuthsByPath == nil { + w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth) + } if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash { log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path)) w.clientsMutex.Unlock() @@ -177,34 +198,85 @@ func (w *Watcher) addOrUpdateClient(path string) { } w.lastAuthContents[normalized] = &newAuth - w.clientsMutex.Unlock() // Unlock before the callback - - w.refreshAuthState(false) - - if w.reloadCallback != nil { - log.Debugf("triggering server update callback after add/update") - w.reloadCallback(cfg) + oldByID := make(map[string]*coreauth.Auth) + if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 { + for id, a := range existing { + oldByID[id] = a + } } + + // Build synthesized auth entries for this single file only. + sctx := &synthesizer.SynthesisContext{ + Config: w.config, + AuthDir: w.authDir, + Now: time.Now(), + IDGenerator: synthesizer.NewStableIDGenerator(), + } + generated := synthesizer.SynthesizeAuthFile(sctx, path, data) + newByID := make(map[string]*coreauth.Auth) + for _, a := range generated { + if a == nil || strings.TrimSpace(a.ID) == "" { + continue + } + newByID[a.ID] = a.Clone() + } + if len(newByID) > 0 { + w.fileAuthsByPath[normalized] = newByID + } else { + delete(w.fileAuthsByPath, normalized) + } + updates := w.computePerPathUpdatesLocked(oldByID, newByID) + w.clientsMutex.Unlock() + w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) + w.dispatchAuthUpdates(updates) } func (w *Watcher) removeClient(path string) { normalized := w.normalizeAuthPath(path) w.clientsMutex.Lock() - - cfg := w.config + oldByID := make(map[string]*coreauth.Auth) + if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 { + for id, a := range existing { + oldByID[id] = a + } + } delete(w.lastAuthHashes, normalized) delete(w.lastAuthContents, normalized) + delete(w.fileAuthsByPath, normalized) - w.clientsMutex.Unlock() // Release the lock before the callback + updates := w.computePerPathUpdatesLocked(oldByID, map[string]*coreauth.Auth{}) + w.clientsMutex.Unlock() - w.refreshAuthState(false) - - if w.reloadCallback != nil { - log.Debugf("triggering server update callback after removal") - w.reloadCallback(cfg) - } w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) + w.dispatchAuthUpdates(updates) +} + +func (w *Watcher) computePerPathUpdatesLocked(oldByID, newByID map[string]*coreauth.Auth) []AuthUpdate { + if w.currentAuths == nil { + w.currentAuths = make(map[string]*coreauth.Auth) + } + updates := make([]AuthUpdate, 0, len(oldByID)+len(newByID)) + for id, newAuth := range newByID { + existing, ok := w.currentAuths[id] + if !ok { + w.currentAuths[id] = newAuth.Clone() + updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: newAuth.Clone()}) + continue + } + if !authEqual(existing, newAuth) { + w.currentAuths[id] = newAuth.Clone() + updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: newAuth.Clone()}) + } + } + for id := range oldByID { + if _, stillExists := newByID[id]; stillExists { + continue + } + delete(w.currentAuths, id) + updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id}) + } + return updates } func (w *Watcher) loadFileClients(cfg *config.Config) int { diff --git a/internal/watcher/dispatcher.go b/internal/watcher/dispatcher.go index ff3c5b63..3d7d7527 100644 --- a/internal/watcher/dispatcher.go +++ b/internal/watcher/dispatcher.go @@ -14,6 +14,8 @@ import ( coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" ) +var snapshotCoreAuthsFunc = snapshotCoreAuths + func (w *Watcher) setAuthUpdateQueue(queue chan<- AuthUpdate) { w.clientsMutex.Lock() defer w.clientsMutex.Unlock() @@ -76,7 +78,11 @@ func (w *Watcher) dispatchRuntimeAuthUpdate(update AuthUpdate) bool { } func (w *Watcher) refreshAuthState(force bool) { - auths := w.SnapshotCoreAuths() + w.clientsMutex.RLock() + cfg := w.config + authDir := w.authDir + w.clientsMutex.RUnlock() + auths := snapshotCoreAuthsFunc(cfg, authDir) w.clientsMutex.Lock() if len(w.runtimeAuths) > 0 { for _, a := range w.runtimeAuths { diff --git a/internal/watcher/synthesizer/file.go b/internal/watcher/synthesizer/file.go index 4e053117..50f3a2ab 100644 --- a/internal/watcher/synthesizer/file.go +++ b/internal/watcher/synthesizer/file.go @@ -35,9 +35,6 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e return out, nil } - now := ctx.Now - cfg := ctx.Config - for _, e := range entries { if e.IsDir() { continue @@ -51,95 +48,117 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e if errRead != nil || len(data) == 0 { continue } - var metadata map[string]any - if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil { + auths := synthesizeFileAuths(ctx, full, data) + if len(auths) == 0 { continue } - t, _ := metadata["type"].(string) - if t == "" { - continue - } - provider := strings.ToLower(t) - if provider == "gemini" { - provider = "gemini-cli" - } - label := provider - if email, _ := metadata["email"].(string); email != "" { - label = email - } - // Use relative path under authDir as ID to stay consistent with the file-based token store - id := full - if rel, errRel := filepath.Rel(ctx.AuthDir, full); errRel == nil && rel != "" { - id = rel - } - - proxyURL := "" - if p, ok := metadata["proxy_url"].(string); ok { - proxyURL = p - } - - prefix := "" - if rawPrefix, ok := metadata["prefix"].(string); ok { - trimmed := strings.TrimSpace(rawPrefix) - trimmed = strings.Trim(trimmed, "/") - if trimmed != "" && !strings.Contains(trimmed, "/") { - prefix = trimmed - } - } - - disabled, _ := metadata["disabled"].(bool) - status := coreauth.StatusActive - if disabled { - status = coreauth.StatusDisabled - } - - // Read per-account excluded models from the OAuth JSON file - perAccountExcluded := extractExcludedModelsFromMetadata(metadata) - - a := &coreauth.Auth{ - ID: id, - Provider: provider, - Label: label, - Prefix: prefix, - Status: status, - Disabled: disabled, - Attributes: map[string]string{ - "source": full, - "path": full, - }, - ProxyURL: proxyURL, - Metadata: metadata, - CreatedAt: now, - UpdatedAt: now, - } - // Read priority from auth file - if rawPriority, ok := metadata["priority"]; ok { - switch v := rawPriority.(type) { - case float64: - a.Attributes["priority"] = strconv.Itoa(int(v)) - case string: - priority := strings.TrimSpace(v) - if _, errAtoi := strconv.Atoi(priority); errAtoi == nil { - a.Attributes["priority"] = priority - } - } - } - ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth") - if provider == "gemini-cli" { - if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 { - for _, v := range virtuals { - ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth") - } - out = append(out, a) - out = append(out, virtuals...) - continue - } - } - out = append(out, a) + out = append(out, auths...) } return out, nil } +// SynthesizeAuthFile generates Auth entries for one auth JSON file payload. +// It shares exactly the same mapping behavior as FileSynthesizer.Synthesize. +func SynthesizeAuthFile(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth { + return synthesizeFileAuths(ctx, fullPath, data) +} + +func synthesizeFileAuths(ctx *SynthesisContext, fullPath string, data []byte) []*coreauth.Auth { + if ctx == nil || len(data) == 0 { + return nil + } + now := ctx.Now + cfg := ctx.Config + var metadata map[string]any + if errUnmarshal := json.Unmarshal(data, &metadata); errUnmarshal != nil { + return nil + } + t, _ := metadata["type"].(string) + if t == "" { + return nil + } + provider := strings.ToLower(t) + if provider == "gemini" { + provider = "gemini-cli" + } + label := provider + if email, _ := metadata["email"].(string); email != "" { + label = email + } + // Use relative path under authDir as ID to stay consistent with the file-based token store. + id := fullPath + if strings.TrimSpace(ctx.AuthDir) != "" { + if rel, errRel := filepath.Rel(ctx.AuthDir, fullPath); errRel == nil && rel != "" { + id = rel + } + } + + proxyURL := "" + if p, ok := metadata["proxy_url"].(string); ok { + proxyURL = p + } + + prefix := "" + if rawPrefix, ok := metadata["prefix"].(string); ok { + trimmed := strings.TrimSpace(rawPrefix) + trimmed = strings.Trim(trimmed, "/") + if trimmed != "" && !strings.Contains(trimmed, "/") { + prefix = trimmed + } + } + + disabled, _ := metadata["disabled"].(bool) + status := coreauth.StatusActive + if disabled { + status = coreauth.StatusDisabled + } + + // Read per-account excluded models from the OAuth JSON file. + perAccountExcluded := extractExcludedModelsFromMetadata(metadata) + + a := &coreauth.Auth{ + ID: id, + Provider: provider, + Label: label, + Prefix: prefix, + Status: status, + Disabled: disabled, + Attributes: map[string]string{ + "source": fullPath, + "path": fullPath, + }, + ProxyURL: proxyURL, + Metadata: metadata, + CreatedAt: now, + UpdatedAt: now, + } + // Read priority from auth file. + if rawPriority, ok := metadata["priority"]; ok { + switch v := rawPriority.(type) { + case float64: + a.Attributes["priority"] = strconv.Itoa(int(v)) + case string: + priority := strings.TrimSpace(v) + if _, errAtoi := strconv.Atoi(priority); errAtoi == nil { + a.Attributes["priority"] = priority + } + } + } + ApplyAuthExcludedModelsMeta(a, cfg, perAccountExcluded, "oauth") + if provider == "gemini-cli" { + if virtuals := SynthesizeGeminiVirtualAuths(a, metadata, now); len(virtuals) > 0 { + for _, v := range virtuals { + ApplyAuthExcludedModelsMeta(v, cfg, perAccountExcluded, "oauth") + } + out := make([]*coreauth.Auth, 0, 1+len(virtuals)) + out = append(out, a) + out = append(out, virtuals...) + return out + } + } + return []*coreauth.Auth{a} +} + // SynthesizeGeminiVirtualAuths creates virtual Auth entries for multi-project Gemini credentials. // It disables the primary auth and creates one virtual auth per project. func SynthesizeGeminiVirtualAuths(primary *coreauth.Auth, metadata map[string]any, now time.Time) []*coreauth.Auth { diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 9f370127..8180e474 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -39,6 +39,7 @@ type Watcher struct { watcher *fsnotify.Watcher lastAuthHashes map[string]string lastAuthContents map[string]*coreauth.Auth + fileAuthsByPath map[string]map[string]*coreauth.Auth lastRemoveTimes map[string]time.Time lastConfigHash string authQueue chan<- AuthUpdate @@ -85,11 +86,12 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) return nil, errNewWatcher } w := &Watcher{ - configPath: configPath, - authDir: authDir, - reloadCallback: reloadCallback, - watcher: watcher, - lastAuthHashes: make(map[string]string), + configPath: configPath, + authDir: authDir, + reloadCallback: reloadCallback, + watcher: watcher, + lastAuthHashes: make(map[string]string), + fileAuthsByPath: make(map[string]map[string]*coreauth.Auth), } w.dispatchCond = sync.NewCond(&w.dispatchMu) if store := sdkAuth.GetTokenStore(); store != nil { diff --git a/internal/watcher/watcher_test.go b/internal/watcher/watcher_test.go index a3be5877..32354e2f 100644 --- a/internal/watcher/watcher_test.go +++ b/internal/watcher/watcher_test.go @@ -387,7 +387,7 @@ func TestAddOrUpdateClientSkipsUnchanged(t *testing.T) { } } -func TestAddOrUpdateClientTriggersReloadAndHash(t *testing.T) { +func TestAddOrUpdateClientUpdatesHashWithoutReload(t *testing.T) { tmpDir := t.TempDir() authFile := filepath.Join(tmpDir, "sample.json") if err := os.WriteFile(authFile, []byte(`{"type":"demo","api_key":"k"}`), 0o644); err != nil { @@ -406,8 +406,8 @@ func TestAddOrUpdateClientTriggersReloadAndHash(t *testing.T) { w.addOrUpdateClient(authFile) - if got := atomic.LoadInt32(&reloads); got != 1 { - t.Fatalf("expected reload callback once, got %d", got) + if got := atomic.LoadInt32(&reloads); got != 0 { + t.Fatalf("expected no reload callback for auth update, got %d", got) } // Use normalizeAuthPath to match how addOrUpdateClient stores the key normalized := w.normalizeAuthPath(authFile) @@ -416,7 +416,7 @@ func TestAddOrUpdateClientTriggersReloadAndHash(t *testing.T) { } } -func TestRemoveClientRemovesHash(t *testing.T) { +func TestRemoveClientRemovesHashWithoutReload(t *testing.T) { tmpDir := t.TempDir() authFile := filepath.Join(tmpDir, "sample.json") var reloads int32 @@ -436,8 +436,39 @@ func TestRemoveClientRemovesHash(t *testing.T) { if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok { t.Fatal("expected hash to be removed after deletion") } - if got := atomic.LoadInt32(&reloads); got != 1 { - t.Fatalf("expected reload callback once, got %d", got) + if got := atomic.LoadInt32(&reloads); got != 0 { + t.Fatalf("expected no reload callback for auth removal, got %d", got) + } +} + +func TestAuthFileEventsDoNotInvokeSnapshotCoreAuths(t *testing.T) { + tmpDir := t.TempDir() + authFile := filepath.Join(tmpDir, "sample.json") + if err := os.WriteFile(authFile, []byte(`{"type":"codex","email":"u@example.com"}`), 0o644); err != nil { + t.Fatalf("failed to create auth file: %v", err) + } + + origSnapshot := snapshotCoreAuthsFunc + var snapshotCalls int32 + snapshotCoreAuthsFunc = func(cfg *config.Config, authDir string) []*coreauth.Auth { + atomic.AddInt32(&snapshotCalls, 1) + return origSnapshot(cfg, authDir) + } + defer func() { snapshotCoreAuthsFunc = origSnapshot }() + + w := &Watcher{ + authDir: tmpDir, + lastAuthHashes: make(map[string]string), + lastAuthContents: make(map[string]*coreauth.Auth), + fileAuthsByPath: make(map[string]map[string]*coreauth.Auth), + } + w.SetConfig(&config.Config{AuthDir: tmpDir}) + + w.addOrUpdateClient(authFile) + w.removeClient(authFile) + + if got := atomic.LoadInt32(&snapshotCalls); got != 0 { + t.Fatalf("expected auth file events to avoid full snapshot, got %d calls", got) } } @@ -631,7 +662,7 @@ func TestStopConfigReloadTimerSafeWhenNil(t *testing.T) { w.stopConfigReloadTimer() } -func TestHandleEventRemovesAuthFile(t *testing.T) { +func TestHandleEventRemovesAuthFileWithoutReload(t *testing.T) { tmpDir := t.TempDir() authFile := filepath.Join(tmpDir, "remove.json") if err := os.WriteFile(authFile, []byte(`{"type":"demo"}`), 0o644); err != nil { @@ -655,8 +686,8 @@ func TestHandleEventRemovesAuthFile(t *testing.T) { w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Remove}) - if atomic.LoadInt32(&reloads) != 1 { - t.Fatalf("expected reload callback once, got %d", reloads) + if atomic.LoadInt32(&reloads) != 0 { + t.Fatalf("expected no reload callback for auth removal, got %d", reloads) } if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok { t.Fatal("expected hash entry to be removed") @@ -853,8 +884,8 @@ func TestHandleEventAuthWriteTriggersUpdate(t *testing.T) { w.SetConfig(&config.Config{AuthDir: authDir}) w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Write}) - if atomic.LoadInt32(&reloads) != 1 { - t.Fatalf("expected auth write to trigger reload callback, got %d", reloads) + if atomic.LoadInt32(&reloads) != 0 { + t.Fatalf("expected auth write to avoid global reload callback, got %d", reloads) } } @@ -921,7 +952,7 @@ func TestHandleEventAtomicReplaceUnchangedSkips(t *testing.T) { } } -func TestHandleEventAtomicReplaceChangedTriggersUpdate(t *testing.T) { +func TestHandleEventAtomicReplaceChangedTriggersIncrementalUpdateOnly(t *testing.T) { tmpDir := t.TempDir() authDir := filepath.Join(tmpDir, "auth") if err := os.MkdirAll(authDir, 0o755); err != nil { @@ -950,8 +981,8 @@ func TestHandleEventAtomicReplaceChangedTriggersUpdate(t *testing.T) { w.lastAuthHashes[w.normalizeAuthPath(authFile)] = hexString(oldSum[:]) w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Rename}) - if atomic.LoadInt32(&reloads) != 1 { - t.Fatalf("expected changed atomic replace to trigger update, got %d", reloads) + if atomic.LoadInt32(&reloads) != 0 { + t.Fatalf("expected changed atomic replace to avoid global reload, got %d", reloads) } } @@ -982,7 +1013,7 @@ func TestHandleEventRemoveUnknownFileIgnored(t *testing.T) { } } -func TestHandleEventRemoveKnownFileDeletes(t *testing.T) { +func TestHandleEventRemoveKnownFileDeletesWithoutReload(t *testing.T) { tmpDir := t.TempDir() authDir := filepath.Join(tmpDir, "auth") if err := os.MkdirAll(authDir, 0o755); err != nil { @@ -1005,8 +1036,8 @@ func TestHandleEventRemoveKnownFileDeletes(t *testing.T) { w.lastAuthHashes[w.normalizeAuthPath(authFile)] = "hash" w.handleEvent(fsnotify.Event{Name: authFile, Op: fsnotify.Remove}) - if atomic.LoadInt32(&reloads) != 1 { - t.Fatalf("expected known remove to trigger reload, got %d", reloads) + if atomic.LoadInt32(&reloads) != 0 { + t.Fatalf("expected known remove to avoid global reload, got %d", reloads) } if _, ok := w.lastAuthHashes[w.normalizeAuthPath(authFile)]; ok { t.Fatal("expected known auth hash to be deleted") @@ -1239,67 +1270,6 @@ func TestReloadConfigFiltersAffectedOAuthProviders(t *testing.T) { } } -func TestReloadConfigTriggersCallbackForMaxRetryCredentialsChange(t *testing.T) { - tmpDir := t.TempDir() - authDir := filepath.Join(tmpDir, "auth") - if err := os.MkdirAll(authDir, 0o755); err != nil { - t.Fatalf("failed to create auth dir: %v", err) - } - configPath := filepath.Join(tmpDir, "config.yaml") - - oldCfg := &config.Config{ - AuthDir: authDir, - MaxRetryCredentials: 0, - RequestRetry: 1, - MaxRetryInterval: 5, - } - newCfg := &config.Config{ - AuthDir: authDir, - MaxRetryCredentials: 2, - RequestRetry: 1, - MaxRetryInterval: 5, - } - data, errMarshal := yaml.Marshal(newCfg) - if errMarshal != nil { - t.Fatalf("failed to marshal config: %v", errMarshal) - } - if errWrite := os.WriteFile(configPath, data, 0o644); errWrite != nil { - t.Fatalf("failed to write config: %v", errWrite) - } - - callbackCalls := 0 - callbackMaxRetryCredentials := -1 - w := &Watcher{ - configPath: configPath, - authDir: authDir, - lastAuthHashes: make(map[string]string), - reloadCallback: func(cfg *config.Config) { - callbackCalls++ - if cfg != nil { - callbackMaxRetryCredentials = cfg.MaxRetryCredentials - } - }, - } - w.SetConfig(oldCfg) - - if ok := w.reloadConfig(); !ok { - t.Fatal("expected reloadConfig to succeed") - } - - if callbackCalls != 1 { - t.Fatalf("expected reload callback to be called once, got %d", callbackCalls) - } - if callbackMaxRetryCredentials != 2 { - t.Fatalf("expected callback MaxRetryCredentials=2, got %d", callbackMaxRetryCredentials) - } - - w.clientsMutex.RLock() - defer w.clientsMutex.RUnlock() - if w.config == nil || w.config.MaxRetryCredentials != 2 { - t.Fatalf("expected watcher config MaxRetryCredentials=2, got %+v", w.config) - } -} - func TestStartFailsWhenAuthDirMissing(t *testing.T) { tmpDir := t.TempDir() configPath := filepath.Join(tmpDir, "config.yaml")