mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-21 16:40:22 +00:00
- **Thinking Support**:
- Enabled thinking support for all Kiro Claude models, including Haiku 4.5 and agentic variants.
- Updated `model_definitions.go` with thinking configuration (Min: 1024, Max: 32000, ZeroAllowed: true).
- Fixed `extended_thinking` field names in `model_registry.go` (from `min_budget`/`max_budget` to `min`/`max`) to comply with Claude API specs, enabling thinking control in clients like Claude Code.
- **Kiro Executor Fixes**:
- Fixed `budget_tokens` handling: explicitly disable thinking when budget is 0 or negative.
- Removed aggressive duplicate content filtering logic that caused truncation/data loss.
- Enhanced thinking tag parsing with `extractThinkingFromContent` to correctly handle interleaved thinking/text blocks.
- Added EOF handling to flush pending thinking tag characters, preventing data loss at stream end.
- **Performance**:
- Optimized Claude stream handler (v6.2) with reduced buffer size (4KB) and faster flush interval (50ms) to minimize latency and prevent timeouts.
323 lines
11 KiB
Go
323 lines
11 KiB
Go
// Package claude provides HTTP handlers for Claude API code-related functionality.
|
|
// This package implements Claude-compatible streaming chat completions with sophisticated
|
|
// client rotation and quota management systems to ensure high availability and optimal
|
|
// resource utilization across multiple backend clients. It handles request translation
|
|
// between Claude API format and the underlying Gemini backend, providing seamless
|
|
// API compatibility while maintaining robust error handling and connection management.
|
|
package claude
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
. "github.com/router-for-me/CLIProxyAPI/v6/internal/constant"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
|
|
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/tidwall/gjson"
|
|
)
|
|
|
|
// ClaudeCodeAPIHandler contains the handlers for Claude API endpoints.
|
|
// It holds a pool of clients to interact with the backend service.
|
|
type ClaudeCodeAPIHandler struct {
|
|
*handlers.BaseAPIHandler
|
|
}
|
|
|
|
// NewClaudeCodeAPIHandler creates a new Claude API handlers instance.
|
|
// It takes an BaseAPIHandler instance as input and returns a ClaudeCodeAPIHandler.
|
|
//
|
|
// Parameters:
|
|
// - apiHandlers: The base API handler instance.
|
|
//
|
|
// Returns:
|
|
// - *ClaudeCodeAPIHandler: A new Claude code API handler instance.
|
|
func NewClaudeCodeAPIHandler(apiHandlers *handlers.BaseAPIHandler) *ClaudeCodeAPIHandler {
|
|
return &ClaudeCodeAPIHandler{
|
|
BaseAPIHandler: apiHandlers,
|
|
}
|
|
}
|
|
|
|
// HandlerType returns the identifier for this handler implementation.
|
|
func (h *ClaudeCodeAPIHandler) HandlerType() string {
|
|
return Claude
|
|
}
|
|
|
|
// Models returns a list of models supported by this handler.
|
|
func (h *ClaudeCodeAPIHandler) Models() []map[string]any {
|
|
// Get dynamic models from the global registry
|
|
modelRegistry := registry.GetGlobalRegistry()
|
|
return modelRegistry.GetAvailableModels("claude")
|
|
}
|
|
|
|
// ClaudeMessages handles Claude-compatible streaming chat completions.
|
|
// This function implements a sophisticated client rotation and quota management system
|
|
// to ensure high availability and optimal resource utilization across multiple backend clients.
|
|
//
|
|
// Parameters:
|
|
// - c: The Gin context for the request.
|
|
func (h *ClaudeCodeAPIHandler) ClaudeMessages(c *gin.Context) {
|
|
// Extract raw JSON data from the incoming request
|
|
rawJSON, err := c.GetRawData()
|
|
// If data retrieval fails, return a 400 Bad Request error.
|
|
if err != nil {
|
|
c.JSON(http.StatusBadRequest, handlers.ErrorResponse{
|
|
Error: handlers.ErrorDetail{
|
|
Message: fmt.Sprintf("Invalid request: %v", err),
|
|
Type: "invalid_request_error",
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
// Check if the client requested a streaming response.
|
|
streamResult := gjson.GetBytes(rawJSON, "stream")
|
|
if !streamResult.Exists() || streamResult.Type == gjson.False {
|
|
h.handleNonStreamingResponse(c, rawJSON)
|
|
} else {
|
|
h.handleStreamingResponse(c, rawJSON)
|
|
}
|
|
}
|
|
|
|
// ClaudeMessages handles Claude-compatible streaming chat completions.
|
|
// This function implements a sophisticated client rotation and quota management system
|
|
// to ensure high availability and optimal resource utilization across multiple backend clients.
|
|
//
|
|
// Parameters:
|
|
// - c: The Gin context for the request.
|
|
func (h *ClaudeCodeAPIHandler) ClaudeCountTokens(c *gin.Context) {
|
|
// Extract raw JSON data from the incoming request
|
|
rawJSON, err := c.GetRawData()
|
|
// If data retrieval fails, return a 400 Bad Request error.
|
|
if err != nil {
|
|
c.JSON(http.StatusBadRequest, handlers.ErrorResponse{
|
|
Error: handlers.ErrorDetail{
|
|
Message: fmt.Sprintf("Invalid request: %v", err),
|
|
Type: "invalid_request_error",
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
c.Header("Content-Type", "application/json")
|
|
|
|
alt := h.GetAlt(c)
|
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
|
|
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
|
|
|
resp, errMsg := h.ExecuteCountWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
|
if errMsg != nil {
|
|
h.WriteErrorResponse(c, errMsg)
|
|
cliCancel(errMsg.Error)
|
|
return
|
|
}
|
|
_, _ = c.Writer.Write(resp)
|
|
cliCancel()
|
|
}
|
|
|
|
// ClaudeModels handles the Claude models listing endpoint.
|
|
// It returns a JSON response containing available Claude models and their specifications.
|
|
//
|
|
// Parameters:
|
|
// - c: The Gin context for the request.
|
|
func (h *ClaudeCodeAPIHandler) ClaudeModels(c *gin.Context) {
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"data": h.Models(),
|
|
})
|
|
}
|
|
|
|
// handleNonStreamingResponse handles non-streaming content generation requests for Claude models.
|
|
// This function processes the request synchronously and returns the complete generated
|
|
// response in a single API call. It supports various generation parameters and
|
|
// response formats.
|
|
//
|
|
// Parameters:
|
|
// - c: The Gin context for the request
|
|
// - modelName: The name of the Gemini model to use for content generation
|
|
// - rawJSON: The raw JSON request body containing generation parameters and content
|
|
func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []byte) {
|
|
c.Header("Content-Type", "application/json")
|
|
alt := h.GetAlt(c)
|
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
|
|
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
|
|
|
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
|
if errMsg != nil {
|
|
h.WriteErrorResponse(c, errMsg)
|
|
cliCancel(errMsg.Error)
|
|
return
|
|
}
|
|
|
|
// Decompress gzipped responses - Claude API sometimes returns gzip without Content-Encoding header
|
|
// This fixes title generation and other non-streaming responses that arrive compressed
|
|
if len(resp) >= 2 && resp[0] == 0x1f && resp[1] == 0x8b {
|
|
gzReader, err := gzip.NewReader(bytes.NewReader(resp))
|
|
if err != nil {
|
|
log.Warnf("failed to decompress gzipped Claude response: %v", err)
|
|
} else {
|
|
defer gzReader.Close()
|
|
if decompressed, err := io.ReadAll(gzReader); err != nil {
|
|
log.Warnf("failed to read decompressed Claude response: %v", err)
|
|
} else {
|
|
resp = decompressed
|
|
}
|
|
}
|
|
}
|
|
|
|
_, _ = c.Writer.Write(resp)
|
|
cliCancel()
|
|
}
|
|
|
|
// handleStreamingResponse streams Claude-compatible responses backed by Gemini.
|
|
// It sets up SSE, selects a backend client with rotation/quota logic,
|
|
// forwards chunks, and translates them to Claude CLI format.
|
|
//
|
|
// Parameters:
|
|
// - c: The Gin context for the request.
|
|
// - rawJSON: The raw JSON request body.
|
|
func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byte) {
|
|
// Set up Server-Sent Events (SSE) headers for streaming response
|
|
// These headers are essential for maintaining a persistent connection
|
|
// and enabling real-time streaming of chat completions
|
|
c.Header("Content-Type", "text/event-stream")
|
|
c.Header("Cache-Control", "no-cache")
|
|
c.Header("Connection", "keep-alive")
|
|
c.Header("Access-Control-Allow-Origin", "*")
|
|
|
|
// Get the http.Flusher interface to manually flush the response.
|
|
// This is crucial for streaming as it allows immediate sending of data chunks
|
|
flusher, ok := c.Writer.(http.Flusher)
|
|
if !ok {
|
|
c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{
|
|
Error: handlers.ErrorDetail{
|
|
Message: "Streaming not supported",
|
|
Type: "server_error",
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
|
|
|
// Create a cancellable context for the backend client request
|
|
// This allows proper cleanup and cancellation of ongoing requests
|
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
|
|
|
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
|
h.forwardClaudeStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan)
|
|
return
|
|
}
|
|
|
|
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
|
// v6.2: Immediate flush strategy for SSE streams
|
|
// SSE requires immediate data delivery to prevent client timeouts.
|
|
// Previous buffering strategy (16KB buffer, 8KB threshold) caused delays
|
|
// because SSE events are typically small (< 1KB), leading to client retries.
|
|
writer := bufio.NewWriterSize(c.Writer, 4*1024) // 4KB buffer (smaller for faster flush)
|
|
ticker := time.NewTicker(50 * time.Millisecond) // 50ms interval for responsive streaming
|
|
defer ticker.Stop()
|
|
|
|
var chunkIdx int
|
|
|
|
for {
|
|
select {
|
|
case <-c.Request.Context().Done():
|
|
// Context cancelled, flush any remaining data before exit
|
|
_ = writer.Flush()
|
|
cancel(c.Request.Context().Err())
|
|
return
|
|
|
|
case <-ticker.C:
|
|
// Flush any buffered data on timer to ensure responsiveness
|
|
// For SSE, we flush whenever there's any data to prevent client timeouts
|
|
if writer.Buffered() > 0 {
|
|
if err := writer.Flush(); err != nil {
|
|
// Error flushing, cancel and return
|
|
cancel(err)
|
|
return
|
|
}
|
|
flusher.Flush() // Also flush the underlying http.ResponseWriter
|
|
}
|
|
|
|
case chunk, ok := <-data:
|
|
if !ok {
|
|
// Stream ended, flush remaining data
|
|
_ = writer.Flush()
|
|
flusher.Flush()
|
|
cancel(nil)
|
|
return
|
|
}
|
|
|
|
// Forward the complete SSE event block directly (already formatted by the translator).
|
|
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
|
|
// The handler just needs to forward it without reassembly.
|
|
if len(chunk) > 0 {
|
|
_, _ = writer.Write(chunk)
|
|
// Immediately flush for first few chunks to establish connection quickly
|
|
// This prevents client timeout/retry on slow backends like Kiro
|
|
if chunkIdx < 3 {
|
|
_ = writer.Flush()
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
chunkIdx++
|
|
|
|
case errMsg, ok := <-errs:
|
|
if !ok {
|
|
continue
|
|
}
|
|
if errMsg != nil {
|
|
status := http.StatusInternalServerError
|
|
if errMsg.StatusCode > 0 {
|
|
status = errMsg.StatusCode
|
|
}
|
|
c.Status(status)
|
|
// An error occurred: emit as a proper SSE error event
|
|
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
|
_, _ = writer.WriteString("event: error\n")
|
|
_, _ = writer.WriteString("data: ")
|
|
_, _ = writer.Write(errorBytes)
|
|
_, _ = writer.WriteString("\n\n")
|
|
_ = writer.Flush()
|
|
flusher.Flush()
|
|
}
|
|
var execErr error
|
|
if errMsg != nil {
|
|
execErr = errMsg.Error
|
|
}
|
|
cancel(execErr)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type claudeErrorDetail struct {
|
|
Type string `json:"type"`
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type claudeErrorResponse struct {
|
|
Type string `json:"type"`
|
|
Error claudeErrorDetail `json:"error"`
|
|
}
|
|
|
|
func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse {
|
|
return claudeErrorResponse{
|
|
Type: "error",
|
|
Error: claudeErrorDetail{
|
|
Type: "api_error",
|
|
Message: msg.Error.Error(),
|
|
},
|
|
}
|
|
}
|