Files
drip/internal/shared/protocol/writer.go
Gouryella 7283180e6a perf(client): Optimize client performance and introduce a data frame processing worker pool
- Add runtime performance optimization configurations to main.go, including setting GOMAXPROCS, adjusting GC frequency, and memory limits.

- Implement a worker pool-based data frame processing mechanism in connector.go to improve processing capabilities under high concurrency.

- Adjust frame writer configuration to improve batch write efficiency and enable adaptive refresh strategy.

- Add callback handling support for write errors to enhance connection stability.

refactor(server): Introduce an adaptive buffer pool to optimize memory usage

- Add adaptive_buffer_pool.go to implement large and small buffer reuse, reducing memory allocation overhead.

- Apply buffer pool management for large/medium temporary buffers in proxy handlers and TCP connections.

- Change the HTTP response writer to a cached bufio.Writer to improve I/O performance.

- Optimize HTTP request reading logic and response sending process.

build(docker): Update mount paths and remove unused named volumes

- Modify the data directory mount in docker-compose.release.yml to a host bind mount (`./data:/app/data`)

- Remove the unnecessary drip-data named volume definition

test(script): Add performance testing and profiling scripts

- Add profile-test.sh script for automating stress testing and performance data collection

- Supports collecting pprof data such as CPU profiles, stack traces, and goroutines, and generating analysis reports
2025-12-08 12:24:42 +08:00

267 lines
4.8 KiB
Go

package protocol
import (
"errors"
"io"
"sync"
"time"
)
// FrameWriter batches outgoing frames and writes them to conn from a single
// background goroutine (writeLoop), reducing per-frame write syscalls.
type FrameWriter struct {
	conn              io.Writer     // destination stream; all writes happen in writeLoop
	queue             chan *Frame   // buffered producer -> writeLoop hand-off
	batch             []*Frame      // frames staged for the next flush; guarded by mu
	mu                sync.Mutex    // guards batch, closed, writeErr, and the config fields below
	done              chan struct{} // closed on shutdown to stop writeLoop
	closed            bool          // set when Close runs or a write error occurs
	maxBatch          int           // flush as soon as this many frames are batched
	maxBatchWait      time.Duration // upper bound a partial batch may wait before flushing
	heartbeatInterval time.Duration // period between heartbeat frames; <= 0 means no ticker
	heartbeatCallback func() *Frame // produces the heartbeat frame; may return nil to skip a beat
	heartbeatEnabled  bool          // whether writeLoop should run a heartbeat ticker
	heartbeatControl  chan struct{} // nudges writeLoop to re-read the heartbeat config
	// Error handling
	writeErr     error       // first write error observed (sticky)
	errOnce      sync.Once   // ensures writeErr and the callback fire at most once
	onWriteError func(error) // Callback for write errors
	// Adaptive flushing
	adaptiveFlush           bool // Enable adaptive flush based on queue depth
	lowConcurrencyThreshold int  // Queue depth threshold for immediate flush
}
// NewFrameWriter returns a FrameWriter with the default tuning: batches of
// up to 256 frames, a 2ms maximum batch wait, a 4096-slot queue, and
// adaptive flushing that bypasses batching when 16 or fewer frames are
// queued.
func NewFrameWriter(conn io.Writer) *FrameWriter {
	const (
		defaultMaxBatch     = 256
		defaultMaxBatchWait = 2 * time.Millisecond
		defaultQueueSize    = 4096
		defaultLowWater     = 16
	)
	fw := NewFrameWriterWithConfig(conn, defaultMaxBatch, defaultMaxBatchWait, defaultQueueSize)
	fw.EnableAdaptiveFlush(defaultLowWater)
	return fw
}
// NewFrameWriterWithConfig creates a FrameWriter writing to conn and starts
// its background write loop.
//
// maxBatch is the batch size that triggers an immediate flush, maxBatchWait
// bounds how long a partial batch may wait, and queueSize is the producer
// queue capacity. Non-positive values are clamped to safe minimums: in
// particular, maxBatchWait must be positive because writeLoop passes it to
// time.NewTicker, which panics on non-positive durations.
func NewFrameWriterWithConfig(conn io.Writer, maxBatch int, maxBatchWait time.Duration, queueSize int) *FrameWriter {
	if maxBatch <= 0 {
		maxBatch = 1 // degenerate batching: flush every frame
	}
	if maxBatchWait <= 0 {
		maxBatchWait = time.Millisecond // avoid time.NewTicker panic in writeLoop
	}
	if queueSize < 0 {
		queueSize = 0 // unbuffered hand-off
	}
	w := &FrameWriter{
		conn:             conn,
		queue:            make(chan *Frame, queueSize),
		batch:            make([]*Frame, 0, maxBatch),
		maxBatch:         maxBatch,
		maxBatchWait:     maxBatchWait,
		done:             make(chan struct{}),
		heartbeatControl: make(chan struct{}, 1),
	}
	go w.writeLoop()
	return w
}
// writeLoop is the single background goroutine that drains the queue,
// batches frames, and flushes them to the connection. It also owns the
// optional heartbeat ticker, rebuilding it whenever a signal arrives on
// heartbeatControl.
func (w *FrameWriter) writeLoop() {
	// Ticker bounding how long a partially filled batch may wait.
	batchTicker := time.NewTicker(w.maxBatchWait)
	defer batchTicker.Stop()
	// heartbeatCh stays nil (and thus its case never fires in the select)
	// until heartbeats are enabled.
	var heartbeatTicker *time.Ticker
	var heartbeatCh <-chan time.Time
	w.mu.Lock()
	if w.heartbeatEnabled && w.heartbeatInterval > 0 {
		heartbeatTicker = time.NewTicker(w.heartbeatInterval)
		heartbeatCh = heartbeatTicker.C
	}
	w.mu.Unlock()
	defer func() {
		if heartbeatTicker != nil {
			heartbeatTicker.Stop()
		}
	}()
	for {
		select {
		case frame, ok := <-w.queue:
			if !ok {
				// Queue closed: flush whatever is batched and exit.
				w.mu.Lock()
				w.flushBatchLocked()
				w.mu.Unlock()
				return
			}
			w.mu.Lock()
			w.batch = append(w.batch, frame)
			// Flush immediately when the batch is full, or — with adaptive
			// flushing — when the queue is shallow enough that latency
			// matters more than batching throughput.
			shouldFlushNow := len(w.batch) >= w.maxBatch ||
				(w.adaptiveFlush && len(w.queue) <= w.lowConcurrencyThreshold)
			if shouldFlushNow {
				w.flushBatchLocked()
			}
			w.mu.Unlock()
		case <-batchTicker.C:
			// Periodic flush so a partial batch never waits longer than
			// maxBatchWait.
			w.mu.Lock()
			if len(w.batch) > 0 {
				w.flushBatchLocked()
			}
			w.mu.Unlock()
		case <-heartbeatCh:
			// Emit a heartbeat frame, if the callback produces one, and
			// flush it immediately.
			w.mu.Lock()
			if w.heartbeatCallback != nil {
				if frame := w.heartbeatCallback(); frame != nil {
					w.batch = append(w.batch, frame)
					w.flushBatchLocked()
				}
			}
			w.mu.Unlock()
		case <-w.heartbeatControl:
			// Heartbeat configuration changed: tear down the old ticker and
			// build a new one if heartbeats are (still) enabled.
			w.mu.Lock()
			if heartbeatTicker != nil {
				heartbeatTicker.Stop()
				heartbeatTicker = nil
				heartbeatCh = nil
			}
			if w.heartbeatEnabled && w.heartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(w.heartbeatInterval)
				heartbeatCh = heartbeatTicker.C
			}
			w.mu.Unlock()
		case <-w.done:
			// Shutdown requested: flush the in-memory batch and exit.
			// NOTE(review): frames still sitting in w.queue at this point are
			// not written here — confirm Close drains and releases them.
			w.mu.Lock()
			w.flushBatchLocked()
			w.mu.Unlock()
			return
		}
	}
}
// flushBatchLocked writes every staged frame to conn and releases them all.
// Callers must hold w.mu.
//
// On the first write failure the error is recorded, the writer is marked
// closed, and the optional error callback is invoked on its own goroutine
// (so it cannot deadlock on w.mu). Remaining frames are released without
// further write attempts — the original code kept writing to the dead
// connection for every frame left in the batch.
func (w *FrameWriter) flushBatchLocked() {
	if len(w.batch) == 0 {
		return
	}
	for _, frame := range w.batch {
		// Skip the write once an error has been observed; still Release the
		// frame so pooled buffers are not leaked.
		if w.writeErr == nil {
			if err := WriteFrame(w.conn, frame); err != nil {
				w.errOnce.Do(func() {
					w.writeErr = err
					if w.onWriteError != nil {
						go w.onWriteError(err) // run off the lock-holding goroutine
					}
					w.closed = true
				})
			}
		}
		frame.Release()
	}
	w.batch = w.batch[:0]
}
// WriteFrame enqueues frame for asynchronous batched writing. If the writer
// has been closed or the background loop has stopped, it returns the
// recorded write error when one exists, otherwise a generic "writer closed"
// error.
func (w *FrameWriter) WriteFrame(frame *Frame) error {
	w.mu.Lock()
	isClosed, werr := w.closed, w.writeErr
	w.mu.Unlock()
	if isClosed {
		if werr != nil {
			return werr
		}
		return errors.New("writer closed")
	}
	// NOTE(review): between the closed-check above and the send below, Close
	// may run concurrently. This is only safe if Close never closes w.queue
	// while producers are active (a send on a closed channel panics) —
	// confirm Close's shutdown sequence.
	select {
	case w.queue <- frame:
		return nil
	case <-w.done:
		w.mu.Lock()
		werr = w.writeErr
		w.mu.Unlock()
		if werr == nil {
			return errors.New("writer closed")
		}
		return werr
	}
}
// Close stops the background write loop, lets it flush its current batch,
// and releases any frames still sitting in the queue. It is idempotent and
// always returns nil.
//
// Close deliberately does NOT close w.queue: WriteFrame checks w.closed
// without holding the lock across its send, so closing the channel could
// panic a racing producer (send on a closed channel). The original
// implementation had exactly that race, and additionally competed with
// writeLoop as a second receiver on the closed queue. Instead, writeLoop is
// stopped via the done channel and leftovers are drained non-blockingly.
func (w *FrameWriter) Close() error {
	w.mu.Lock()
	if w.closed {
		w.mu.Unlock()
		return nil
	}
	w.closed = true
	w.mu.Unlock()
	// Signal writeLoop to flush its in-memory batch and exit.
	close(w.done)
	// Best-effort drain: release frames that were queued but never batched,
	// so pooled buffers are not leaked. New producers are rejected by the
	// closed flag, so this loop terminates.
	for {
		select {
		case frame := <-w.queue:
			frame.Release()
		default:
			return nil
		}
	}
}
// Flush synchronously moves every frame currently sitting in the queue into
// the batch and writes the batch out. It is a no-op on a closed writer.
//
// Bug fix: the original used `break` inside the select for the closed-queue
// (!ok) case, which only exits the select — receiving from a closed channel
// never blocks, so that branch would spin forever. A labeled break exits
// the loop correctly.
func (w *FrameWriter) Flush() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return
	}
drain:
	for {
		select {
		case frame, ok := <-w.queue:
			if !ok {
				break drain // queue closed: nothing more to collect
			}
			w.batch = append(w.batch, frame)
		default:
			break drain // queue momentarily empty
		}
	}
	w.flushBatchLocked()
}
// EnableHeartbeat arranges for callback to be invoked every interval by the
// write loop; a non-nil returned frame is appended to the batch and flushed
// immediately.
func (w *FrameWriter) EnableHeartbeat(interval time.Duration, callback func() *Frame) {
	w.mu.Lock()
	w.heartbeatInterval = interval
	w.heartbeatCallback = callback
	w.heartbeatEnabled = true
	w.mu.Unlock()
	// Non-blocking nudge: if a signal is already pending, writeLoop will
	// re-read the configuration anyway, so dropping this one is fine.
	select {
	case w.heartbeatControl <- struct{}{}:
	default:
	}
}
// DisableHeartbeat stops heartbeat emission; the write loop tears down its
// ticker on the next control signal.
func (w *FrameWriter) DisableHeartbeat() {
	w.mu.Lock()
	w.heartbeatEnabled = false
	w.mu.Unlock()
	// Non-blocking nudge; a pending signal already guarantees writeLoop
	// will re-evaluate the heartbeat configuration.
	select {
	case w.heartbeatControl <- struct{}{}:
	default:
	}
}
// SetWriteErrorHandler registers handler to be invoked (on its own
// goroutine) the first time a frame write fails.
func (w *FrameWriter) SetWriteErrorHandler(handler func(error)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.onWriteError = handler
}
// EnableAdaptiveFlush turns on latency-oriented flushing: when the queue
// depth is at or below lowConcurrencyThreshold, the write loop flushes
// frames immediately instead of waiting to fill a batch.
func (w *FrameWriter) EnableAdaptiveFlush(lowConcurrencyThreshold int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.adaptiveFlush = true
	w.lowConcurrencyThreshold = lowConcurrencyThreshold
}
// DisableAdaptiveFlush reverts to pure size/time-based batching.
func (w *FrameWriter) DisableAdaptiveFlush() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.adaptiveFlush = false
}