From 1e477879c5a54443d226b4149822e17215538f62 Mon Sep 17 00:00:00 2001 From: Gouryella Date: Fri, 5 Dec 2025 22:20:04 +0800 Subject: [PATCH] feat(proxy): Adds last activity time tracking for streaming responses Adds a `lastActivityAt` field to the `streamingResponseEntry` structure to record the last active time of the streaming response. Updates this timestamp when creating the streaming response, sending headers, and sending data blocks. Modifies the logic for cleaning up timeout channels, using the last activity time instead of the creation time to determine if a timeout has occurred, improving accuracy in long-connection scenarios. --- internal/server/proxy/response_handler.go | 28 ++++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/internal/server/proxy/response_handler.go b/internal/server/proxy/response_handler.go index 192ea6d..0dddcab 100644 --- a/internal/server/proxy/response_handler.go +++ b/internal/server/proxy/response_handler.go @@ -20,12 +20,13 @@ type responseChanEntry struct { // streamingResponseEntry holds a streaming response writer type streamingResponseEntry struct { - w http.ResponseWriter - flusher http.Flusher - createdAt time.Time - headersSent bool - done chan struct{} - mu sync.Mutex + w http.ResponseWriter + flusher http.Flusher + createdAt time.Time + lastActivityAt time.Time + headersSent bool + done chan struct{} + mu sync.Mutex } // ResponseHandler manages response channels for HTTP requests over TCP/Frame protocol @@ -73,11 +74,13 @@ func (h *ResponseHandler) CreateStreamingResponse(requestID string, w http.Respo flusher, _ := w.(http.Flusher) done := make(chan struct{}) + now := time.Now() h.streamingChannels[requestID] = &streamingResponseEntry{ - w: w, - flusher: flusher, - createdAt: time.Now(), - done: done, + w: w, + flusher: flusher, + createdAt: now, + lastActivityAt: now, + done: done, } return done @@ -176,6 +179,7 @@ func (h *ResponseHandler) SendStreamingHead(requestID string, head *protocol.HTT entry.w.WriteHeader(statusCode) entry.headersSent = true + entry.lastActivityAt = time.Now() if entry.flusher != nil { entry.flusher.Flush() @@ -221,6 +225,8 @@ func (h *ResponseHandler) SendStreamingChunk(requestID string, chunk []byte, isL return nil } + entry.lastActivityAt = time.Now() + if entry.flusher != nil { entry.flusher.Flush() } @@ -321,7 +327,7 @@ func (h *ResponseHandler) cleanupExpiredChannels() { } for requestID, entry := range h.streamingChannels { - if now.Sub(entry.createdAt) > streamingTimeout { + if now.Sub(entry.lastActivityAt) > streamingTimeout { select { case <-entry.done: default: