mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-05-02 06:36:14 +00:00
Follow-up review found two real framing hazards in the handler-layer framer: it could flush a partial `data:` payload before the JSON was complete, and it could inject an extra newline before chunks that already began with `\n`/`\r\n`. This commit tightens the framer so it only emits undelimited events when the buffered `data:` payload is already valid JSON (or `[DONE]`), skips newline injection for chunks that already start with a line break, and avoids the heavier `bytes.Split` path while scanning SSE fields. The regression suite now covers split `data:` payload chunks, newline-prefixed chunks, and dropping incomplete trailing data on flush, so the original Responses fix remains intact while the review concerns are explicitly locked down. Constraint: Keep the follow-up limited to handler-layer framing and tests Rejected: Ignore the review and rely on current executor chunk shapes | leaves partial data payload corruption possible Rejected: Build a fully generic SSE parser | wider change than needed for the identified risks Confidence: high Scope-risk: narrow Reversibility: clean Directive: Do not emit undelimited Responses SSE events unless buffered `data:` content is already complete and valid Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers/openai -count=1 Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers -count=1 Tested: /tmp/go1.26.1/go/bin/go vet ./sdk/api/handlers/... Not-tested: Full repository test suite outside sdk/api/handlers packages
143 lines
4.9 KiB
Go
143 lines
4.9 KiB
Go
package openai
|
|
|
|
import (
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers"
|
|
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
|
|
)
|
|
|
|
func newResponsesStreamTestHandler(t *testing.T) (*OpenAIResponsesAPIHandler, *httptest.ResponseRecorder, *gin.Context, http.Flusher) {
|
|
t.Helper()
|
|
|
|
gin.SetMode(gin.TestMode)
|
|
base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, nil)
|
|
h := NewOpenAIResponsesAPIHandler(base)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
c, _ := gin.CreateTestContext(recorder)
|
|
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
|
|
|
|
flusher, ok := c.Writer.(http.Flusher)
|
|
if !ok {
|
|
t.Fatalf("expected gin writer to implement http.Flusher")
|
|
}
|
|
|
|
return h, recorder, c, flusher
|
|
}
|
|
|
|
func TestForwardResponsesStreamSeparatesDataOnlySSEChunks(t *testing.T) {
|
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
|
|
|
data := make(chan []byte, 2)
|
|
errs := make(chan *interfaces.ErrorMessage)
|
|
data <- []byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"function_call\",\"arguments\":\"{}\"}}")
|
|
data <- []byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}")
|
|
close(data)
|
|
close(errs)
|
|
|
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
|
body := recorder.Body.String()
|
|
parts := strings.Split(strings.TrimSpace(body), "\n\n")
|
|
if len(parts) != 2 {
|
|
t.Fatalf("expected 2 SSE events, got %d. Body: %q", len(parts), body)
|
|
}
|
|
|
|
expectedPart1 := "data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"function_call\",\"arguments\":\"{}\"}}"
|
|
if parts[0] != expectedPart1 {
|
|
t.Errorf("unexpected first event.\nGot: %q\nWant: %q", parts[0], expectedPart1)
|
|
}
|
|
|
|
expectedPart2 := "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}"
|
|
if parts[1] != expectedPart2 {
|
|
t.Errorf("unexpected second event.\nGot: %q\nWant: %q", parts[1], expectedPart2)
|
|
}
|
|
}
|
|
|
|
func TestForwardResponsesStreamReassemblesSplitSSEEventChunks(t *testing.T) {
|
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
|
|
|
data := make(chan []byte, 3)
|
|
errs := make(chan *interfaces.ErrorMessage)
|
|
data <- []byte("event: response.created")
|
|
data <- []byte("data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}")
|
|
data <- []byte("\n")
|
|
close(data)
|
|
close(errs)
|
|
|
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
|
|
|
got := strings.TrimSuffix(recorder.Body.String(), "\n")
|
|
want := "event: response.created\ndata: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n"
|
|
if got != want {
|
|
t.Fatalf("unexpected split-event framing.\nGot: %q\nWant: %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestForwardResponsesStreamPreservesValidFullSSEEventChunks(t *testing.T) {
|
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
|
|
|
data := make(chan []byte, 1)
|
|
errs := make(chan *interfaces.ErrorMessage)
|
|
chunk := []byte("event: response.created\ndata: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n")
|
|
data <- chunk
|
|
close(data)
|
|
close(errs)
|
|
|
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
|
|
|
got := strings.TrimSuffix(recorder.Body.String(), "\n")
|
|
if got != string(chunk) {
|
|
t.Fatalf("unexpected full-event framing.\nGot: %q\nWant: %q", got, string(chunk))
|
|
}
|
|
}
|
|
|
|
func TestForwardResponsesStreamBuffersSplitDataPayloadChunks(t *testing.T) {
|
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
|
|
|
data := make(chan []byte, 2)
|
|
errs := make(chan *interfaces.ErrorMessage)
|
|
data <- []byte("data: {\"type\":\"response.created\"")
|
|
data <- []byte(",\"response\":{\"id\":\"resp-1\"}}")
|
|
close(data)
|
|
close(errs)
|
|
|
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
|
|
|
got := recorder.Body.String()
|
|
want := "data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n\n"
|
|
if got != want {
|
|
t.Fatalf("unexpected split-data framing.\nGot: %q\nWant: %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestResponsesSSENeedsLineBreakSkipsChunksThatAlreadyStartWithNewline(t *testing.T) {
|
|
if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\n")) {
|
|
t.Fatal("expected no injected newline before newline-only chunk")
|
|
}
|
|
if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\r\n")) {
|
|
t.Fatal("expected no injected newline before CRLF chunk")
|
|
}
|
|
}
|
|
|
|
func TestForwardResponsesStreamDropsIncompleteTrailingDataChunkOnFlush(t *testing.T) {
|
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
|
|
|
data := make(chan []byte, 1)
|
|
errs := make(chan *interfaces.ErrorMessage)
|
|
data <- []byte("data: {\"type\":\"response.created\"")
|
|
close(data)
|
|
close(errs)
|
|
|
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
|
|
|
if got := recorder.Body.String(); got != "\n" {
|
|
t.Fatalf("expected incomplete trailing data to be dropped on flush.\nGot: %q", got)
|
|
}
|
|
}
|