From 295f34d7f0cd466ee17715026cba641253de1de8 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 29 Jan 2026 22:22:09 +0800 Subject: [PATCH] fix(logging): capture streaming TTFB on first chunk and make timestamps required - Add firstChunkTimestamp field to ResponseWriterWrapper for sync capture - Capture TTFB in Write() and WriteString() before async channel send - Add SetFirstChunkTimestamp() to StreamingLogWriter interface - Make requestTimestamp/apiResponseTimestamp required in LogRequest() - Remove timestamp capture from WriteAPIResponse() (now via setter) - Fix Gemini handler to set API_RESPONSE_TIMESTAMP before writing response This ensures accurate TTFB measurement for all streaming API formats (OpenAI, Gemini, Claude) by capturing timestamp synchronously when the first response chunk arrives, not when the stream finalizes. --- internal/api/middleware/response_writer.go | 33 +++++++++++++------ internal/logging/request_logger.go | 25 +++++++++++--- .../handlers/gemini/gemini-cli_handlers.go | 2 +- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/internal/api/middleware/response_writer.go b/internal/api/middleware/response_writer.go index 8272c868..50fa1c69 100644 --- a/internal/api/middleware/response_writer.go +++ b/internal/api/middleware/response_writer.go @@ -28,16 +28,17 @@ type RequestInfo struct { // It is designed to handle both standard and streaming responses, ensuring that logging operations do not block the client response. type ResponseWriterWrapper struct { gin.ResponseWriter - body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses. - isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). - streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. - chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. - streamDone chan struct{} // streamDone signals when the streaming goroutine completes. - logger logging.RequestLogger // logger is the instance of the request logger service. - requestInfo *RequestInfo // requestInfo holds the details of the original request. - statusCode int // statusCode stores the HTTP status code of the response. - headers map[string][]string // headers stores the response headers. - logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected. + body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses. + isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). + streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. + chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. + streamDone chan struct{} // streamDone signals when the streaming goroutine completes. + logger logging.RequestLogger // logger is the instance of the request logger service. + requestInfo *RequestInfo // requestInfo holds the details of the original request. + statusCode int // statusCode stores the HTTP status code of the response. + headers map[string][]string // headers stores the response headers. + logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected. + firstChunkTimestamp time.Time // firstChunkTimestamp captures TTFB for streaming responses. } // NewResponseWriterWrapper creates and initializes a new ResponseWriterWrapper. @@ -75,6 +76,10 @@ func (w *ResponseWriterWrapper) Write(data []byte) (int, error) { // THEN: Handle logging based on response type if w.isStreaming && w.chunkChannel != nil { + // Capture TTFB on first chunk (synchronous, before async channel send) + if w.firstChunkTimestamp.IsZero() { + w.firstChunkTimestamp = time.Now() + } // For streaming responses: Send to async logging channel (non-blocking) select { case w.chunkChannel <- append([]byte(nil), data...): // Non-blocking send with copy @@ -119,6 +124,10 @@ func (w *ResponseWriterWrapper) WriteString(data string) (int, error) { // THEN: Capture for logging if w.isStreaming && w.chunkChannel != nil { + // Capture TTFB on first chunk (synchronous, before async channel send) + if w.firstChunkTimestamp.IsZero() { + w.firstChunkTimestamp = time.Now() + } select { case w.chunkChannel <- []byte(data): default: @@ -282,6 +291,8 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error { w.streamDone = nil } + w.streamWriter.SetFirstChunkTimestamp(w.firstChunkTimestamp) + // Write API Request and Response to the streaming log before closing apiRequest := w.extractAPIRequest(c) if len(apiRequest) > 0 { @@ -393,5 +404,7 @@ func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][] apiResponseBody, apiResponseErrors, w.requestInfo.RequestID, + w.requestInfo.Timestamp, + apiResponseTimestamp, ) } diff --git a/internal/logging/request_logger.go b/internal/logging/request_logger.go index 44df43d3..cf9b4d5c 100644 --- a/internal/logging/request_logger.go +++ b/internal/logging/request_logger.go @@ -44,10 +44,12 @@ type RequestLogger interface { // - apiRequest: The API request data // - apiResponse: The API response data // - requestID: Optional request ID for log file naming + // - requestTimestamp: When the request was received + // - apiResponseTimestamp: When the API response was received // // Returns: // - error: An error if logging fails, nil otherwise - LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error + LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error // LogStreamingRequest initiates logging for a streaming request and returns a writer for chunks. // @@ -109,6 +111,12 @@ type StreamingLogWriter interface { // - error: An error if writing fails, nil otherwise WriteAPIResponse(apiResponse []byte) error + // SetFirstChunkTimestamp sets the TTFB timestamp captured when first chunk was received. + // + // Parameters: + // - timestamp: The time when first response chunk was received + SetFirstChunkTimestamp(timestamp time.Time) + // Close finalizes the log file and cleans up resources. // // Returns: @@ -180,11 +188,13 @@ func (l *FileRequestLogger) SetEnabled(enabled bool) { // - apiRequest: The API request data // - apiResponse: The API response data // - requestID: Optional request ID for log file naming +// - requestTimestamp: When the request was received +// - apiResponseTimestamp: When the API response was received // // Returns: // - error: An error if logging fails, nil otherwise -func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error { - return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID, time.Time{}, time.Time{}) +func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error { + return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID, requestTimestamp, apiResponseTimestamp) } // LogRequestWithOptions logs a request with optional forced logging behavior. @@ -1065,10 +1075,15 @@ func (w *FileStreamingLogWriter) WriteAPIResponse(apiResponse []byte) error { return nil } w.apiResponse = bytes.Clone(apiResponse) - w.apiResponseTimestamp = time.Now() return nil } +func (w *FileStreamingLogWriter) SetFirstChunkTimestamp(timestamp time.Time) { + if !timestamp.IsZero() { + w.apiResponseTimestamp = timestamp + } +} + // Close finalizes the log file and cleans up resources. // It writes all buffered data to the file in the correct order: // API REQUEST -> API RESPONSE -> RESPONSE (status, headers, body chunks) @@ -1236,6 +1251,8 @@ func (w *NoOpStreamingLogWriter) WriteAPIResponse(_ []byte) error { return nil } +func (w *NoOpStreamingLogWriter) SetFirstChunkTimestamp(_ time.Time) {} + // Close is a no-op implementation that does nothing and always returns nil. // // Returns: diff --git a/sdk/api/handlers/gemini/gemini-cli_handlers.go b/sdk/api/handlers/gemini/gemini-cli_handlers.go index 8c85b39c..917902e7 100644 --- a/sdk/api/handlers/gemini/gemini-cli_handlers.go +++ b/sdk/api/handlers/gemini/gemini-cli_handlers.go @@ -124,8 +124,8 @@ func (h *GeminiCLIAPIHandler) CLIHandler(c *gin.Context) { log.Errorf("Failed to read response body: %v", err) return } - _, _ = c.Writer.Write(output) c.Set("API_RESPONSE_TIMESTAMP", time.Now()) + _, _ = c.Writer.Write(output) c.Set("API_RESPONSE", output) } }