From 6046a8c95b4c16176db5717a338ff03ae18b45b3 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 23 Sep 2025 04:33:48 +0800 Subject: [PATCH] feat(auth): enhance watcher with asynchronous dispatch and buffering - Added async dispatch loop to `Watcher` for handling incremental `AuthUpdate` with in-memory buffering. - Improved resilience against high-frequency auth changes by coalescing updates and reducing redundant processing. - Updated `cliproxy` service to increase auth update queue capacity and optimize backlog consumption. - Added detailed SDK integration documentation in English and Chinese (`sdk-watcher.md`, `sdk-watcher_CN.md`). --- docs/sdk-watcher.md | 32 +++++++++ docs/sdk-watcher_CN.md | 32 +++++++++ internal/watcher/watcher.go | 127 ++++++++++++++++++++++++++++++++++-- sdk/cliproxy/service.go | 10 ++- 4 files changed, 195 insertions(+), 6 deletions(-) create mode 100644 docs/sdk-watcher.md create mode 100644 docs/sdk-watcher_CN.md diff --git a/docs/sdk-watcher.md b/docs/sdk-watcher.md new file mode 100644 index 00000000..c455448b --- /dev/null +++ b/docs/sdk-watcher.md @@ -0,0 +1,32 @@ +# SDK Watcher Integration + +The SDK service exposes a watcher integration that surfaces granular auth updates without forcing a full reload. This document explains the queue contract, how the service consumes updates, and how high-frequency change bursts are handled. + +## Update Queue Contract + +- `watcher.AuthUpdate` represents a single credential change. `Action` may be `add`, `modify`, or `delete`, and `ID` carries the credential identifier. For `add`/`modify` the `Auth` payload contains a fully populated clone of the credential; `delete` may omit `Auth`. +- `WatcherWrapper.SetAuthUpdateQueue(chan<- watcher.AuthUpdate)` wires the queue produced by the SDK service into the watcher. The queue must be created before the watcher starts. 
+- The service builds the queue via `ensureAuthUpdateQueue`, using a buffered channel (`capacity=256`) and a dedicated consumer goroutine (`consumeAuthUpdates`). The consumer drains bursts by looping through the backlog before reacquiring the select loop. + +## Watcher Behaviour + +- `internal/watcher/watcher.go` keeps a shadow snapshot of auth state (`currentAuths`). Each filesystem or configuration event triggers a recomputation and a diff against the previous snapshot to produce minimal `AuthUpdate` entries that mirror adds, edits, and removals. +- Updates are coalesced per credential identifier. If multiple changes occur before dispatch (e.g., write followed by delete), only the final action is sent downstream. +- The watcher runs an internal dispatch loop that buffers pending updates in memory and forwards them asynchronously to the queue. Producers never block on channel capacity; they just enqueue into the in-memory buffer and signal the dispatcher. Dispatch cancellation happens when the watcher stops, guaranteeing goroutines exit cleanly. + +## High-Frequency Change Handling + +- The dispatch loop and service consumer run independently, preventing filesystem watchers from blocking even when many updates arrive at once. +- Back-pressure is absorbed in two places: + - The dispatch buffer (map + order slice) coalesces repeated updates for the same credential until the consumer catches up. + - The service channel capacity (256) combined with the consumer drain loop ensures several bursts can be processed without oscillation. +- If the queue is saturated for an extended period, updates continue to be merged, so the latest state is eventually applied without replaying redundant intermediate states. + +## Usage Checklist + +1. Instantiate the SDK service (builder or manual construction). +2. Call `ensureAuthUpdateQueue` before starting the watcher to allocate the shared channel. +3. 
When the `WatcherWrapper` is created, call `SetAuthUpdateQueue` with the service queue, then start the watcher. +4. Provide a reload callback that handles configuration updates; auth deltas will arrive via the queue and are applied by the service automatically through `handleAuthUpdate`. + +Following this flow keeps auth changes responsive while avoiding full reloads for every edit. diff --git a/docs/sdk-watcher_CN.md b/docs/sdk-watcher_CN.md new file mode 100644 index 00000000..0373a45d --- /dev/null +++ b/docs/sdk-watcher_CN.md @@ -0,0 +1,32 @@ +# SDK Watcher集成说明 + +本文档介绍SDK服务与文件监控器之间的增量更新队列,包括接口契约、高频变更下的处理策略以及接入步骤。 + +## 更新队列契约 + +- `watcher.AuthUpdate`描述单条凭据变更,`Action`可能为`add`、`modify`或`delete`,`ID`是凭据标识。对于`add`/`modify`会携带完整的`Auth`克隆,`delete`可以省略`Auth`。 +- `WatcherWrapper.SetAuthUpdateQueue(chan<- watcher.AuthUpdate)`用于将服务侧创建的队列注入watcher,必须在watcher启动前完成。 +- 服务通过`ensureAuthUpdateQueue`创建容量为256的缓冲通道,并在`consumeAuthUpdates`中使用专职goroutine消费;消费侧会主动“抽干”积压事件,降低切换开销。 + +## Watcher行为 + +- `internal/watcher/watcher.go`维护`currentAuths`快照,文件或配置事件触发后会重建快照并与旧快照对比,生成最小化的`AuthUpdate`列表。 +- 以凭据ID为维度对更新进行合并,同一凭据在短时间内的多次变更只会保留最新状态(例如先写后删只会下发`delete`)。 +- watcher内部运行异步分发循环:生产者只向内存缓冲追加事件并唤醒分发协程,即使通道暂时写满也不会阻塞文件事件线程。watcher停止时会取消分发循环,确保协程正常退出。 + +## 高频变更处理 + +- 分发循环与服务消费协程相互独立,因此即便短时间内出现大量变更也不会阻塞watcher事件处理。 +- 背压通过两级缓冲吸收: + - 分发缓冲(map + 顺序切片)会合并同一凭据的重复事件,直到消费者完成处理。 + - 服务端通道的256容量加上消费侧的“抽干”逻辑,可平稳处理多个突发批次。 +- 当通道长时间处于高压状态时,缓冲仍持续合并事件,从而在消费者恢复后一次性应用最新状态,避免重复处理无意义的中间状态。 + +## 接入步骤 + +1. 实例化SDK Service(构建器或手工创建)。 +2. 在启动watcher之前调用`ensureAuthUpdateQueue`创建共享通道。 +3. watcher通过工厂函数创建后立刻调用`SetAuthUpdateQueue`注入通道,然后再启动watcher。 +4. 
Reload回调专注于配置更新;认证增量会通过队列送达,并由`handleAuthUpdate`自动应用。 + +遵循上述流程即可在避免全量重载的同时保持凭据变更的实时性。 diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index e2311171..905d1908 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -45,6 +45,11 @@ type Watcher struct { lastConfigHash string authQueue chan<- AuthUpdate currentAuths map[string]*coreauth.Auth + dispatchMu sync.Mutex + dispatchCond *sync.Cond + pendingUpdates map[string]AuthUpdate + pendingOrder []string + dispatchCancel context.CancelFunc } // AuthUpdateAction represents the type of change detected in auth sources. @@ -78,13 +83,15 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) return nil, errNewWatcher } - return &Watcher{ + w := &Watcher{ configPath: configPath, authDir: authDir, reloadCallback: reloadCallback, watcher: watcher, lastAuthHashes: make(map[string]string), - }, nil + } + w.dispatchCond = sync.NewCond(&w.dispatchMu) + return w, nil } // Start begins watching the configuration file and authentication directory @@ -113,6 +120,7 @@ func (w *Watcher) Start(ctx context.Context) error { // Stop stops the file watcher func (w *Watcher) Stop() error { + w.stopDispatch() return w.watcher.Close() } @@ -128,6 +136,23 @@ func (w *Watcher) SetAuthUpdateQueue(queue chan<- AuthUpdate) { w.clientsMutex.Lock() defer w.clientsMutex.Unlock() w.authQueue = queue + if w.dispatchCond == nil { + w.dispatchCond = sync.NewCond(&w.dispatchMu) + } + if w.dispatchCancel != nil { + w.dispatchCancel() + if w.dispatchCond != nil { + w.dispatchMu.Lock() + w.dispatchCond.Broadcast() + w.dispatchMu.Unlock() + } + w.dispatchCancel = nil + } + if queue != nil { + ctx, cancel := context.WithCancel(context.Background()) + w.dispatchCancel = cancel + go w.dispatchLoop(ctx) + } } func (w *Watcher) refreshAuthState() { @@ -179,12 +204,104 @@ func (w *Watcher) prepareAuthUpdatesLocked(auths []*coreauth.Auth) []AuthUpdate } func (w *Watcher) 
dispatchAuthUpdates(updates []AuthUpdate) { - if len(updates) == 0 || w.authQueue == nil { + if len(updates) == 0 { return } - for _, update := range updates { - w.authQueue <- update + queue := w.getAuthQueue() + if queue == nil { + return } + baseTS := time.Now().UnixNano() + w.dispatchMu.Lock() + if w.pendingUpdates == nil { + w.pendingUpdates = make(map[string]AuthUpdate) + } + for idx, update := range updates { + key := w.authUpdateKey(update, baseTS+int64(idx)) + if _, exists := w.pendingUpdates[key]; !exists { + w.pendingOrder = append(w.pendingOrder, key) + } + w.pendingUpdates[key] = update + } + if w.dispatchCond != nil { + w.dispatchCond.Signal() + } + w.dispatchMu.Unlock() +} + +func (w *Watcher) authUpdateKey(update AuthUpdate, ts int64) string { + if update.ID != "" { + return update.ID + } + return fmt.Sprintf("%s:%d", update.Action, ts) +} + +func (w *Watcher) dispatchLoop(ctx context.Context) { + for { + batch, ok := w.nextPendingBatch(ctx) + if !ok { + return + } + queue := w.getAuthQueue() + if queue == nil { + if ctx.Err() != nil { + return + } + time.Sleep(10 * time.Millisecond) + continue + } + for _, update := range batch { + select { + case queue <- update: + case <-ctx.Done(): + return + } + } + } +} + +func (w *Watcher) nextPendingBatch(ctx context.Context) ([]AuthUpdate, bool) { + w.dispatchMu.Lock() + defer w.dispatchMu.Unlock() + for len(w.pendingOrder) == 0 { + if ctx.Err() != nil { + return nil, false + } + w.dispatchCond.Wait() + if ctx.Err() != nil { + return nil, false + } + } + batch := make([]AuthUpdate, 0, len(w.pendingOrder)) + for _, key := range w.pendingOrder { + batch = append(batch, w.pendingUpdates[key]) + delete(w.pendingUpdates, key) + } + w.pendingOrder = w.pendingOrder[:0] + return batch, true +} + +func (w *Watcher) getAuthQueue() chan<- AuthUpdate { + w.clientsMutex.RLock() + defer w.clientsMutex.RUnlock() + return w.authQueue +} + +func (w *Watcher) stopDispatch() { + if w.dispatchCancel != nil { + 
w.dispatchCancel()
+		w.dispatchCancel = nil
+	}
+	w.dispatchMu.Lock()
+	w.pendingOrder = nil
+	w.pendingUpdates = nil
+	if w.dispatchCond != nil {
+		w.dispatchCond.Broadcast()
+	}
+	w.dispatchMu.Unlock()
+	w.clientsMutex.Lock()
+	w.authQueue = nil
+	w.clientsMutex.Unlock()
 }
 
 func authEqual(a, b *coreauth.Auth) bool {
diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go
index ee7fac99..908a7a3d 100644
--- a/sdk/cliproxy/service.go
+++ b/sdk/cliproxy/service.go
@@ -78,7 +78,7 @@ func (s *Service) ensureAuthUpdateQueue(ctx context.Context) {
 		return
 	}
 	if s.authUpdates == nil {
-		s.authUpdates = make(chan watcher.AuthUpdate, 64)
+		s.authUpdates = make(chan watcher.AuthUpdate, 256)
 	}
 	if s.authQueueStop != nil {
 		return
@@ -98,6 +98,14 @@ func (s *Service) consumeAuthUpdates(ctx context.Context) {
 			return
 		}
 		s.handleAuthUpdate(ctx, update)
+		for drained := false; !drained; {
+			select {
+			case nextUpdate := <-s.authUpdates:
+				s.handleAuthUpdate(ctx, nextUpdate)
+			default:
+				drained = true
+			}
+		}
 	}
 }