From 108895fc04cce5d9e55fb2f0cad60807b3bef4b9 Mon Sep 17 00:00:00 2001 From: davidwushi1145 Date: Thu, 2 Apr 2026 20:39:49 +0800 Subject: [PATCH] Harden Responses SSE framing against partial chunk boundaries Follow-up review found two real framing hazards in the handler-layer framer: it could flush a partial `data:` payload before the JSON was complete, and it could inject an extra newline before chunks that already began with `\n`/`\r\n`. This commit tightens the framer so it only emits undelimited events when the buffered `data:` payload is already valid JSON (or `[DONE]`), skips newline injection for chunks that already start with a line break, and avoids the heavier `bytes.Split` path while scanning SSE fields. The regression suite now covers split `data:` payload chunks, newline-prefixed chunks, and dropping incomplete trailing data on flush, so the original Responses fix remains intact while the review concerns are explicitly locked down. Constraint: Keep the follow-up limited to handler-layer framing and tests Rejected: Ignore the review and rely on current executor chunk shapes | leaves partial data payload corruption possible Rejected: Build a fully generic SSE parser | wider change than needed for the identified risks Confidence: high Scope-risk: narrow Reversibility: clean Directive: Do not emit undelimited Responses SSE events unless buffered `data:` content is already complete and valid Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers/openai -count=1 Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers -count=1 Tested: /tmp/go1.26.1/go/bin/go vet ./sdk/api/handlers/... Not-tested: Full repository test suite outside sdk/api/handlers packages --- .../openai/openai_responses_handlers.go | 55 +++++++++++++++++-- .../openai_responses_handlers_stream_test.go | 44 +++++++++++++++ 2 files changed, 94 insertions(+), 5 deletions(-) diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index cdb8bfdf..8969ce2f 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -9,6 +9,7 @@ package openai import ( "bytes" "context" + "encoding/json" "fmt" "io" "net/http" @@ -68,7 +69,7 @@ func (f *responsesSSEFramer) WriteChunk(w io.Writer, chunk []byte) { f.pending = f.pending[:0] return } - if len(f.pending) == 0 || responsesSSENeedsMoreData(f.pending) { + if len(f.pending) == 0 || !responsesSSECanEmitWithoutDelimiter(f.pending) { return } writeResponsesSSEChunk(w, f.pending) @@ -83,7 +84,7 @@ func (f *responsesSSEFramer) Flush(w io.Writer) { f.pending = f.pending[:0] return } - if responsesSSENeedsMoreData(f.pending) { + if !responsesSSECanEmitWithoutDelimiter(f.pending) { f.pending = f.pending[:0] return } @@ -121,7 +122,15 @@ func responsesSSENeedsMoreData(chunk []byte) bool { } func responsesSSEHasField(chunk []byte, prefix []byte) bool { - for _, line := range bytes.Split(chunk, []byte("\n")) { + s := chunk + for len(s) > 0 { + line := s + if i := bytes.IndexByte(s, '\n'); i >= 0 { + line = s[:i] + s = s[i+1:] + } else { + s = nil + } line = bytes.TrimSpace(line) if bytes.HasPrefix(line, prefix) { return true @@ -130,6 +139,39 @@ func responsesSSEHasField(chunk []byte, prefix []byte) bool { return false } +func responsesSSECanEmitWithoutDelimiter(chunk []byte) bool { + trimmed := bytes.TrimSpace(chunk) + if len(trimmed) == 0 || responsesSSENeedsMoreData(trimmed) || !responsesSSEHasField(trimmed, []byte("data:")) { + return false + } + return responsesSSEDataLinesValid(trimmed) +} + +func responsesSSEDataLinesValid(chunk []byte) bool { + s := chunk + for len(s) > 0 { + line := s + if i := bytes.IndexByte(s, '\n'); i >= 0 { + line = s[:i] + s = s[i+1:] + } else { + s = nil + } + line = bytes.TrimSpace(line) + if len(line) == 0 || !bytes.HasPrefix(line, []byte("data:")) { + continue + } + data := bytes.TrimSpace(line[len("data:"):]) + if len(data) == 0 || bytes.Equal(data, []byte("[DONE]")) { + continue + } + if !json.Valid(data) { + return false + } + } + return true +} + func responsesSSENeedsLineBreak(pending, chunk []byte) bool { if len(pending) == 0 || len(chunk) == 0 { return false @@ -137,9 +179,12 @@ func responsesSSENeedsLineBreak(pending, chunk []byte) bool { if bytes.HasSuffix(pending, []byte("\n")) || bytes.HasSuffix(pending, []byte("\r")) { return false } - trimmed := bytes.TrimSpace(chunk) + if chunk[0] == '\n' || chunk[0] == '\r' { + return false + } + trimmed := bytes.TrimLeft(chunk, " \t") if len(trimmed) == 0 { - return true + return false } for _, prefix := range [][]byte{[]byte("data:"), []byte("event:"), []byte("id:"), []byte("retry:"), []byte(":")} { if bytes.HasPrefix(trimmed, prefix) { diff --git a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go index e6efaa4a..ef16fe80 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go +++ b/sdk/api/handlers/openai/openai_responses_handlers_stream_test.go @@ -96,3 +96,47 @@ func TestForwardResponsesStreamPreservesValidFullSSEEventChunks(t *testing.T) { t.Fatalf("unexpected full-event framing.\nGot: %q\nWant: %q", got, string(chunk)) } } + +func TestForwardResponsesStreamBuffersSplitDataPayloadChunks(t *testing.T) { + h, recorder, c, flusher := newResponsesStreamTestHandler(t) + + data := make(chan []byte, 2) + errs := make(chan *interfaces.ErrorMessage) + data <- []byte("data: {\"type\":\"response.created\"") + data <- []byte(",\"response\":{\"id\":\"resp-1\"}}") + close(data) + close(errs) + + h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil) + + got := recorder.Body.String() + want := "data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n\n" + if got != want { + t.Fatalf("unexpected split-data framing.\nGot: %q\nWant: %q", got, want) + } +} + +func TestResponsesSSENeedsLineBreakSkipsChunksThatAlreadyStartWithNewline(t *testing.T) { + if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\n")) { + t.Fatal("expected no injected newline before newline-only chunk") + } + if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\r\n")) { + t.Fatal("expected no injected newline before CRLF chunk") + } +} + +func TestForwardResponsesStreamDropsIncompleteTrailingDataChunkOnFlush(t *testing.T) { + h, recorder, c, flusher := newResponsesStreamTestHandler(t) + + data := make(chan []byte, 1) + errs := make(chan *interfaces.ErrorMessage) + data <- []byte("data: {\"type\":\"response.created\"") + close(data) + close(errs) + + h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil) + + if got := recorder.Body.String(); got != "\n" { + t.Fatalf("expected incomplete trailing data to be dropped on flush.\nGot: %q", got) + } +}