Files
drip/internal/shared/protocol/writer.go

127 lines
2.0 KiB
Go

package protocol
import (
"errors"
"io"
"sync"
"time"
)
// FrameWriter queues frames and writes them to an io.Writer in batches from
// a background goroutine (writeLoop) started by the constructor.
type FrameWriter struct {
// conn is the destination every frame is written to.
conn io.Writer
// queue carries frames from WriteFrame to writeLoop; Close closes it.
queue chan *Frame
// batch holds frames received from queue that have not been written yet.
// It is accessed with mu held.
batch []*Frame
// mu guards batch and closed, and is held while a batch is written to conn.
mu sync.Mutex
// done is closed by Close to tell writeLoop to stop.
done chan struct{}
// closed is set by Close; WriteFrame rejects frames once it is true.
closed bool
// maxBatch is the batch length at which writeLoop flushes immediately.
maxBatch int
// maxBatchWait is the ticker period at which a non-empty batch is flushed.
maxBatchWait time.Duration
}
// NewFrameWriter returns a FrameWriter for conn using the default settings:
// batches of up to 128 frames, a 2ms batch interval and a queue of 1024
// frames. It delegates to NewFrameWriterWithConfig.
func NewFrameWriter(conn io.Writer) *FrameWriter {
	const (
		defaultMaxBatch     = 128
		defaultMaxBatchWait = 2 * time.Millisecond
		defaultQueueSize    = 1024
	)
	return NewFrameWriterWithConfig(conn, defaultMaxBatch, defaultMaxBatchWait, defaultQueueSize)
}
// NewFrameWriterWithConfig returns a FrameWriter for conn and starts its
// background write loop.
//
// maxBatch is the number of buffered frames that triggers an immediate flush,
// maxBatchWait is the interval at which a partially filled batch is flushed,
// and queueSize is the capacity of the channel between WriteFrame and the
// write loop.
//
// Out-of-range values are clamped instead of panicking: maxBatch below 1 is
// treated as 1 (every frame is flushed as soon as it is dequeued), a
// non-positive maxBatchWait falls back to 2ms, and a negative queueSize is
// treated as 0 (an unbuffered queue).
func NewFrameWriterWithConfig(conn io.Writer, maxBatch int, maxBatchWait time.Duration, queueSize int) *FrameWriter {
	// make panics on a negative capacity. A maxBatch of 0 already behaved like
	// 1 because the flush threshold is checked with >=.
	if maxBatch < 1 {
		maxBatch = 1
	}
	// time.NewTicker panics on a non-positive duration, and that panic would
	// happen inside the background goroutine where no caller can recover it.
	if maxBatchWait <= 0 {
		maxBatchWait = 2 * time.Millisecond
	}
	// make panics on a negative channel size.
	if queueSize < 0 {
		queueSize = 0
	}

	w := &FrameWriter{
		conn:         conn,
		queue:        make(chan *Frame, queueSize),
		batch:        make([]*Frame, 0, maxBatch),
		maxBatch:     maxBatch,
		maxBatchWait: maxBatchWait,
		done:         make(chan struct{}),
	}
	go w.writeLoop()
	return w
}
// writeLoop is the background goroutine that moves frames from the queue into
// the current batch and writes the batch to conn.
//
// A batch is flushed when it reaches maxBatch frames or when the maxBatchWait
// ticker fires. The loop exits after a final flush once the queue is closed
// and empty, or once done is closed.
func (w *FrameWriter) writeLoop() {
	ticker := time.NewTicker(w.maxBatchWait)
	defer ticker.Stop()

	for {
		select {
		case frame, ok := <-w.queue:
			if !ok {
				// Queue closed and fully consumed: write what is left and stop.
				w.mu.Lock()
				w.flushBatchLocked()
				w.mu.Unlock()
				return
			}
			w.mu.Lock()
			w.batch = append(w.batch, frame)
			if len(w.batch) >= w.maxBatch {
				w.flushBatchLocked()
			}
			w.mu.Unlock()

		case <-ticker.C:
			// flushBatchLocked is a no-op on an empty batch.
			w.mu.Lock()
			w.flushBatchLocked()
			w.mu.Unlock()

		case <-w.done:
			// select chooses randomly among ready cases, so done can win while
			// frames are still buffered in the queue. Collect them without
			// blocking so that frames already accepted by WriteFrame are not
			// silently dropped on shutdown.
			w.mu.Lock()
		drain:
			for {
				select {
				case frame, ok := <-w.queue:
					if !ok {
						break drain
					}
					w.batch = append(w.batch, frame)
				default:
					break drain
				}
			}
			w.flushBatchLocked()
			w.mu.Unlock()
			return
		}
	}
}
// flushBatchLocked writes every frame in the current batch to conn, in order,
// and then empties the batch while keeping its backing array for reuse.
//
// The caller must hold w.mu.
func (w *FrameWriter) flushBatchLocked() {
	if len(w.batch) == 0 {
		return
	}
	for i, frame := range w.batch {
		// Write errors are deliberately ignored: this method has no error
		// return and the remaining frames are still attempted.
		_ = WriteFrame(w.conn, frame)
		// Drop the reference so the reused backing array does not keep
		// already-written frames reachable.
		w.batch[i] = nil
	}
	w.batch = w.batch[:0]
}
// WriteFrame hands frame to the background write loop for batched delivery.
//
// It returns nil once the frame has been queued. If the queue is full the
// frame is written to conn synchronously and the result of that write is
// returned; such a frame is not ordered relative to frames still waiting in
// the queue. After Close it returns a "writer closed" error.
func (w *FrameWriter) WriteFrame(frame *Frame) error {
	// The mutex is held for the whole call, which never blocks on the queue:
	//   - Close sets closed under the same mutex before closing the queue, so
	//     a send can no longer race with close(w.queue) and panic;
	//   - the direct write below is serialized with flushBatchLocked, so two
	//     goroutines never write to conn at the same time.
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed {
		return errors.New("writer closed")
	}

	select {
	case w.queue <- frame:
		return nil
	case <-w.done:
		// Kept as a guard in case done is closed without closed being set.
		return errors.New("writer closed")
	default:
		// Queue full: fall back to a synchronous write.
		return WriteFrame(w.conn, frame)
	}
}
// Close marks the writer as closed and closes the queue and done channels,
// which makes the background write loop flush and exit. Only the first call
// closes the channels; later calls do nothing. It always returns nil.
func (w *FrameWriter) Close() error {
	w.mu.Lock()
	alreadyClosed := w.closed
	w.closed = true
	w.mu.Unlock()

	if alreadyClosed {
		return nil
	}

	// The channels are closed after the mutex has been released.
	close(w.queue)
	close(w.done)
	return nil
}
// Flush synchronously writes the frames currently held in the batch to conn.
// Frames that are still waiting in the queue and have not yet been picked up
// by the write loop are not written by this call.
func (w *FrameWriter) Flush() {
w.mu.Lock()
// Deferred so the mutex is released even if the write path panics.
defer w.mu.Unlock()
w.flushBatchLocked()
}