mirror of
https://github.com/Gouryella/drip.git
synced 2026-02-23 21:00:44 +00:00
feat(protocol): Added flow control frame type and support for automatic release mechanism of security frames.
A new FlowControl frame type has been added for backpressure control between the client and server, and SafeFrame has been introduced. Frame resource management has been encapsulated to ensure that frame data is correctly returned to the memory pool after use. The frame processing logic has also been optimized. The way frame.Release() is called improves code security and maintainability. feat(client): Implements queue pressure monitoring and flow control signal sending functions. Add the `monitorQueuePressure` method to periodically check the length of the `dataFrameQueue`, and trigger an alarm when the queue usage exceeds a threshold. Automatically send flow control commands to the server to pause or resume writing, preventing data backlog and connection interruptions caused by slow consumption speed. feat(server): Supports receiving and responding to flow control requests from clients. The `handleFlowControl` and `sendWithBackpressure` methods have been added to enable backpressure control of received data frames on the server side. By blocking the sending process using a condition variable until the client releases the pause state, connection stability is ensured under high load. refactor(client): Reduces redundant resource release operations during frame processing. Use SafeFrame to manage frame lifecycles uniformly, replacing manual frame.Release() with defer sf.Close() in multiple frame handlers. This avoids the risk of memory leaks caused by unreleased abnormal paths. perf(client): Shorten the shutdown timeout to speed up resource reclamation. The forced shutdown wait time in the tunnel runner and connector has been adjusted from 5 seconds and 3 seconds to 2 seconds to improve the program exit response speed.
This commit is contained in:
@@ -145,7 +145,7 @@ func runTunnelWithUI(connConfig *tcp.ConnectorConfig, daemonInfo *DaemonInfo) er
|
||||
select {
|
||||
case <-done:
|
||||
// Closed successfully
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-time.After(2 * time.Second):
|
||||
fmt.Println(ui.Warning("Force closing (timeout)..."))
|
||||
}
|
||||
|
||||
|
||||
@@ -159,6 +159,7 @@ func (c *Connector) Connect() error {
|
||||
}
|
||||
|
||||
go c.frameHandler.WarmupConnectionPool(3)
|
||||
go c.monitorQueuePressure()
|
||||
go c.handleFrames()
|
||||
|
||||
return nil
|
||||
@@ -235,18 +236,15 @@ func (c *Connector) dataFrameWorker(workerID int) {
|
||||
}
|
||||
|
||||
func() {
|
||||
defer c.recoverer.RecoverWithCallback("handleDataFrame", func(p interface{}) {
|
||||
if frame != nil {
|
||||
frame.Release()
|
||||
}
|
||||
})
|
||||
sf := protocol.WithFrame(frame)
|
||||
defer sf.Close()
|
||||
defer c.recoverer.Recover("handleDataFrame")
|
||||
|
||||
if err := c.frameHandler.HandleDataFrame(frame); err != nil {
|
||||
if err := c.frameHandler.HandleDataFrame(sf.Frame); err != nil {
|
||||
c.logger.Error("Failed to handle data frame",
|
||||
zap.Int("worker_id", workerID),
|
||||
zap.Error(err))
|
||||
}
|
||||
frame.Release()
|
||||
}()
|
||||
|
||||
case <-c.stopCh:
|
||||
@@ -282,7 +280,9 @@ func (c *Connector) handleFrames() {
|
||||
return
|
||||
}
|
||||
}
|
||||
switch frame.Type {
|
||||
sf := protocol.WithFrame(frame)
|
||||
|
||||
switch sf.Frame.Type {
|
||||
case protocol.FrameTypeHeartbeatAck:
|
||||
c.heartbeatMu.Lock()
|
||||
if !c.heartbeatSentAt.IsZero() {
|
||||
@@ -299,39 +299,39 @@ func (c *Connector) handleFrames() {
|
||||
c.heartbeatMu.Unlock()
|
||||
c.logger.Debug("Received heartbeat ack")
|
||||
}
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
|
||||
case protocol.FrameTypeData:
|
||||
select {
|
||||
case c.dataFrameQueue <- frame:
|
||||
case c.dataFrameQueue <- sf.Frame:
|
||||
case <-c.stopCh:
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
return
|
||||
default:
|
||||
c.logger.Warn("Data frame queue full, dropping frame")
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
}
|
||||
|
||||
case protocol.FrameTypeClose:
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
c.logger.Info("Server requested close")
|
||||
return
|
||||
|
||||
case protocol.FrameTypeError:
|
||||
var errMsg protocol.ErrorMessage
|
||||
if err := json.Unmarshal(frame.Payload, &errMsg); err == nil {
|
||||
if err := json.Unmarshal(sf.Frame.Payload, &errMsg); err == nil {
|
||||
c.logger.Error("Received error from server",
|
||||
zap.String("code", errMsg.Code),
|
||||
zap.String("message", errMsg.Message),
|
||||
)
|
||||
}
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
return
|
||||
|
||||
default:
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
c.logger.Warn("Unexpected frame type",
|
||||
zap.String("type", frame.Type.String()),
|
||||
zap.String("type", sf.Frame.Type.String()),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -378,7 +378,7 @@ func (c *Connector) Close() error {
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(2 * time.Second):
|
||||
c.logger.Warn("Force closing: some handlers are still active")
|
||||
}
|
||||
|
||||
@@ -440,3 +440,54 @@ func (c *Connector) IsClosed() bool {
|
||||
defer c.closedMu.RUnlock()
|
||||
return c.closed
|
||||
}
|
||||
func (c *Connector) monitorQueuePressure() {
|
||||
defer c.recoverer.Recover("monitorQueuePressure")
|
||||
|
||||
const (
|
||||
pauseThreshold = 0.80
|
||||
resumeThreshold = 0.50
|
||||
checkInterval = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
ticker := time.NewTicker(checkInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
isPaused := false
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
queueLen := len(c.dataFrameQueue)
|
||||
queueCap := cap(c.dataFrameQueue)
|
||||
usage := float64(queueLen) / float64(queueCap)
|
||||
|
||||
if usage > pauseThreshold && !isPaused {
|
||||
c.sendFlowControl("*", protocol.FlowControlPause)
|
||||
isPaused = true
|
||||
c.logger.Warn("Queue pressure high, sent pause signal",
|
||||
zap.Int("queue_len", queueLen),
|
||||
zap.Int("queue_cap", queueCap),
|
||||
zap.Float64("usage", usage))
|
||||
} else if usage < resumeThreshold && isPaused {
|
||||
c.sendFlowControl("*", protocol.FlowControlResume)
|
||||
isPaused = false
|
||||
c.logger.Info("Queue pressure normal, sent resume signal",
|
||||
zap.Int("queue_len", queueLen),
|
||||
zap.Int("queue_cap", queueCap),
|
||||
zap.Float64("usage", usage))
|
||||
}
|
||||
|
||||
case <-c.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connector) sendFlowControl(streamID string, action protocol.FlowControlAction) {
|
||||
frame := protocol.NewFlowControlFrame(streamID, action)
|
||||
if err := c.SendFrame(frame); err != nil {
|
||||
c.logger.Error("Failed to send flow control",
|
||||
zap.String("action", string(action)),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,10 @@ type Connection struct {
|
||||
tunnelType protocol.TunnelType // Track tunnel type
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// Flow control
|
||||
paused bool
|
||||
pauseCond *sync.Cond
|
||||
}
|
||||
|
||||
// HTTPResponseHandler interface for response channel operations
|
||||
@@ -60,6 +64,7 @@ type HTTPResponseHandler interface {
|
||||
// NewConnection creates a new connection handler
|
||||
func NewConnection(conn net.Conn, authToken string, manager *tunnel.Manager, logger *zap.Logger, portAlloc *PortAllocator, domain string, publicPort int, httpHandler http.Handler, responseChans HTTPResponseHandler) *Connection {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
var mu sync.RWMutex
|
||||
return &Connection{
|
||||
conn: conn,
|
||||
authToken: authToken,
|
||||
@@ -74,6 +79,7 @@ func NewConnection(conn net.Conn, authToken string, manager *tunnel.Manager, log
|
||||
lastHeartbeat: time.Now(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
pauseCond: sync.NewCond(&mu),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,14 +124,15 @@ func (c *Connection) Handle() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read registration frame: %w", err)
|
||||
}
|
||||
defer frame.Release() // Return pool buffer when done
|
||||
sf := protocol.WithFrame(frame)
|
||||
defer sf.Close()
|
||||
|
||||
if frame.Type != protocol.FrameTypeRegister {
|
||||
return fmt.Errorf("expected register frame, got %s", frame.Type)
|
||||
if sf.Frame.Type != protocol.FrameTypeRegister {
|
||||
return fmt.Errorf("expected register frame, got %s", sf.Frame.Type)
|
||||
}
|
||||
|
||||
var req protocol.RegisterRequest
|
||||
if err := json.Unmarshal(frame.Payload, &req); err != nil {
|
||||
if err := json.Unmarshal(sf.Frame.Payload, &req); err != nil {
|
||||
return fmt.Errorf("failed to parse registration request: %w", err)
|
||||
}
|
||||
|
||||
@@ -390,25 +397,31 @@ func (c *Connection) handleFrames(reader *bufio.Reader) error {
|
||||
}
|
||||
|
||||
// Handle frame based on type
|
||||
switch frame.Type {
|
||||
sf := protocol.WithFrame(frame)
|
||||
|
||||
switch sf.Frame.Type {
|
||||
case protocol.FrameTypeHeartbeat:
|
||||
c.handleHeartbeat()
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
|
||||
case protocol.FrameTypeData:
|
||||
// Data frame from client (response to forwarded request)
|
||||
c.handleDataFrame(frame)
|
||||
frame.Release() // Release after processing
|
||||
c.handleDataFrame(sf.Frame)
|
||||
sf.Close()
|
||||
|
||||
case protocol.FrameTypeFlowControl:
|
||||
c.handleFlowControl(sf.Frame)
|
||||
sf.Close()
|
||||
|
||||
case protocol.FrameTypeClose:
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
c.logger.Info("Client requested close")
|
||||
return nil
|
||||
|
||||
default:
|
||||
frame.Release()
|
||||
sf.Close()
|
||||
c.logger.Warn("Unexpected frame type",
|
||||
zap.String("type", frame.Type.String()),
|
||||
zap.String("type", sf.Frame.Type.String()),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -574,6 +587,9 @@ func (c *Connection) SendFrame(frame *protocol.Frame) error {
|
||||
if c.frameWriter == nil {
|
||||
return protocol.WriteFrame(c.conn, frame)
|
||||
}
|
||||
if frame.Type == protocol.FrameTypeData {
|
||||
return c.sendWithBackpressure(frame)
|
||||
}
|
||||
return c.frameWriter.WriteFrame(frame)
|
||||
}
|
||||
|
||||
@@ -681,3 +697,40 @@ func (w *httpResponseWriter) Write(data []byte) (int, error) {
|
||||
}
|
||||
return w.writer.Write(data)
|
||||
}
|
||||
|
||||
func (c *Connection) handleFlowControl(frame *protocol.Frame) {
|
||||
msg, err := protocol.DecodeFlowControlMessage(frame.Payload)
|
||||
if err != nil {
|
||||
c.logger.Error("Failed to decode flow control", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
switch msg.Action {
|
||||
case protocol.FlowControlPause:
|
||||
c.paused = true
|
||||
c.logger.Warn("Client requested pause",
|
||||
zap.String("stream", msg.StreamID))
|
||||
|
||||
case protocol.FlowControlResume:
|
||||
c.paused = false
|
||||
c.pauseCond.Broadcast()
|
||||
c.logger.Info("Client requested resume",
|
||||
zap.String("stream", msg.StreamID))
|
||||
|
||||
default:
|
||||
c.logger.Warn("Unknown flow control action",
|
||||
zap.String("action", string(msg.Action)))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) sendWithBackpressure(frame *protocol.Frame) error {
|
||||
c.mu.Lock()
|
||||
for c.paused {
|
||||
c.pauseCond.Wait()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return c.frameWriter.WriteFrame(frame)
|
||||
}
|
||||
|
||||
34
internal/shared/protocol/flow_control.go
Normal file
34
internal/shared/protocol/flow_control.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
json "github.com/goccy/go-json"
|
||||
)
|
||||
|
||||
type FlowControlAction string
|
||||
|
||||
const (
|
||||
FlowControlPause FlowControlAction = "pause"
|
||||
FlowControlResume FlowControlAction = "resume"
|
||||
)
|
||||
|
||||
type FlowControlMessage struct {
|
||||
StreamID string `json:"stream_id"`
|
||||
Action FlowControlAction `json:"action"`
|
||||
}
|
||||
|
||||
func NewFlowControlFrame(streamID string, action FlowControlAction) *Frame {
|
||||
msg := FlowControlMessage{
|
||||
StreamID: streamID,
|
||||
Action: action,
|
||||
}
|
||||
payload, _ := json.Marshal(&msg)
|
||||
return NewFrame(FrameTypeFlowControl, payload)
|
||||
}
|
||||
|
||||
func DecodeFlowControlMessage(payload []byte) (*FlowControlMessage, error) {
|
||||
var msg FlowControlMessage
|
||||
if err := json.Unmarshal(payload, &msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &msg, nil
|
||||
}
|
||||
@@ -25,6 +25,7 @@ const (
|
||||
FrameTypeData FrameType = 0x05
|
||||
FrameTypeClose FrameType = 0x06
|
||||
FrameTypeError FrameType = 0x07
|
||||
FrameTypeFlowControl FrameType = 0x08
|
||||
)
|
||||
|
||||
// String returns the string representation of frame type
|
||||
@@ -44,6 +45,8 @@ func (t FrameType) String() string {
|
||||
return "Close"
|
||||
case FrameTypeError:
|
||||
return "Error"
|
||||
case FrameTypeFlowControl:
|
||||
return "FlowControl"
|
||||
default:
|
||||
return fmt.Sprintf("Unknown(%d)", t)
|
||||
}
|
||||
|
||||
40
internal/shared/protocol/safe_frame.go
Normal file
40
internal/shared/protocol/safe_frame.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// SafeFrame wraps Frame with automatic resource cleanup
|
||||
type SafeFrame struct {
|
||||
*Frame
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewSafeFrame creates a SafeFrame that implements io.Closer
|
||||
func NewSafeFrame(frameType FrameType, payload []byte) *SafeFrame {
|
||||
return &SafeFrame{
|
||||
Frame: NewFrame(frameType, payload),
|
||||
}
|
||||
}
|
||||
|
||||
// Close implements io.Closer, ensures Release is called exactly once
|
||||
func (sf *SafeFrame) Close() error {
|
||||
sf.once.Do(func() {
|
||||
if sf.Frame != nil {
|
||||
sf.Frame.Release()
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithFrame wraps an existing Frame with automatic cleanup
|
||||
func WithFrame(frame *Frame) *SafeFrame {
|
||||
return &SafeFrame{Frame: frame}
|
||||
}
|
||||
|
||||
// MustClose is a helper that calls Close and panics on error (for defer cleanup)
|
||||
func (sf *SafeFrame) MustClose() {
|
||||
if err := sf.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user