Merge remote-tracking branch 'origin/main' into pr-104

# Conflicts:
#	config.example.yaml
#	internal/config/config.go
#	sdk/cliproxy/auth/model_name_mappings.go
This commit is contained in:
Luis Pater
2026-01-16 09:40:07 +08:00
76 changed files with 15505 additions and 361 deletions

View File

@@ -3,8 +3,11 @@ package amp
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
@@ -102,7 +105,15 @@ func createReverseProxy(upstreamURL string, secretSource SecretSource) (*httputi
// Modify incoming responses to handle gzip without Content-Encoding
// This addresses the same issue as inline handler gzip handling, but at the proxy level
proxy.ModifyResponse = func(resp *http.Response) error {
// Only process successful responses
// Log upstream error responses for diagnostics (502, 503, etc.)
// These are NOT proxy connection errors - the upstream responded with an error status
if resp.StatusCode >= 500 {
log.Errorf("amp upstream responded with error [%d] for %s %s", resp.StatusCode, resp.Request.Method, resp.Request.URL.Path)
} else if resp.StatusCode >= 400 {
log.Warnf("amp upstream responded with client error [%d] for %s %s", resp.StatusCode, resp.Request.Method, resp.Request.URL.Path)
}
// Only process successful responses for gzip decompression
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil
}
@@ -186,9 +197,29 @@ func createReverseProxy(upstreamURL string, secretSource SecretSource) (*httputi
return nil
}
// Error handler for proxy failures
// Error handler for proxy failures with detailed error classification for diagnostics
proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) {
log.Errorf("amp upstream proxy error for %s %s: %v", req.Method, req.URL.Path, err)
// Classify the error type for better diagnostics
var errType string
if errors.Is(err, context.DeadlineExceeded) {
errType = "timeout"
} else if errors.Is(err, context.Canceled) {
errType = "canceled"
} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
errType = "dial_timeout"
} else if _, ok := err.(net.Error); ok {
errType = "network_error"
} else {
errType = "connection_error"
}
// Don't log as error for context canceled - it's usually client closing connection
if errors.Is(err, context.Canceled) {
log.Debugf("amp upstream proxy [%s]: client canceled request for %s %s", errType, req.Method, req.URL.Path)
} else {
log.Errorf("amp upstream proxy error [%s] for %s %s: %v", errType, req.Method, req.URL.Path, err)
}
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(http.StatusBadGateway)
_, _ = rw.Write([]byte(`{"error":"amp_upstream_proxy_error","message":"Failed to reach Amp upstream"}`))

View File

@@ -29,15 +29,71 @@ func NewResponseRewriter(w gin.ResponseWriter, originalModel string) *ResponseRe
}
}
const maxBufferedResponseBytes = 2 * 1024 * 1024 // 2MB safety cap
func looksLikeSSEChunk(data []byte) bool {
// Fallback detection: some upstreams may omit/lie about Content-Type, causing SSE to be buffered.
// Heuristics are intentionally simple and cheap.
return bytes.Contains(data, []byte("data:")) ||
bytes.Contains(data, []byte("event:")) ||
bytes.Contains(data, []byte("message_start")) ||
bytes.Contains(data, []byte("message_delta")) ||
bytes.Contains(data, []byte("content_block_start")) ||
bytes.Contains(data, []byte("content_block_delta")) ||
bytes.Contains(data, []byte("content_block_stop")) ||
bytes.Contains(data, []byte("\n\n"))
}
func (rw *ResponseRewriter) enableStreaming(reason string) error {
if rw.isStreaming {
return nil
}
rw.isStreaming = true
// Flush any previously buffered data to avoid reordering or data loss.
if rw.body != nil && rw.body.Len() > 0 {
buf := rw.body.Bytes()
// Copy before Reset() to keep bytes stable.
toFlush := make([]byte, len(buf))
copy(toFlush, buf)
rw.body.Reset()
if _, err := rw.ResponseWriter.Write(rw.rewriteStreamChunk(toFlush)); err != nil {
return err
}
if flusher, ok := rw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}
log.Debugf("amp response rewriter: switched to streaming (%s)", reason)
return nil
}
// Write intercepts response writes and buffers them for model name replacement
func (rw *ResponseRewriter) Write(data []byte) (int, error) {
// Detect streaming on first write
if rw.body.Len() == 0 && !rw.isStreaming {
// Detect streaming on first write (header-based)
if !rw.isStreaming && rw.body.Len() == 0 {
contentType := rw.Header().Get("Content-Type")
rw.isStreaming = strings.Contains(contentType, "text/event-stream") ||
strings.Contains(contentType, "stream")
}
if !rw.isStreaming {
// Content-based fallback: detect SSE-like chunks even if Content-Type is missing/wrong.
if looksLikeSSEChunk(data) {
if err := rw.enableStreaming("sse heuristic"); err != nil {
return 0, err
}
} else if rw.body.Len()+len(data) > maxBufferedResponseBytes {
// Safety cap: avoid unbounded buffering on large responses.
log.Warnf("amp response rewriter: buffer exceeded %d bytes, switching to streaming", maxBufferedResponseBytes)
if err := rw.enableStreaming("buffer limit"); err != nil {
return 0, err
}
}
}
if rw.isStreaming {
n, err := rw.ResponseWriter.Write(rw.rewriteStreamChunk(data))
if err == nil {