From b15453c369897df02b016d1dbb2d879fe9c1c68c Mon Sep 17 00:00:00 2001 From: CharTyr Date: Mon, 30 Mar 2026 00:42:04 -0400 Subject: [PATCH] fix(amp): address PR review - stream thinking suppression, SSE detection, test init - Call suppressAmpThinking in rewriteStreamEvent for streaming path - Handle nil return from suppressAmpThinking to skip suppressed events - Narrow looksLikeSSEChunk to line-prefix detection (HasPrefix vs Contains) - Initialize suppressedContentBlock map in test --- internal/api/modules/amp/response_rewriter.go | 36 ++++++++++++------- .../api/modules/amp/response_rewriter_test.go | 2 +- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/internal/api/modules/amp/response_rewriter.go b/internal/api/modules/amp/response_rewriter.go index fa83f7b9..64757963 100644 --- a/internal/api/modules/amp/response_rewriter.go +++ b/internal/api/modules/amp/response_rewriter.go @@ -36,14 +36,14 @@ func NewResponseRewriter(w gin.ResponseWriter, originalModel string) *ResponseRe const maxBufferedResponseBytes = 2 * 1024 * 1024 // 2MB safety cap func looksLikeSSEChunk(data []byte) bool { - return bytes.Contains(data, []byte("data:")) || - bytes.Contains(data, []byte("event:")) || - bytes.Contains(data, []byte("message_start")) || - bytes.Contains(data, []byte("message_delta")) || - bytes.Contains(data, []byte("content_block_start")) || - bytes.Contains(data, []byte("content_block_delta")) || - bytes.Contains(data, []byte("content_block_stop")) || - bytes.Contains(data, []byte("\n\n")) + for _, line := range bytes.Split(data, []byte("\n")) { + trimmed := bytes.TrimSpace(line) + if bytes.HasPrefix(trimmed, []byte("data:")) || + bytes.HasPrefix(trimmed, []byte("event:")) { + return true + } + } + return false } func (rw *ResponseRewriter) enableStreaming(reason string) error { @@ -250,11 +250,15 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte { } if dataIdx >= 0 { - // Found event+data pair - process through model rewriter only - // (no thinking suppression for streaming) + // Found event+data pair - process through rewriter jsonData := bytes.TrimPrefix(bytes.TrimSpace(lines[dataIdx]), []byte("data: ")) if len(jsonData) > 0 && jsonData[0] == '{' { rewritten := rw.rewriteStreamEvent(jsonData) + if rewritten == nil { + // Event suppressed (e.g. thinking block), skip event+data pair + i = dataIdx + 1 + continue + } // Emit event line out = append(out, line) // Emit blank lines between event and data @@ -280,7 +284,9 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte { jsonData := bytes.TrimPrefix(trimmed, []byte("data: ")) if len(jsonData) > 0 && jsonData[0] == '{' { rewritten := rw.rewriteStreamEvent(jsonData) - out = append(out, append([]byte("data: "), rewritten...)) + if rewritten != nil { + out = append(out, append([]byte("data: "), rewritten...)) + } i++ continue } @@ -296,9 +302,13 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte { // rewriteStreamEvent processes a single JSON event in the SSE stream. // It rewrites model names and ensures signature fields exist. -// Unlike rewriteModelInResponse, it does NOT suppress thinking blocks -// in streaming mode - they are passed through with signature injection. func (rw *ResponseRewriter) rewriteStreamEvent(data []byte) []byte { + // Suppress thinking blocks before any other processing. + data = rw.suppressAmpThinking(data) + if len(data) == 0 { + return nil + } + // Inject empty signature where needed data = ensureAmpSignature(data) diff --git a/internal/api/modules/amp/response_rewriter_test.go b/internal/api/modules/amp/response_rewriter_test.go index ca477d4e..2f23d74d 100644 --- a/internal/api/modules/amp/response_rewriter_test.go +++ b/internal/api/modules/amp/response_rewriter_test.go @@ -101,7 +101,7 @@ func TestRewriteStreamChunk_MessageModel(t *testing.T) { } func TestRewriteStreamChunk_SuppressesThinkingContentBlockFrames(t *testing.T) { - rw := &ResponseRewriter{} + rw := &ResponseRewriter{suppressedContentBlock: make(map[int]struct{})} chunk := []byte("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\"}}\n\nevent: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"abc\"}}\n\nevent: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\nevent: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"name\":\"bash\",\"input\":{}}}\n\n") result := rw.rewriteStreamChunk(chunk)