diff --git a/internal/server/proxy/response_handler.go b/internal/server/proxy/response_handler.go index 192ea6d..0dddcab 100644 --- a/internal/server/proxy/response_handler.go +++ b/internal/server/proxy/response_handler.go @@ -20,12 +20,13 @@ type responseChanEntry struct { // streamingResponseEntry holds a streaming response writer type streamingResponseEntry struct { - w http.ResponseWriter - flusher http.Flusher - createdAt time.Time - headersSent bool - done chan struct{} - mu sync.Mutex + w http.ResponseWriter + flusher http.Flusher + createdAt time.Time + lastActivityAt time.Time + headersSent bool + done chan struct{} + mu sync.Mutex } // ResponseHandler manages response channels for HTTP requests over TCP/Frame protocol @@ -73,11 +74,13 @@ func (h *ResponseHandler) CreateStreamingResponse(requestID string, w http.Respo flusher, _ := w.(http.Flusher) done := make(chan struct{}) + now := time.Now() h.streamingChannels[requestID] = &streamingResponseEntry{ - w: w, - flusher: flusher, - createdAt: time.Now(), - done: done, + w: w, + flusher: flusher, + createdAt: now, + lastActivityAt: now, + done: done, } return done @@ -176,6 +179,7 @@ func (h *ResponseHandler) SendStreamingHead(requestID string, head *protocol.HTT entry.w.WriteHeader(statusCode) entry.headersSent = true + entry.lastActivityAt = time.Now() if entry.flusher != nil { entry.flusher.Flush() @@ -221,6 +225,8 @@ func (h *ResponseHandler) SendStreamingChunk(requestID string, chunk []byte, isL return nil } + entry.lastActivityAt = time.Now() + if entry.flusher != nil { entry.flusher.Flush() } @@ -321,7 +327,7 @@ func (h *ResponseHandler) cleanupExpiredChannels() { } for requestID, entry := range h.streamingChannels { - if now.Sub(entry.createdAt) > streamingTimeout { + if now.Sub(entry.lastActivityAt) > streamingTimeout { select { case <-entry.done: default: