diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index 9bdac283..949e5f9d 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -10,13 +10,11 @@ env: DOCKERHUB_REPO: eceasy/cli-proxy-api-plus jobs: - docker: + docker_amd64: runs-on: ubuntu-latest steps: - name: Checkout uses: actions/checkout@v4 - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Login to DockerHub @@ -29,19 +27,113 @@ jobs: echo VERSION=`git describe --tags --always --dirty` >> $GITHUB_ENV echo COMMIT=`git rev-parse --short HEAD` >> $GITHUB_ENV echo BUILD_DATE=`date -u +%Y-%m-%dT%H:%M:%SZ` >> $GITHUB_ENV - - name: Build and push + - name: Build and push (amd64) uses: docker/build-push-action@v6 with: context: . - platforms: | - linux/amd64 - linux/arm64 + platforms: linux/amd64 push: true build-args: | VERSION=${{ env.VERSION }} COMMIT=${{ env.COMMIT }} BUILD_DATE=${{ env.BUILD_DATE }} tags: | - ${{ env.DOCKERHUB_REPO }}:latest - ${{ env.DOCKERHUB_REPO }}:${{ env.VERSION }} + ${{ env.DOCKERHUB_REPO }}:latest-amd64 + ${{ env.DOCKERHUB_REPO }}:${{ env.VERSION }}-amd64 + docker_arm64: + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Generate Build Metadata + run: | + echo VERSION=`git describe --tags --always --dirty` >> $GITHUB_ENV + echo COMMIT=`git rev-parse --short HEAD` >> $GITHUB_ENV + echo BUILD_DATE=`date -u +%Y-%m-%dT%H:%M:%SZ` >> $GITHUB_ENV + - name: Build and push (arm64) + uses: docker/build-push-action@v6 + with: + context: . + platforms: linux/arm64 + push: true + build-args: | + VERSION=${{ env.VERSION }} + COMMIT=${{ env.COMMIT }} + BUILD_DATE=${{ env.BUILD_DATE }} + tags: | + ${{ env.DOCKERHUB_REPO }}:latest-arm64 + ${{ env.DOCKERHUB_REPO }}:${{ env.VERSION }}-arm64 + + docker_manifest: + runs-on: ubuntu-latest + needs: + - docker_amd64 + - docker_arm64 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Generate Build Metadata + run: | + echo VERSION=`git describe --tags --always --dirty` >> $GITHUB_ENV + echo COMMIT=`git rev-parse --short HEAD` >> $GITHUB_ENV + echo BUILD_DATE=`date -u +%Y-%m-%dT%H:%M:%SZ` >> $GITHUB_ENV + - name: Create and push multi-arch manifests + run: | + docker buildx imagetools create \ + --tag "${DOCKERHUB_REPO}:latest" \ + "${DOCKERHUB_REPO}:latest-amd64" \ + "${DOCKERHUB_REPO}:latest-arm64" + docker buildx imagetools create \ + --tag "${DOCKERHUB_REPO}:${VERSION}" \ + "${DOCKERHUB_REPO}:${VERSION}-amd64" \ + "${DOCKERHUB_REPO}:${VERSION}-arm64" + - name: Cleanup temporary tags + continue-on-error: true + env: + DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} + DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} + run: | + set -euo pipefail + namespace="${DOCKERHUB_REPO%%/*}" + repo_name="${DOCKERHUB_REPO#*/}" + + token="$( + curl -fsSL \ + -H 'Content-Type: application/json' \ + -d "{\"username\":\"${DOCKERHUB_USERNAME}\",\"password\":\"${DOCKERHUB_TOKEN}\"}" \ + 'https://hub.docker.com/v2/users/login/' \ + | python3 -c 'import json,sys; print(json.load(sys.stdin)["token"])' + )" + + delete_tag() { + local tag="$1" + local url="https://hub.docker.com/v2/repositories/${namespace}/${repo_name}/tags/${tag}/" + local http_code + http_code="$(curl -sS -o /dev/null -w "%{http_code}" -X DELETE -H "Authorization: JWT ${token}" "${url}" || true)" + if [ "${http_code}" = "204" ] || [ "${http_code}" = "404" ]; then + echo "Docker Hub tag removed (or missing): ${DOCKERHUB_REPO}:${tag} (HTTP ${http_code})" + return 0 + fi + echo "Docker Hub tag delete failed: ${DOCKERHUB_REPO}:${tag} (HTTP ${http_code})" + return 0 + } + + delete_tag "latest-amd64" + delete_tag "latest-arm64" + delete_tag "${VERSION}-amd64" + delete_tag "${VERSION}-arm64" diff --git a/internal/api/middleware/request_logging.go b/internal/api/middleware/request_logging.go index 49f28f52..2c9fdbdd 100644 --- a/internal/api/middleware/request_logging.go +++ b/internal/api/middleware/request_logging.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "strings" + "time" "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/logging" @@ -103,6 +104,7 @@ func captureRequestInfo(c *gin.Context) (*RequestInfo, error) { Headers: headers, Body: body, RequestID: logging.GetGinRequestID(c), + Timestamp: time.Now(), }, nil } diff --git a/internal/api/middleware/response_writer.go b/internal/api/middleware/response_writer.go index 8029e50a..50fa1c69 100644 --- a/internal/api/middleware/response_writer.go +++ b/internal/api/middleware/response_writer.go @@ -7,6 +7,7 @@ import ( "bytes" "net/http" "strings" + "time" "github.com/gin-gonic/gin" "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" @@ -20,22 +21,24 @@ type RequestInfo struct { Headers map[string][]string // Headers contains the request headers. Body []byte // Body is the raw request body. RequestID string // RequestID is the unique identifier for the request. + Timestamp time.Time // Timestamp is when the request was received. } // ResponseWriterWrapper wraps the standard gin.ResponseWriter to intercept and log response data. // It is designed to handle both standard and streaming responses, ensuring that logging operations do not block the client response. type ResponseWriterWrapper struct { gin.ResponseWriter - body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses. - isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). - streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. - chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. - streamDone chan struct{} // streamDone signals when the streaming goroutine completes. - logger logging.RequestLogger // logger is the instance of the request logger service. - requestInfo *RequestInfo // requestInfo holds the details of the original request. - statusCode int // statusCode stores the HTTP status code of the response. - headers map[string][]string // headers stores the response headers. - logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected. + body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses. + isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). + streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. + chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. + streamDone chan struct{} // streamDone signals when the streaming goroutine completes. + logger logging.RequestLogger // logger is the instance of the request logger service. + requestInfo *RequestInfo // requestInfo holds the details of the original request. + statusCode int // statusCode stores the HTTP status code of the response. + headers map[string][]string // headers stores the response headers. + logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected. + firstChunkTimestamp time.Time // firstChunkTimestamp captures TTFB for streaming responses. } // NewResponseWriterWrapper creates and initializes a new ResponseWriterWrapper. @@ -73,6 +76,10 @@ func (w *ResponseWriterWrapper) Write(data []byte) (int, error) { // THEN: Handle logging based on response type if w.isStreaming && w.chunkChannel != nil { + // Capture TTFB on first chunk (synchronous, before async channel send) + if w.firstChunkTimestamp.IsZero() { + w.firstChunkTimestamp = time.Now() + } // For streaming responses: Send to async logging channel (non-blocking) select { case w.chunkChannel <- append([]byte(nil), data...): // Non-blocking send with copy @@ -117,6 +124,10 @@ func (w *ResponseWriterWrapper) WriteString(data string) (int, error) { // THEN: Capture for logging if w.isStreaming && w.chunkChannel != nil { + // Capture TTFB on first chunk (synchronous, before async channel send) + if w.firstChunkTimestamp.IsZero() { + w.firstChunkTimestamp = time.Now() + } select { case w.chunkChannel <- []byte(data): default: @@ -280,6 +291,8 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error { w.streamDone = nil } + w.streamWriter.SetFirstChunkTimestamp(w.firstChunkTimestamp) + // Write API Request and Response to the streaming log before closing apiRequest := w.extractAPIRequest(c) if len(apiRequest) > 0 { @@ -297,7 +310,7 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error { return nil } - return w.logRequest(finalStatusCode, w.cloneHeaders(), w.body.Bytes(), w.extractAPIRequest(c), w.extractAPIResponse(c), slicesAPIResponseError, forceLog) + return w.logRequest(finalStatusCode, w.cloneHeaders(), w.body.Bytes(), w.extractAPIRequest(c), w.extractAPIResponse(c), w.extractAPIResponseTimestamp(c), slicesAPIResponseError, forceLog) } func (w *ResponseWriterWrapper) cloneHeaders() map[string][]string { @@ -337,7 +350,18 @@ func (w *ResponseWriterWrapper) extractAPIResponse(c *gin.Context) []byte { return data } -func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][]string, body []byte, apiRequestBody, apiResponseBody []byte, apiResponseErrors []*interfaces.ErrorMessage, forceLog bool) error { +func (w *ResponseWriterWrapper) extractAPIResponseTimestamp(c *gin.Context) time.Time { + ts, isExist := c.Get("API_RESPONSE_TIMESTAMP") + if !isExist { + return time.Time{} + } + if t, ok := ts.(time.Time); ok { + return t + } + return time.Time{} +} + +func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][]string, body []byte, apiRequestBody, apiResponseBody []byte, apiResponseTimestamp time.Time, apiResponseErrors []*interfaces.ErrorMessage, forceLog bool) error { if w.requestInfo == nil { return nil } @@ -348,7 +372,7 @@ func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][] } if loggerWithOptions, ok := w.logger.(interface { - LogRequestWithOptions(string, string, map[string][]string, []byte, int, map[string][]string, []byte, []byte, []byte, []*interfaces.ErrorMessage, bool, string) error + LogRequestWithOptions(string, string, map[string][]string, []byte, int, map[string][]string, []byte, []byte, []byte, []*interfaces.ErrorMessage, bool, string, time.Time, time.Time) error }); ok { return loggerWithOptions.LogRequestWithOptions( w.requestInfo.URL, @@ -363,6 +387,8 @@ func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][] apiResponseErrors, forceLog, w.requestInfo.RequestID, + w.requestInfo.Timestamp, + apiResponseTimestamp, ) } @@ -378,5 +404,7 @@ func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][] apiResponseBody, apiResponseErrors, w.requestInfo.RequestID, + w.requestInfo.Timestamp, + apiResponseTimestamp, ) } diff --git a/internal/api/server.go b/internal/api/server.go index 28875726..723aca0a 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "path/filepath" + "reflect" "strings" "sync" "sync/atomic" @@ -1018,14 +1019,17 @@ func (s *Server) UpdateClients(cfg *config.Config) { s.mgmt.SetAuthManager(s.handlers.AuthManager) } - // Notify Amp module of config changes (for model mapping hot-reload) - if s.ampModule != nil { - log.Debugf("triggering amp module config update") - if err := s.ampModule.OnConfigUpdated(cfg); err != nil { - log.Errorf("failed to update Amp module config: %v", err) + // Notify Amp module only when Amp config has changed. + ampConfigChanged := oldCfg == nil || !reflect.DeepEqual(oldCfg.AmpCode, cfg.AmpCode) + if ampConfigChanged { + if s.ampModule != nil { + log.Debugf("triggering amp module config update") + if err := s.ampModule.OnConfigUpdated(cfg); err != nil { + log.Errorf("failed to update Amp module config: %v", err) + } + } else { + log.Warnf("amp module is nil, skipping config update") } - } else { - log.Warnf("amp module is nil, skipping config update") } // Count client sources from configuration and auth store. diff --git a/internal/config/config.go b/internal/config/config.go index 1dd50975..58656a07 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -986,6 +986,7 @@ func SaveConfigPreserveComments(configFile string, cfg *Config) error { removeLegacyGenerativeLanguageKeys(original.Content[0]) pruneMappingToGeneratedKeys(original.Content[0], generated.Content[0], "oauth-excluded-models") + pruneMappingToGeneratedKeys(original.Content[0], generated.Content[0], "oauth-model-alias") // Merge generated into original in-place, preserving comments/order of existing nodes. mergeMappingPreserve(original.Content[0], generated.Content[0]) @@ -1476,6 +1477,16 @@ func pruneMappingToGeneratedKeys(dstRoot, srcRoot *yaml.Node, key string) { } srcIdx := findMapKeyIndex(srcRoot, key) if srcIdx < 0 { + // Keep an explicit empty mapping for oauth-model-alias when it was previously present. + // + // Rationale: LoadConfig runs MigrateOAuthModelAlias before unmarshalling. If the + // oauth-model-alias key is missing, migration will add the default antigravity aliases. + // When users delete the last channel from oauth-model-alias via the management API, + // we want that deletion to persist across hot reloads and restarts. + if key == "oauth-model-alias" { + dstRoot.Content[dstIdx+1] = &yaml.Node{Kind: yaml.MappingNode, Tag: "!!map"} + return + } removeMapKey(dstRoot, key) return } diff --git a/internal/logging/request_logger.go b/internal/logging/request_logger.go index 397a4a08..cf9b4d5c 100644 --- a/internal/logging/request_logger.go +++ b/internal/logging/request_logger.go @@ -44,10 +44,12 @@ type RequestLogger interface { // - apiRequest: The API request data // - apiResponse: The API response data // - requestID: Optional request ID for log file naming + // - requestTimestamp: When the request was received + // - apiResponseTimestamp: When the API response was received // // Returns: // - error: An error if logging fails, nil otherwise - LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error + LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error // LogStreamingRequest initiates logging for a streaming request and returns a writer for chunks. // @@ -109,6 +111,12 @@ type StreamingLogWriter interface { // - error: An error if writing fails, nil otherwise WriteAPIResponse(apiResponse []byte) error + // SetFirstChunkTimestamp sets the TTFB timestamp captured when first chunk was received. + // + // Parameters: + // - timestamp: The time when first response chunk was received + SetFirstChunkTimestamp(timestamp time.Time) + // Close finalizes the log file and cleans up resources. // // Returns: @@ -180,20 +188,22 @@ func (l *FileRequestLogger) SetEnabled(enabled bool) { // - apiRequest: The API request data // - apiResponse: The API response data // - requestID: Optional request ID for log file naming +// - requestTimestamp: When the request was received +// - apiResponseTimestamp: When the API response was received // // Returns: // - error: An error if logging fails, nil otherwise -func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error { - return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID) +func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error { + return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID, requestTimestamp, apiResponseTimestamp) } // LogRequestWithOptions logs a request with optional forced logging behavior. // The force flag allows writing error logs even when regular request logging is disabled. -func (l *FileRequestLogger) LogRequestWithOptions(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, force bool, requestID string) error { - return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, force, requestID) +func (l *FileRequestLogger) LogRequestWithOptions(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, force bool, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error { + return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, force, requestID, requestTimestamp, apiResponseTimestamp) } -func (l *FileRequestLogger) logRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, force bool, requestID string) error { +func (l *FileRequestLogger) logRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, force bool, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error { if !l.enabled && !force { return nil } @@ -247,6 +257,8 @@ func (l *FileRequestLogger) logRequest(url, method string, requestHeaders map[st responseHeaders, responseToWrite, decompressErr, + requestTimestamp, + apiResponseTimestamp, ) if errClose := logFile.Close(); errClose != nil { log.WithError(errClose).Warn("failed to close request log file") @@ -499,17 +511,22 @@ func (l *FileRequestLogger) writeNonStreamingLog( responseHeaders map[string][]string, response []byte, decompressErr error, + requestTimestamp time.Time, + apiResponseTimestamp time.Time, ) error { - if errWrite := writeRequestInfoWithBody(w, url, method, requestHeaders, requestBody, requestBodyPath, time.Now()); errWrite != nil { + if requestTimestamp.IsZero() { + requestTimestamp = time.Now() + } + if errWrite := writeRequestInfoWithBody(w, url, method, requestHeaders, requestBody, requestBodyPath, requestTimestamp); errWrite != nil { return errWrite } - if errWrite := writeAPISection(w, "=== API REQUEST ===\n", "=== API REQUEST", apiRequest); errWrite != nil { + if errWrite := writeAPISection(w, "=== API REQUEST ===\n", "=== API REQUEST", apiRequest, time.Time{}); errWrite != nil { return errWrite } if errWrite := writeAPIErrorResponses(w, apiResponseErrors); errWrite != nil { return errWrite } - if errWrite := writeAPISection(w, "=== API RESPONSE ===\n", "=== API RESPONSE", apiResponse); errWrite != nil { + if errWrite := writeAPISection(w, "=== API RESPONSE ===\n", "=== API RESPONSE", apiResponse, apiResponseTimestamp); errWrite != nil { return errWrite } return writeResponseSection(w, statusCode, true, responseHeaders, bytes.NewReader(response), decompressErr, true) @@ -583,7 +600,7 @@ func writeRequestInfoWithBody( return nil } -func writeAPISection(w io.Writer, sectionHeader string, sectionPrefix string, payload []byte) error { +func writeAPISection(w io.Writer, sectionHeader string, sectionPrefix string, payload []byte, timestamp time.Time) error { if len(payload) == 0 { return nil } @@ -601,6 +618,11 @@ func writeAPISection(w io.Writer, sectionHeader string, sectionPrefix string, pa if _, errWrite := io.WriteString(w, sectionHeader); errWrite != nil { return errWrite } + if !timestamp.IsZero() { + if _, errWrite := io.WriteString(w, fmt.Sprintf("Timestamp: %s\n", timestamp.Format(time.RFC3339Nano))); errWrite != nil { + return errWrite + } + } if _, errWrite := w.Write(payload); errWrite != nil { return errWrite } @@ -974,6 +996,9 @@ type FileStreamingLogWriter struct { // apiResponse stores the upstream API response data. apiResponse []byte + + // apiResponseTimestamp captures when the API response was received. + apiResponseTimestamp time.Time } // WriteChunkAsync writes a response chunk asynchronously (non-blocking). @@ -1053,6 +1078,12 @@ func (w *FileStreamingLogWriter) WriteAPIResponse(apiResponse []byte) error { return nil } +func (w *FileStreamingLogWriter) SetFirstChunkTimestamp(timestamp time.Time) { + if !timestamp.IsZero() { + w.apiResponseTimestamp = timestamp + } +} + // Close finalizes the log file and cleans up resources. // It writes all buffered data to the file in the correct order: // API REQUEST -> API RESPONSE -> RESPONSE (status, headers, body chunks) @@ -1140,10 +1171,10 @@ func (w *FileStreamingLogWriter) writeFinalLog(logFile *os.File) error { if errWrite := writeRequestInfoWithBody(logFile, w.url, w.method, w.requestHeaders, nil, w.requestBodyPath, w.timestamp); errWrite != nil { return errWrite } - if errWrite := writeAPISection(logFile, "=== API REQUEST ===\n", "=== API REQUEST", w.apiRequest); errWrite != nil { + if errWrite := writeAPISection(logFile, "=== API REQUEST ===\n", "=== API REQUEST", w.apiRequest, time.Time{}); errWrite != nil { return errWrite } - if errWrite := writeAPISection(logFile, "=== API RESPONSE ===\n", "=== API RESPONSE", w.apiResponse); errWrite != nil { + if errWrite := writeAPISection(logFile, "=== API RESPONSE ===\n", "=== API RESPONSE", w.apiResponse, w.apiResponseTimestamp); errWrite != nil { return errWrite } @@ -1220,6 +1251,8 @@ func (w *NoOpStreamingLogWriter) WriteAPIResponse(_ []byte) error { return nil } +func (w *NoOpStreamingLogWriter) SetFirstChunkTimestamp(_ time.Time) {} + // Close is a no-op implementation that does nothing and always returns nil. // // Returns: diff --git a/internal/registry/model_definitions_static_data.go b/internal/registry/model_definitions_static_data.go index b1a524fb..cf5f1402 100644 --- a/internal/registry/model_definitions_static_data.go +++ b/internal/registry/model_definitions_static_data.go @@ -784,7 +784,7 @@ func GetIFlowModels() []*ModelInfo { {ID: "qwen3-coder-plus", DisplayName: "Qwen3-Coder-Plus", Description: "Qwen3 Coder Plus code generation", Created: 1753228800}, {ID: "qwen3-max", DisplayName: "Qwen3-Max", Description: "Qwen3 flagship model", Created: 1758672000}, {ID: "qwen3-vl-plus", DisplayName: "Qwen3-VL-Plus", Description: "Qwen3 multimodal vision-language", Created: 1758672000}, - {ID: "qwen3-max-preview", DisplayName: "Qwen3-Max-Preview", Description: "Qwen3 Max preview build", Created: 1757030400}, + {ID: "qwen3-max-preview", DisplayName: "Qwen3-Max-Preview", Description: "Qwen3 Max preview build", Created: 1757030400, Thinking: iFlowThinkingSupport}, {ID: "kimi-k2-0905", DisplayName: "Kimi-K2-Instruct-0905", Description: "Moonshot Kimi K2 instruct 0905", Created: 1757030400}, {ID: "glm-4.6", DisplayName: "GLM-4.6", Description: "Zhipu GLM 4.6 general model", Created: 1759190400, Thinking: iFlowThinkingSupport}, {ID: "glm-4.7", DisplayName: "GLM-4.7", Description: "Zhipu GLM 4.7 general model", Created: 1766448000, Thinking: iFlowThinkingSupport}, @@ -792,8 +792,8 @@ func GetIFlowModels() []*ModelInfo { {ID: "kimi-k2-thinking", DisplayName: "Kimi-K2-Thinking", Description: "Moonshot Kimi K2 thinking model", Created: 1762387200}, {ID: "deepseek-v3.2-chat", DisplayName: "DeepSeek-V3.2", Description: "DeepSeek V3.2 Chat", Created: 1764576000}, {ID: "deepseek-v3.2-reasoner", DisplayName: "DeepSeek-V3.2", Description: "DeepSeek V3.2 Reasoner", Created: 1764576000}, - {ID: "deepseek-v3.2", DisplayName: "DeepSeek-V3.2-Exp", Description: "DeepSeek V3.2 experimental", Created: 1759104000}, - {ID: "deepseek-v3.1", DisplayName: "DeepSeek-V3.1-Terminus", Description: "DeepSeek V3.1 Terminus", Created: 1756339200}, + {ID: "deepseek-v3.2", DisplayName: "DeepSeek-V3.2-Exp", Description: "DeepSeek V3.2 experimental", Created: 1759104000, Thinking: iFlowThinkingSupport}, + {ID: "deepseek-v3.1", DisplayName: "DeepSeek-V3.1-Terminus", Description: "DeepSeek V3.1 Terminus", Created: 1756339200, Thinking: iFlowThinkingSupport}, {ID: "deepseek-r1", DisplayName: "DeepSeek-R1", Description: "DeepSeek reasoning model R1", Created: 1737331200}, {ID: "deepseek-v3", DisplayName: "DeepSeek-V3-671B", Description: "DeepSeek V3 671B", Created: 1734307200}, {ID: "qwen3-32b", DisplayName: "Qwen3-32B", Description: "Qwen3 32B", Created: 1747094400}, diff --git a/internal/thinking/provider/iflow/apply.go b/internal/thinking/provider/iflow/apply.go index da986d22..35d13f59 100644 --- a/internal/thinking/provider/iflow/apply.go +++ b/internal/thinking/provider/iflow/apply.go @@ -1,7 +1,7 @@ -// Package iflow implements thinking configuration for iFlow models (GLM, MiniMax). +// Package iflow implements thinking configuration for iFlow models. // // iFlow models use boolean toggle semantics: -// - GLM models: chat_template_kwargs.enable_thinking (boolean) +// - Models using chat_template_kwargs.enable_thinking (boolean toggle) // - MiniMax models: reasoning_split (boolean) // // Level values are converted to boolean: none=false, all others=true @@ -20,6 +20,7 @@ import ( // Applier implements thinking.ProviderApplier for iFlow models. // // iFlow-specific behavior: +// - enable_thinking toggle models: enable_thinking boolean // - GLM models: enable_thinking boolean + clear_thinking=false // - MiniMax models: reasoning_split boolean // - Level to boolean: none=false, others=true @@ -61,8 +62,8 @@ func (a *Applier) Apply(body []byte, config thinking.ThinkingConfig, modelInfo * return body, nil } - if isGLMModel(modelInfo.ID) { - return applyGLM(body, config), nil + if isEnableThinkingModel(modelInfo.ID) { + return applyEnableThinking(body, config, isGLMModel(modelInfo.ID)), nil } if isMiniMaxModel(modelInfo.ID) { @@ -97,7 +98,8 @@ func configToBoolean(config thinking.ThinkingConfig) bool { } } -// applyGLM applies thinking configuration for GLM models. +// applyEnableThinking applies thinking configuration for models that use +// chat_template_kwargs.enable_thinking format. // // Output format when enabled: // @@ -107,9 +109,8 @@ func configToBoolean(config thinking.ThinkingConfig) bool { // // {"chat_template_kwargs": {"enable_thinking": false}} // -// Note: clear_thinking is only set when thinking is enabled, to preserve -// thinking output in the response. -func applyGLM(body []byte, config thinking.ThinkingConfig) []byte { +// Note: clear_thinking is only set for GLM models when thinking is enabled. +func applyEnableThinking(body []byte, config thinking.ThinkingConfig, setClearThinking bool) []byte { enableThinking := configToBoolean(config) if len(body) == 0 || !gjson.ValidBytes(body) { @@ -118,8 +119,11 @@ func applyGLM(body []byte, config thinking.ThinkingConfig) []byte { result, _ := sjson.SetBytes(body, "chat_template_kwargs.enable_thinking", enableThinking) + // clear_thinking is a GLM-only knob, strip it for other models. + result, _ = sjson.DeleteBytes(result, "chat_template_kwargs.clear_thinking") + // clear_thinking only needed when thinking is enabled - if enableThinking { + if enableThinking && setClearThinking { result, _ = sjson.SetBytes(result, "chat_template_kwargs.clear_thinking", false) } @@ -143,8 +147,21 @@ func applyMiniMax(body []byte, config thinking.ThinkingConfig) []byte { return result } +// isEnableThinkingModel determines if the model uses chat_template_kwargs.enable_thinking format. +func isEnableThinkingModel(modelID string) bool { + if isGLMModel(modelID) { + return true + } + id := strings.ToLower(modelID) + switch id { + case "qwen3-max-preview", "deepseek-v3.2", "deepseek-v3.1": + return true + default: + return false + } +} + // isGLMModel determines if the model is a GLM series model. -// GLM models use chat_template_kwargs.enable_thinking format. func isGLMModel(modelID string) bool { return strings.HasPrefix(strings.ToLower(modelID), "glm") } diff --git a/internal/translator/antigravity/gemini/antigravity_gemini_response.go b/internal/translator/antigravity/gemini/antigravity_gemini_response.go index 6f9d9791..874dc283 100644 --- a/internal/translator/antigravity/gemini/antigravity_gemini_response.go +++ b/internal/translator/antigravity/gemini/antigravity_gemini_response.go @@ -41,6 +41,7 @@ func ConvertAntigravityResponseToGemini(ctx context.Context, _ string, originalR responseResult := gjson.GetBytes(rawJSON, "response") if responseResult.Exists() { chunk = []byte(responseResult.Raw) + chunk = restoreUsageMetadata(chunk) } } else { chunkTemplate := "[]" @@ -76,7 +77,8 @@ func ConvertAntigravityResponseToGemini(ctx context.Context, _ string, originalR func ConvertAntigravityResponseToGeminiNonStream(_ context.Context, _ string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string { responseResult := gjson.GetBytes(rawJSON, "response") if responseResult.Exists() { - return responseResult.Raw + chunk := restoreUsageMetadata([]byte(responseResult.Raw)) + return string(chunk) } return string(rawJSON) } @@ -84,3 +86,15 @@ func ConvertAntigravityResponseToGeminiNonStream(_ context.Context, _ string, or func GeminiTokenCount(ctx context.Context, count int64) string { return fmt.Sprintf(`{"totalTokens":%d,"promptTokensDetails":[{"modality":"TEXT","tokenCount":%d}]}`, count, count) } + +// restoreUsageMetadata renames cpaUsageMetadata back to usageMetadata. +// The executor renames usageMetadata to cpaUsageMetadata in non-terminal chunks +// to preserve usage data while hiding it from clients that don't expect it. +// When returning standard Gemini API format, we must restore the original name. +func restoreUsageMetadata(chunk []byte) []byte { + if cpaUsage := gjson.GetBytes(chunk, "cpaUsageMetadata"); cpaUsage.Exists() { + chunk, _ = sjson.SetRawBytes(chunk, "usageMetadata", []byte(cpaUsage.Raw)) + chunk, _ = sjson.DeleteBytes(chunk, "cpaUsageMetadata") + } + return chunk +} diff --git a/internal/translator/antigravity/gemini/antigravity_gemini_response_test.go b/internal/translator/antigravity/gemini/antigravity_gemini_response_test.go new file mode 100644 index 00000000..5f96012a --- /dev/null +++ b/internal/translator/antigravity/gemini/antigravity_gemini_response_test.go @@ -0,0 +1,95 @@ +package gemini + +import ( + "context" + "testing" +) + +func TestRestoreUsageMetadata(t *testing.T) { + tests := []struct { + name string + input []byte + expected string + }{ + { + name: "cpaUsageMetadata renamed to usageMetadata", + input: []byte(`{"modelVersion":"gemini-3-pro","cpaUsageMetadata":{"promptTokenCount":100,"candidatesTokenCount":200}}`), + expected: `{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100,"candidatesTokenCount":200}}`, + }, + { + name: "no cpaUsageMetadata unchanged", + input: []byte(`{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}`), + expected: `{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}`, + }, + { + name: "empty input", + input: []byte(`{}`), + expected: `{}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := restoreUsageMetadata(tt.input) + if string(result) != tt.expected { + t.Errorf("restoreUsageMetadata() = %s, want %s", string(result), tt.expected) + } + }) + } +} + +func TestConvertAntigravityResponseToGeminiNonStream(t *testing.T) { + tests := []struct { + name string + input []byte + expected string + }{ + { + name: "cpaUsageMetadata restored in response", + input: []byte(`{"response":{"modelVersion":"gemini-3-pro","cpaUsageMetadata":{"promptTokenCount":100}}}`), + expected: `{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}`, + }, + { + name: "usageMetadata preserved", + input: []byte(`{"response":{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}}`), + expected: `{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ConvertAntigravityResponseToGeminiNonStream(context.Background(), "", nil, nil, tt.input, nil) + if result != tt.expected { + t.Errorf("ConvertAntigravityResponseToGeminiNonStream() = %s, want %s", result, tt.expected) + } + }) + } +} + +func TestConvertAntigravityResponseToGeminiStream(t *testing.T) { + ctx := context.WithValue(context.Background(), "alt", "") + + tests := []struct { + name string + input []byte + expected string + }{ + { + name: "cpaUsageMetadata restored in streaming response", + input: []byte(`data: {"response":{"modelVersion":"gemini-3-pro","cpaUsageMetadata":{"promptTokenCount":100}}}`), + expected: `{"modelVersion":"gemini-3-pro","usageMetadata":{"promptTokenCount":100}}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + results := ConvertAntigravityResponseToGemini(ctx, "", nil, nil, tt.input, nil) + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + if results[0] != tt.expected { + t.Errorf("ConvertAntigravityResponseToGemini() = %s, want %s", results[0], tt.expected) + } + }) + } +} diff --git a/internal/watcher/synthesizer/file.go b/internal/watcher/synthesizer/file.go index ef0eb8c9..c80ebc66 100644 --- a/internal/watcher/synthesizer/file.go +++ b/internal/watcher/synthesizer/file.go @@ -86,12 +86,19 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e } } + disabled, _ := metadata["disabled"].(bool) + status := coreauth.StatusActive + if disabled { + status = coreauth.StatusDisabled + } + a := &coreauth.Auth{ ID: id, Provider: provider, Label: label, Prefix: prefix, - Status: coreauth.StatusActive, + Status: status, + Disabled: disabled, Attributes: map[string]string{ "source": full, "path": full, diff --git a/internal/wsrelay/http.go b/internal/wsrelay/http.go index 52ea2a1d..abdb277c 100644 --- a/internal/wsrelay/http.go +++ b/internal/wsrelay/http.go @@ -124,32 +124,47 @@ func (m *Manager) Stream(ctx context.Context, provider string, req *HTTPRequest) out := make(chan StreamEvent) go func() { defer close(out) + send := func(ev StreamEvent) bool { + if ctx == nil { + out <- ev + return true + } + select { + case <-ctx.Done(): + return false + case out <- ev: + return true + } + } for { select { case <-ctx.Done(): - out <- StreamEvent{Err: ctx.Err()} return case msg, ok := <-respCh: if !ok { - out <- StreamEvent{Err: errors.New("wsrelay: stream closed")} + _ = send(StreamEvent{Err: errors.New("wsrelay: stream closed")}) return } switch msg.Type { case MessageTypeStreamStart: resp := decodeResponse(msg.Payload) - out <- StreamEvent{Type: MessageTypeStreamStart, Status: resp.Status, Headers: resp.Headers} + if okSend := send(StreamEvent{Type: MessageTypeStreamStart, Status: resp.Status, Headers: resp.Headers}); !okSend { + return + } case MessageTypeStreamChunk: chunk := decodeChunk(msg.Payload) - out <- StreamEvent{Type: MessageTypeStreamChunk, Payload: chunk} + if okSend := send(StreamEvent{Type: MessageTypeStreamChunk, Payload: chunk}); !okSend { + return + } case MessageTypeStreamEnd: - out <- StreamEvent{Type: MessageTypeStreamEnd} + _ = send(StreamEvent{Type: MessageTypeStreamEnd}) return case MessageTypeError: - out <- StreamEvent{Type: MessageTypeError, Err: decodeError(msg.Payload)} + _ = send(StreamEvent{Type: MessageTypeError, Err: decodeError(msg.Payload)}) return case MessageTypeHTTPResp: resp := decodeResponse(msg.Payload) - out <- StreamEvent{Type: MessageTypeHTTPResp, Status: resp.Status, Headers: resp.Headers, Payload: resp.Body} + _ = send(StreamEvent{Type: MessageTypeHTTPResp, Status: resp.Status, Headers: resp.Headers, Payload: resp.Body}) return default: } diff --git a/sdk/api/handlers/gemini/gemini-cli_handlers.go b/sdk/api/handlers/gemini/gemini-cli_handlers.go index ea78657d..917902e7 100644 --- a/sdk/api/handlers/gemini/gemini-cli_handlers.go +++ b/sdk/api/handlers/gemini/gemini-cli_handlers.go @@ -124,6 +124,7 @@ func (h *GeminiCLIAPIHandler) CLIHandler(c *gin.Context) { log.Errorf("Failed to read response body: %v", err) return } + c.Set("API_RESPONSE_TIMESTAMP", time.Now()) _, _ = c.Writer.Write(output) c.Set("API_RESPONSE", output) } diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 1c00420f..d8965187 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -362,6 +362,11 @@ func appendAPIResponse(c *gin.Context, data []byte) { return } + // Capture timestamp on first API response + if _, exists := c.Get("API_RESPONSE_TIMESTAMP"); !exists { + c.Set("API_RESPONSE_TIMESTAMP", time.Now()) + } + if existing, exists := c.Get("API_RESPONSE"); exists { if existingBytes, ok := existing.([]byte); ok && len(existingBytes) > 0 { combined := make([]byte, 0, len(existingBytes)+len(data)+1) @@ -507,6 +512,32 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl bootstrapRetries := 0 maxBootstrapRetries := StreamingBootstrapRetries(h.Cfg) + sendErr := func(msg *interfaces.ErrorMessage) bool { + if ctx == nil { + errChan <- msg + return true + } + select { + case <-ctx.Done(): + return false + case errChan <- msg: + return true + } + } + + sendData := func(chunk []byte) bool { + if ctx == nil { + dataChan <- chunk + return true + } + select { + case <-ctx.Done(): + return false + case dataChan <- chunk: + return true + } + } + bootstrapEligible := func(err error) bool { status := statusFromError(err) if status == 0 { @@ -566,12 +597,14 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl addon = hdr.Clone() } } - errChan <- &interfaces.ErrorMessage{StatusCode: status, Error: streamErr, Addon: addon} + _ = sendErr(&interfaces.ErrorMessage{StatusCode: status, Error: streamErr, Addon: addon}) return } if len(chunk.Payload) > 0 { sentPayload = true - dataChan <- cloneBytes(chunk.Payload) + if okSendData := sendData(cloneBytes(chunk.Payload)); !okSendData { + return + } } } } diff --git a/sdk/api/handlers/handlers_stream_bootstrap_test.go b/sdk/api/handlers/handlers_stream_bootstrap_test.go index 3851746d..7814ff1b 100644 --- a/sdk/api/handlers/handlers_stream_bootstrap_test.go +++ b/sdk/api/handlers/handlers_stream_bootstrap_test.go @@ -70,6 +70,58 @@ func (e *failOnceStreamExecutor) Calls() int { return e.calls } +type payloadThenErrorStreamExecutor struct { + mu sync.Mutex + calls int +} + +func (e *payloadThenErrorStreamExecutor) Identifier() string { return "codex" } + +func (e *payloadThenErrorStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) { + return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"} +} + +func (e *payloadThenErrorStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (<-chan coreexecutor.StreamChunk, error) { + e.mu.Lock() + e.calls++ + e.mu.Unlock() + + ch := make(chan coreexecutor.StreamChunk, 2) + ch <- coreexecutor.StreamChunk{Payload: []byte("partial")} + ch <- coreexecutor.StreamChunk{ + Err: &coreauth.Error{ + Code: "upstream_closed", + Message: "upstream closed", + Retryable: false, + HTTPStatus: http.StatusBadGateway, + }, + } + close(ch) + return ch, nil +} + +func (e *payloadThenErrorStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) { + return auth, nil +} + +func (e *payloadThenErrorStreamExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) { + return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "CountTokens not implemented"} +} + +func (e *payloadThenErrorStreamExecutor) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (*http.Response, error) { + return nil, &coreauth.Error{ + Code: "not_implemented", + Message: "HttpRequest not implemented", + HTTPStatus: http.StatusNotImplemented, + } +} + +func (e *payloadThenErrorStreamExecutor) Calls() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.calls +} + func TestExecuteStreamWithAuthManager_RetriesBeforeFirstByte(t *testing.T) { executor := &failOnceStreamExecutor{} manager := coreauth.NewManager(nil, nil, nil) @@ -130,3 +182,73 @@ func TestExecuteStreamWithAuthManager_RetriesBeforeFirstByte(t *testing.T) { t.Fatalf("expected 2 stream attempts, got %d", executor.Calls()) } } + +func TestExecuteStreamWithAuthManager_DoesNotRetryAfterFirstByte(t *testing.T) { + executor := &payloadThenErrorStreamExecutor{} + manager := coreauth.NewManager(nil, nil, nil) + manager.RegisterExecutor(executor) + + auth1 := &coreauth.Auth{ + ID: "auth1", + Provider: "codex", + Status: coreauth.StatusActive, + Metadata: map[string]any{"email": "test1@example.com"}, + } + if _, err := manager.Register(context.Background(), auth1); err != nil { + t.Fatalf("manager.Register(auth1): %v", err) + } + + auth2 := &coreauth.Auth{ + ID: "auth2", + Provider: "codex", + Status: coreauth.StatusActive, + Metadata: map[string]any{"email": "test2@example.com"}, + } + if _, err := manager.Register(context.Background(), auth2); err != nil { + t.Fatalf("manager.Register(auth2): %v", err) + } + + registry.GetGlobalRegistry().RegisterClient(auth1.ID, auth1.Provider, []*registry.ModelInfo{{ID: "test-model"}}) + registry.GetGlobalRegistry().RegisterClient(auth2.ID, auth2.Provider, []*registry.ModelInfo{{ID: "test-model"}}) + t.Cleanup(func() { + registry.GetGlobalRegistry().UnregisterClient(auth1.ID) + registry.GetGlobalRegistry().UnregisterClient(auth2.ID) + }) + + handler := NewBaseAPIHandlers(&sdkconfig.SDKConfig{ + Streaming: sdkconfig.StreamingConfig{ + BootstrapRetries: 1, + }, + }, manager) + dataChan, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "") + if dataChan == nil || errChan == nil { + t.Fatalf("expected non-nil channels") + } + + var got []byte + for chunk := range dataChan { + got = append(got, chunk...) + } + + var gotErr error + var gotStatus int + for msg := range errChan { + if msg != nil && msg.Error != nil { + gotErr = msg.Error + gotStatus = msg.StatusCode + } + } + + if string(got) != "partial" { + t.Fatalf("expected payload partial, got %q", string(got)) + } + if gotErr == nil { + t.Fatalf("expected terminal error, got nil") + } + if gotStatus != http.StatusBadGateway { + t.Fatalf("expected status %d, got %d", http.StatusBadGateway, gotStatus) + } + if executor.Calls() != 1 { + t.Fatalf("expected 1 stream attempt, got %d", executor.Calls()) + } +} diff --git a/sdk/auth/filestore.go b/sdk/auth/filestore.go index 1658d857..b1147e9f 100644 --- a/sdk/auth/filestore.go +++ b/sdk/auth/filestore.go @@ -68,6 +68,7 @@ func (s *FileTokenStore) Save(ctx context.Context, auth *cliproxyauth.Auth) (str return "", err } case auth.Metadata != nil: + auth.Metadata["disabled"] = auth.Disabled raw, errMarshal := json.Marshal(auth.Metadata) if errMarshal != nil { return "", fmt.Errorf("auth filestore: marshal metadata failed: %w", errMarshal) @@ -214,6 +215,11 @@ func (s *FileTokenStore) readAuthFile(path, baseDir string) (*cliproxyauth.Auth, return nil, fmt.Errorf("stat file: %w", err) } id := s.idFor(path, baseDir) + disabled, _ := metadata["disabled"].(bool) + status := cliproxyauth.StatusActive + if disabled { + status = cliproxyauth.StatusDisabled + } // Calculate NextRefreshAfter from expires_at (20 minutes before expiry) var nextRefreshAfter time.Time @@ -228,7 +234,8 @@ func (s *FileTokenStore) readAuthFile(path, baseDir string) (*cliproxyauth.Auth, Provider: provider, FileName: id, Label: s.labelFor(metadata), - Status: cliproxyauth.StatusActive, + Status: status, + Disabled: disabled, Attributes: map[string]string{"path": path}, Metadata: metadata, CreatedAt: info.ModTime(), diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index adf1d7c7..f6d0806d 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -718,6 +718,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string go func(streamCtx context.Context, streamAuth *Auth, streamProvider string, streamChunks <-chan cliproxyexecutor.StreamChunk) { defer close(out) var failed bool + forward := true for chunk := range streamChunks { if chunk.Err != nil && !failed { failed = true @@ -728,7 +729,18 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string } m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: false, Error: rerr}) } - out <- chunk + if !forward { + continue + } + if streamCtx == nil { + out <- chunk + continue + } + select { + case <-streamCtx.Done(): + forward = false + case out <- chunk: + } } if !failed { m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: true}) diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 1627ecbe..bccb9ec9 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -708,6 +708,10 @@ func (s *Service) registerModelsForAuth(a *coreauth.Auth) { if a == nil || a.ID == "" { return } + if a.Disabled { + GlobalModelRegistry().UnregisterClient(a.ID) + return + } authKind := strings.ToLower(strings.TrimSpace(a.Attributes["auth_kind"])) if authKind == "" { if kind, _ := a.AccountInfo(); strings.EqualFold(kind, "api_key") { diff --git a/test/thinking_conversion_test.go b/test/thinking_conversion_test.go index 3ad26ea6..fc20199e 100644 --- a/test/thinking_conversion_test.go +++ b/test/thinking_conversion_test.go @@ -2,6 +2,7 @@ package test import ( "fmt" + "strings" "testing" "time" @@ -2778,12 +2779,18 @@ func runThinkingTests(t *testing.T, cases []thinkingTestCase) { // Verify clear_thinking for iFlow GLM models when enable_thinking=true if tc.to == "iflow" && tc.expectField == "chat_template_kwargs.enable_thinking" && tc.expectValue == "true" { + baseModel := thinking.ParseSuffix(tc.model).ModelName + isGLM := strings.HasPrefix(strings.ToLower(baseModel), "glm") ctVal := gjson.GetBytes(body, "chat_template_kwargs.clear_thinking") - if !ctVal.Exists() { - t.Fatalf("expected clear_thinking field not found for GLM model, body=%s", string(body)) - } - if ctVal.Bool() != false { - t.Fatalf("clear_thinking: expected false, got %v, body=%s", ctVal.Bool(), string(body)) + if isGLM { + if !ctVal.Exists() { + t.Fatalf("expected clear_thinking field not found for GLM model, body=%s", string(body)) + } + if ctVal.Bool() != false { + t.Fatalf("clear_thinking: expected false, got %v, body=%s", ctVal.Bool(), string(body)) + } + } else if ctVal.Exists() { + t.Fatalf("expected no clear_thinking field for non-GLM enable_thinking model, body=%s", string(body)) } } })