From 8526c2da257e8b5e9bf1c640f66fd93daab2fe1f Mon Sep 17 00:00:00 2001 From: constansino Date: Thu, 5 Mar 2026 19:12:57 +0800 Subject: [PATCH 1/2] fix(watcher): debounce auth event callback storms --- internal/watcher/clients.go | 62 +++++++++++++++++++++++++++++++++++-- internal/watcher/watcher.go | 6 ++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/internal/watcher/clients.go b/internal/watcher/clients.go index cf0ed076..a1f00f14 100644 --- a/internal/watcher/clients.go +++ b/internal/watcher/clients.go @@ -183,7 +183,7 @@ func (w *Watcher) addOrUpdateClient(path string) { if w.reloadCallback != nil { log.Debugf("triggering server update callback after add/update") - w.reloadCallback(cfg) + w.triggerServerUpdate(cfg) } w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) } @@ -202,7 +202,7 @@ func (w *Watcher) removeClient(path string) { if w.reloadCallback != nil { log.Debugf("triggering server update callback after removal") - w.reloadCallback(cfg) + w.triggerServerUpdate(cfg) } w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) } @@ -303,3 +303,61 @@ func (w *Watcher) persistAuthAsync(message string, paths ...string) { } }() } + +func (w *Watcher) stopServerUpdateTimer() { + w.serverUpdateMu.Lock() + defer w.serverUpdateMu.Unlock() + if w.serverUpdateTimer != nil { + w.serverUpdateTimer.Stop() + w.serverUpdateTimer = nil + } + w.serverUpdatePend = false +} + +func (w *Watcher) triggerServerUpdate(cfg *config.Config) { + if w == nil || w.reloadCallback == nil || cfg == nil { + return + } + + now := time.Now() + + w.serverUpdateMu.Lock() + if w.serverUpdateLast.IsZero() || now.Sub(w.serverUpdateLast) >= serverUpdateDebounce { + w.serverUpdateLast = now + w.serverUpdateMu.Unlock() + w.reloadCallback(cfg) + return + } + + if w.serverUpdatePend { + w.serverUpdateMu.Unlock() + return + } + + delay := serverUpdateDebounce - now.Sub(w.serverUpdateLast) + if delay < 10*time.Millisecond { + delay = 10 * time.Millisecond + } + w.serverUpdatePend = true + if w.serverUpdateTimer != nil { + w.serverUpdateTimer.Stop() + } + w.serverUpdateTimer = time.AfterFunc(delay, func() { + w.clientsMutex.RLock() + latestCfg := w.config + w.clientsMutex.RUnlock() + if latestCfg == nil || w.reloadCallback == nil { + w.serverUpdateMu.Lock() + w.serverUpdatePend = false + w.serverUpdateMu.Unlock() + return + } + + w.serverUpdateMu.Lock() + w.serverUpdateLast = time.Now() + w.serverUpdatePend = false + w.serverUpdateMu.Unlock() + w.reloadCallback(latestCfg) + }) + w.serverUpdateMu.Unlock() +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 9f370127..c40fef7b 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -35,6 +35,10 @@ type Watcher struct { clientsMutex sync.RWMutex configReloadMu sync.Mutex configReloadTimer *time.Timer + serverUpdateMu sync.Mutex + serverUpdateTimer *time.Timer + serverUpdateLast time.Time + serverUpdatePend bool reloadCallback func(*config.Config) watcher *fsnotify.Watcher lastAuthHashes map[string]string @@ -76,6 +80,7 @@ const ( replaceCheckDelay = 50 * time.Millisecond configReloadDebounce = 150 * time.Millisecond authRemoveDebounceWindow = 1 * time.Second + serverUpdateDebounce = 1 * time.Second ) // NewWatcher creates a new file watcher instance @@ -116,6 +121,7 @@ func (w *Watcher) Start(ctx context.Context) error { func (w *Watcher) Stop() error { w.stopDispatch() w.stopConfigReloadTimer() + w.stopServerUpdateTimer() return w.watcher.Close() } From ac95e92829ae945c9005fb899e1567c4f83b0344 Mon Sep 17 00:00:00 2001 From: constansino Date: Thu, 5 Mar 2026 19:25:57 +0800 Subject: [PATCH 2/2] fix(watcher): guard debounced callback after Stop --- internal/watcher/clients.go | 8 +++++++- internal/watcher/watcher.go | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/watcher/clients.go b/internal/watcher/clients.go index a1f00f14..de1b80f4 100644 --- a/internal/watcher/clients.go +++ b/internal/watcher/clients.go @@ -318,6 +318,9 @@ func (w *Watcher) triggerServerUpdate(cfg *config.Config) { if w == nil || w.reloadCallback == nil || cfg == nil { return } + if w.stopped.Load() { + return + } now := time.Now() @@ -343,10 +346,13 @@ func (w *Watcher) triggerServerUpdate(cfg *config.Config) { w.serverUpdateTimer.Stop() } w.serverUpdateTimer = time.AfterFunc(delay, func() { + if w.stopped.Load() { + return + } w.clientsMutex.RLock() latestCfg := w.config w.clientsMutex.RUnlock() - if latestCfg == nil || w.reloadCallback == nil { + if latestCfg == nil || w.reloadCallback == nil || w.stopped.Load() { w.serverUpdateMu.Lock() w.serverUpdatePend = false w.serverUpdateMu.Unlock() diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index c40fef7b..76e2dee5 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -6,6 +6,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/fsnotify/fsnotify" @@ -39,6 +40,7 @@ type Watcher struct { serverUpdateTimer *time.Timer serverUpdateLast time.Time serverUpdatePend bool + stopped atomic.Bool reloadCallback func(*config.Config) watcher *fsnotify.Watcher lastAuthHashes map[string]string @@ -119,6 +121,7 @@ func (w *Watcher) Start(ctx context.Context) error { // Stop stops the file watcher func (w *Watcher) Stop() error { + w.stopped.Store(true) w.stopDispatch() w.stopConfigReloadTimer() w.stopServerUpdateTimer()