diff --git a/sdk/cliproxy/usage/manager.go b/sdk/cliproxy/usage/manager.go index f97c3598..48f0c003 100644 --- a/sdk/cliproxy/usage/manager.go +++ b/sdk/cliproxy/usage/manager.go @@ -42,7 +42,11 @@ type Manager struct { once sync.Once stopOnce sync.Once cancel context.CancelFunc - queue chan queueItem + + mu sync.Mutex + cond *sync.Cond + queue []queueItem + closed bool pluginsMu sync.RWMutex plugins []Plugin @@ -50,10 +54,9 @@ type Manager struct { // NewManager constructs a manager with a buffered queue. func NewManager(buffer int) *Manager { - if buffer <= 0 { - buffer = 256 - } - return &Manager{queue: make(chan queueItem, buffer)} + m := &Manager{} + m.cond = sync.NewCond(&m.mu) + return m } // Start launches the background dispatcher. Calling Start multiple times is safe. @@ -80,7 +83,10 @@ func (m *Manager) Stop() { if m.cancel != nil { m.cancel() } - close(m.queue) + m.mu.Lock() + m.closed = true + m.mu.Unlock() + m.cond.Broadcast() }) } @@ -102,40 +108,30 @@ func (m *Manager) Publish(ctx context.Context, record Record) { } // ensure worker is running even if Start was not called explicitly m.Start(context.Background()) - select { - case m.queue <- queueItem{ctx: ctx, record: record}: - default: - // queue is full; drop the record to avoid blocking runtime paths - log.Debugf("usage: queue full, dropping record for provider %s", record.Provider) + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return } + m.queue = append(m.queue, queueItem{ctx: ctx, record: record}) + m.mu.Unlock() + m.cond.Signal() } func (m *Manager) run(ctx context.Context) { for { - select { - case <-ctx.Done(): - m.drain() - return - case item, ok := <-m.queue: - if !ok { - return - } - m.dispatch(item) + m.mu.Lock() + for !m.closed && len(m.queue) == 0 { + m.cond.Wait() } - } -} - -func (m *Manager) drain() { - for { - select { - case item, ok := <-m.queue: - if !ok { - return - } - m.dispatch(item) - default: + if len(m.queue) == 0 && m.closed { + m.mu.Unlock() return } + item := m.queue[0] + m.queue = m.queue[1:] + m.mu.Unlock() + m.dispatch(item) } }