mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-21 16:40:22 +00:00
fix(watcher): debounce auth event callback storms
This commit is contained in:
@@ -183,7 +183,7 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
|||||||
|
|
||||||
if w.reloadCallback != nil {
|
if w.reloadCallback != nil {
|
||||||
log.Debugf("triggering server update callback after add/update")
|
log.Debugf("triggering server update callback after add/update")
|
||||||
w.reloadCallback(cfg)
|
w.triggerServerUpdate(cfg)
|
||||||
}
|
}
|
||||||
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||||
}
|
}
|
||||||
@@ -202,7 +202,7 @@ func (w *Watcher) removeClient(path string) {
|
|||||||
|
|
||||||
if w.reloadCallback != nil {
|
if w.reloadCallback != nil {
|
||||||
log.Debugf("triggering server update callback after removal")
|
log.Debugf("triggering server update callback after removal")
|
||||||
w.reloadCallback(cfg)
|
w.triggerServerUpdate(cfg)
|
||||||
}
|
}
|
||||||
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||||
}
|
}
|
||||||
@@ -303,3 +303,61 @@ func (w *Watcher) persistAuthAsync(message string, paths ...string) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) stopServerUpdateTimer() {
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
defer w.serverUpdateMu.Unlock()
|
||||||
|
if w.serverUpdateTimer != nil {
|
||||||
|
w.serverUpdateTimer.Stop()
|
||||||
|
w.serverUpdateTimer = nil
|
||||||
|
}
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) triggerServerUpdate(cfg *config.Config) {
|
||||||
|
if w == nil || w.reloadCallback == nil || cfg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
if w.serverUpdateLast.IsZero() || now.Sub(w.serverUpdateLast) >= serverUpdateDebounce {
|
||||||
|
w.serverUpdateLast = now
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
w.reloadCallback(cfg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.serverUpdatePend {
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := serverUpdateDebounce - now.Sub(w.serverUpdateLast)
|
||||||
|
if delay < 10*time.Millisecond {
|
||||||
|
delay = 10 * time.Millisecond
|
||||||
|
}
|
||||||
|
w.serverUpdatePend = true
|
||||||
|
if w.serverUpdateTimer != nil {
|
||||||
|
w.serverUpdateTimer.Stop()
|
||||||
|
}
|
||||||
|
w.serverUpdateTimer = time.AfterFunc(delay, func() {
|
||||||
|
w.clientsMutex.RLock()
|
||||||
|
latestCfg := w.config
|
||||||
|
w.clientsMutex.RUnlock()
|
||||||
|
if latestCfg == nil || w.reloadCallback == nil {
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
w.serverUpdateLast = time.Now()
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
w.reloadCallback(latestCfg)
|
||||||
|
})
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
}
|
||||||
|
|||||||
@@ -35,6 +35,10 @@ type Watcher struct {
|
|||||||
clientsMutex sync.RWMutex
|
clientsMutex sync.RWMutex
|
||||||
configReloadMu sync.Mutex
|
configReloadMu sync.Mutex
|
||||||
configReloadTimer *time.Timer
|
configReloadTimer *time.Timer
|
||||||
|
serverUpdateMu sync.Mutex
|
||||||
|
serverUpdateTimer *time.Timer
|
||||||
|
serverUpdateLast time.Time
|
||||||
|
serverUpdatePend bool
|
||||||
reloadCallback func(*config.Config)
|
reloadCallback func(*config.Config)
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
lastAuthHashes map[string]string
|
lastAuthHashes map[string]string
|
||||||
@@ -76,6 +80,7 @@ const (
|
|||||||
replaceCheckDelay = 50 * time.Millisecond
|
replaceCheckDelay = 50 * time.Millisecond
|
||||||
configReloadDebounce = 150 * time.Millisecond
|
configReloadDebounce = 150 * time.Millisecond
|
||||||
authRemoveDebounceWindow = 1 * time.Second
|
authRemoveDebounceWindow = 1 * time.Second
|
||||||
|
serverUpdateDebounce = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewWatcher creates a new file watcher instance
|
// NewWatcher creates a new file watcher instance
|
||||||
@@ -116,6 +121,7 @@ func (w *Watcher) Start(ctx context.Context) error {
|
|||||||
func (w *Watcher) Stop() error {
|
func (w *Watcher) Stop() error {
|
||||||
w.stopDispatch()
|
w.stopDispatch()
|
||||||
w.stopConfigReloadTimer()
|
w.stopConfigReloadTimer()
|
||||||
|
w.stopServerUpdateTimer()
|
||||||
return w.watcher.Close()
|
return w.watcher.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user