From 6c324f2c8b1d1a90e4c31ae8c1fa01daeafea60b Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Mon, 12 Jan 2026 10:40:34 +0800 Subject: [PATCH 1/5] Fixed: #936 feat(cliproxy): support multiple aliases for OAuth model mappings - Updated mapping logic to allow multiple aliases per upstream model name. - Adjusted `SanitizeOAuthModelMappings` to ensure aliases remain unique within channels. - Added test cases to validate multi-alias scenarios. - Updated example config to clarify multi-alias support. --- config.example.yaml | 1 + internal/config/config.go | 8 +-- internal/config/oauth_model_mappings_test.go | 29 +++++++++ sdk/cliproxy/service.go | 61 +++++++++---------- .../service_oauth_model_mappings_test.go | 34 +++++++++++ 5 files changed, 95 insertions(+), 38 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 332fba70..bf813433 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -202,6 +202,7 @@ ws-auth: false # These mappings rename model IDs for both model listing and request routing. # Supported channels: gemini-cli, vertex, aistudio, antigravity, claude, codex, qwen, iflow. # NOTE: Mappings do not apply to gemini-api-key, codex-api-key, claude-api-key, openai-compatibility, vertex-api-key, or ampcode. +# You can repeat the same name with different aliases to expose multiple client model names. # oauth-model-mappings: # gemini-cli: # - name: "gemini-2.5-pro" # original model name under this channel diff --git a/internal/config/config.go b/internal/config/config.go index e8ae3554..99beb481 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -521,7 +521,7 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { // SanitizeOAuthModelMappings normalizes and deduplicates global OAuth model name mappings. // It trims whitespace, normalizes channel keys to lower-case, drops empty entries, -// and ensures (From, To) pairs are unique within each channel. +// allows multiple aliases per upstream name, and ensures aliases are unique within each channel. func (cfg *Config) SanitizeOAuthModelMappings() { if cfg == nil || len(cfg.OAuthModelMappings) == 0 { return @@ -532,7 +532,6 @@ func (cfg *Config) SanitizeOAuthModelMappings() { if channel == "" || len(mappings) == 0 { continue } - seenName := make(map[string]struct{}, len(mappings)) seenAlias := make(map[string]struct{}, len(mappings)) clean := make([]ModelNameMapping, 0, len(mappings)) for _, mapping := range mappings { @@ -544,15 +543,10 @@ func (cfg *Config) SanitizeOAuthModelMappings() { if strings.EqualFold(name, alias) { continue } - nameKey := strings.ToLower(name) aliasKey := strings.ToLower(alias) - if _, ok := seenName[nameKey]; ok { - continue - } if _, ok := seenAlias[aliasKey]; ok { continue } - seenName[nameKey] = struct{}{} seenAlias[aliasKey] = struct{}{} clean = append(clean, ModelNameMapping{Name: name, Alias: alias, Fork: mapping.Fork}) } diff --git a/internal/config/oauth_model_mappings_test.go b/internal/config/oauth_model_mappings_test.go index 7b801a79..10bfe165 100644 --- a/internal/config/oauth_model_mappings_test.go +++ b/internal/config/oauth_model_mappings_test.go @@ -25,3 +25,32 @@ func TestSanitizeOAuthModelMappings_PreservesForkFlag(t *testing.T) { t.Fatalf("expected second mapping to be gpt-6->g6 fork=false, got name=%q alias=%q fork=%v", mappings[1].Name, mappings[1].Alias, mappings[1].Fork) } } + +func TestSanitizeOAuthModelMappings_AllowsMultipleAliasesForSameName(t *testing.T) { + cfg := &Config{ + OAuthModelMappings: map[string][]ModelNameMapping{ + "antigravity": { + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5-20251101", Fork: true}, + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5-20251101-thinking", Fork: true}, + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5", Fork: true}, + }, + }, + } + + cfg.SanitizeOAuthModelMappings() + + mappings := cfg.OAuthModelMappings["antigravity"] + expected := []ModelNameMapping{ + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5-20251101", Fork: true}, + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5-20251101-thinking", Fork: true}, + {Name: "gemini-claude-opus-4-5-thinking", Alias: "claude-opus-4-5", Fork: true}, + } + if len(mappings) != len(expected) { + t.Fatalf("expected %d sanitized mappings, got %d", len(expected), len(mappings)) + } + for i, exp := range expected { + if mappings[i].Name != exp.Name || mappings[i].Alias != exp.Alias || mappings[i].Fork != exp.Fork { + t.Fatalf("expected mapping %d to be name=%q alias=%q fork=%v, got name=%q alias=%q fork=%v", i, exp.Name, exp.Alias, exp.Fork, mappings[i].Name, mappings[i].Alias, mappings[i].Fork) + } + } +} diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index b91ce015..695a77c8 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -1237,7 +1237,7 @@ func applyOAuthModelMappings(cfg *config.Config, provider, authKind string, mode fork bool } - forward := make(map[string]mappingEntry, len(mappings)) + forward := make(map[string][]mappingEntry, len(mappings)) for i := range mappings { name := strings.TrimSpace(mappings[i].Name) alias := strings.TrimSpace(mappings[i].Alias) @@ -1248,14 +1248,12 @@ func applyOAuthModelMappings(cfg *config.Config, provider, authKind string, mode continue } key := strings.ToLower(name) - if _, exists := forward[key]; exists { - continue - } - forward[key] = mappingEntry{alias: alias, fork: mappings[i].Fork} + forward[key] = append(forward[key], mappingEntry{alias: alias, fork: mappings[i].Fork}) } if len(forward) == 0 { return models } + out := make([]*ModelInfo, 0, len(models)) seen := make(map[string]struct{}, len(models)) for _, model := range models { @@ -1267,17 +1265,8 @@ func applyOAuthModelMappings(cfg *config.Config, provider, authKind string, mode continue } key := strings.ToLower(id) - entry, ok := forward[key] - if !ok { - if _, exists := seen[key]; exists { - continue - } - seen[key] = struct{}{} - out = append(out, model) - continue - } - mappedID := strings.TrimSpace(entry.alias) - if mappedID == "" { + entries := forward[key] + if len(entries) == 0 { if _, exists := seen[key]; exists { continue } @@ -1286,11 +1275,29 @@ func applyOAuthModelMappings(cfg *config.Config, provider, authKind string, mode continue } - if entry.fork { + keepOriginal := false + for _, entry := range entries { + if entry.fork { + keepOriginal = true + break + } + } + if keepOriginal { if _, exists := seen[key]; !exists { seen[key] = struct{}{} out = append(out, model) } + } + + addedAlias := false + for _, entry := range entries { + mappedID := strings.TrimSpace(entry.alias) + if mappedID == "" { + continue + } + if strings.EqualFold(mappedID, id) { + continue + } aliasKey := strings.ToLower(mappedID) if _, exists := seen[aliasKey]; exists { continue @@ -1302,24 +1309,16 @@ func applyOAuthModelMappings(cfg *config.Config, provider, authKind string, mode clone.Name = rewriteModelInfoName(clone.Name, id, mappedID) } out = append(out, &clone) - continue + addedAlias = true } - uniqueKey := strings.ToLower(mappedID) - if _, exists := seen[uniqueKey]; exists { - continue - } - seen[uniqueKey] = struct{}{} - if mappedID == id { + if !keepOriginal && !addedAlias { + if _, exists := seen[key]; exists { + continue + } + seen[key] = struct{}{} out = append(out, model) - continue } - clone := *model - clone.ID = mappedID - if clone.Name != "" { - clone.Name = rewriteModelInfoName(clone.Name, id, mappedID) - } - out = append(out, &clone) } return out } diff --git a/sdk/cliproxy/service_oauth_model_mappings_test.go b/sdk/cliproxy/service_oauth_model_mappings_test.go index 7d8da08a..ca9ff35a 100644 --- a/sdk/cliproxy/service_oauth_model_mappings_test.go +++ b/sdk/cliproxy/service_oauth_model_mappings_test.go @@ -56,3 +56,37 @@ func TestApplyOAuthModelMappings_ForkAddsAlias(t *testing.T) { t.Fatalf("expected forked model name %q, got %q", "models/g5", out[1].Name) } } + +func TestApplyOAuthModelMappings_ForkAddsMultipleAliases(t *testing.T) { + cfg := &config.Config{ + OAuthModelMappings: map[string][]config.ModelNameMapping{ + "codex": { + {Name: "gpt-5", Alias: "g5", Fork: true}, + {Name: "gpt-5", Alias: "g5-2", Fork: true}, + }, + }, + } + models := []*ModelInfo{ + {ID: "gpt-5", Name: "models/gpt-5"}, + } + + out := applyOAuthModelMappings(cfg, "codex", "oauth", models) + if len(out) != 3 { + t.Fatalf("expected 3 models, got %d", len(out)) + } + if out[0].ID != "gpt-5" { + t.Fatalf("expected first model id %q, got %q", "gpt-5", out[0].ID) + } + if out[1].ID != "g5" { + t.Fatalf("expected second model id %q, got %q", "g5", out[1].ID) + } + if out[1].Name != "models/g5" { + t.Fatalf("expected forked model name %q, got %q", "models/g5", out[1].Name) + } + if out[2].ID != "g5-2" { + t.Fatalf("expected third model id %q, got %q", "g5-2", out[2].ID) + } + if out[2].Name != "models/g5-2" { + t.Fatalf("expected forked model name %q, got %q", "models/g5-2", out[2].Name) + } +} From 94e979865eada0cf9b34ca7b099e421044f5f596 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Mon, 12 Jan 2026 10:46:47 +0800 Subject: [PATCH 2/5] Fixed: #897 refactor(executor): remove `prompt_cache_retention` from request payloads --- internal/runtime/executor/codex_executor.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index e8dc3f8b..2f4c6295 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -106,6 +106,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re body, _ = sjson.SetBytes(body, "model", model) body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.DeleteBytes(body, "previous_response_id") + body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") url := strings.TrimSuffix(baseURL, "/") + "/responses" httpReq, err := e.cacheHelper(ctx, from, url, req, body) @@ -214,6 +215,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au } body = applyPayloadConfigWithRoot(e.cfg, model, to.String(), "", body, originalTranslated) body, _ = sjson.DeleteBytes(body, "previous_response_id") + body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.SetBytes(body, "model", model) url := strings.TrimSuffix(baseURL, "/") + "/responses" @@ -316,6 +318,7 @@ func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth body = ApplyReasoningEffortMetadata(body, req.Metadata, model, "reasoning.effort", false) body, _ = sjson.SetBytes(body, "model", model) body, _ = sjson.DeleteBytes(body, "previous_response_id") + body, _ = sjson.DeleteBytes(body, "prompt_cache_retention") body, _ = sjson.SetBytes(body, "stream", false) enc, err := tokenizerForCodexModel(model) From 21ac161b21c759e624a3ca07533f9eb426d34347 Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Mon, 12 Jan 2026 16:33:43 +0800 Subject: [PATCH 3/5] fix(test): implement missing HttpRequest method in stream bootstrap mock --- sdk/api/handlers/handlers_stream_bootstrap_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/api/handlers/handlers_stream_bootstrap_test.go b/sdk/api/handlers/handlers_stream_bootstrap_test.go index f8ce6aea..3851746d 100644 --- a/sdk/api/handlers/handlers_stream_bootstrap_test.go +++ b/sdk/api/handlers/handlers_stream_bootstrap_test.go @@ -56,6 +56,14 @@ func (e *failOnceStreamExecutor) CountTokens(context.Context, *coreauth.Auth, co return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "CountTokens not implemented"} } +func (e *failOnceStreamExecutor) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (*http.Response, error) { + return nil, &coreauth.Error{ + Code: "not_implemented", + Message: "HttpRequest not implemented", + HTTPStatus: http.StatusNotImplemented, + } +} + func (e *failOnceStreamExecutor) Calls() int { e.mu.Lock() defer e.mu.Unlock() From b1b379ea18cf10a98e87e8c9c261c8ff0eaa4df7 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 13 Jan 2026 02:36:07 +0800 Subject: [PATCH 4/5] feat(api): add non-streaming keep-alive support for idle timeout prevention - Introduced `StartNonStreamingKeepAlive` to emit periodic blank lines during non-streaming responses. - Added `nonstream-keepalive` configuration option in `SDKConfig`. - Updated handlers to utilize `StartNonStreamingKeepAlive` and ensure proper cleanup. - Extended config diff and tests to include `nonstream-keepalive` changes. --- config.example.yaml | 3 ++ internal/config/sdk_config.go | 3 ++ internal/watcher/diff/config_diff.go | 3 ++ internal/watcher/diff/config_diff_test.go | 19 ++++---- sdk/api/handlers/claude/code_handlers.go | 19 +++++--- sdk/api/handlers/gemini/gemini_handlers.go | 2 + sdk/api/handlers/handlers.go | 48 +++++++++++++++++++ sdk/api/handlers/openai/openai_handlers.go | 2 + .../openai/openai_responses_handlers.go | 11 ++--- 9 files changed, 89 insertions(+), 21 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index bf813433..a0eacc14 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -77,6 +77,9 @@ routing: # When true, enable authentication for the WebSocket API (/v1/ws). ws-auth: false +# When true, emit blank lines every 5s for non-streaming responses to prevent idle timeouts. +nonstream-keepalive: false + # Streaming behavior (SSE keep-alives + safe bootstrap retries). # streaming: # keepalive-seconds: 15 # Default: 0 (disabled). <= 0 disables keep-alives. diff --git a/internal/config/sdk_config.go b/internal/config/sdk_config.go index 596cbb2c..4ce35d6a 100644 --- a/internal/config/sdk_config.go +++ b/internal/config/sdk_config.go @@ -25,6 +25,9 @@ type SDKConfig struct { // Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries). Streaming StreamingConfig `yaml:"streaming" json:"streaming"` + + // NonStreamKeepAlive enables emitting blank lines every 5 seconds for non-streaming responses. + NonStreamKeepAlive bool `yaml:"nonstream-keepalive" json:"nonstream-keepalive"` } // StreamingConfig holds server streaming behavior configuration. diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index e24fc893..e21d910c 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -54,6 +54,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix { changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix)) } + if oldCfg.NonStreamKeepAlive != newCfg.NonStreamKeepAlive { + changes = append(changes, fmt.Sprintf("nonstream-keepalive: %t -> %t", oldCfg.NonStreamKeepAlive, newCfg.NonStreamKeepAlive)) + } // Quota-exceeded behavior if oldCfg.QuotaExceeded.SwitchProject != newCfg.QuotaExceeded.SwitchProject { diff --git a/internal/watcher/diff/config_diff_test.go b/internal/watcher/diff/config_diff_test.go index 6848f1d5..6606c776 100644 --- a/internal/watcher/diff/config_diff_test.go +++ b/internal/watcher/diff/config_diff_test.go @@ -231,10 +231,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false}, RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"}, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: false, - ProxyURL: "http://old-proxy", - APIKeys: []string{"key-1"}, - ForceModelPrefix: false, + RequestLog: false, + ProxyURL: "http://old-proxy", + APIKeys: []string{"key-1"}, + ForceModelPrefix: false, + NonStreamKeepAlive: false, }, } newCfg := &config.Config{ @@ -267,10 +268,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { SecretKey: "", }, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: true, - ProxyURL: "http://new-proxy", - APIKeys: []string{" key-1 ", "key-2"}, - ForceModelPrefix: true, + RequestLog: true, + ProxyURL: "http://new-proxy", + APIKeys: []string{" key-1 ", "key-2"}, + ForceModelPrefix: true, + NonStreamKeepAlive: true, }, } @@ -285,6 +287,7 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy") expectContains(t, details, "ws-auth: false -> true") expectContains(t, details, "force-model-prefix: false -> true") + expectContains(t, details, "nonstream-keepalive: false -> true") expectContains(t, details, "quota-exceeded.switch-project: false -> true") expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true") expectContains(t, details, "api-keys count: 1 -> 2") diff --git a/sdk/api/handlers/claude/code_handlers.go b/sdk/api/handlers/claude/code_handlers.go index 6554cc9a..30ff228d 100644 --- a/sdk/api/handlers/claude/code_handlers.go +++ b/sdk/api/handlers/claude/code_handlers.go @@ -146,10 +146,12 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO c.Header("Content-Type", "application/json") alt := h.GetAlt(c) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) modelName := gjson.GetBytes(rawJSON, "model").String() resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) @@ -159,13 +161,18 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO // Decompress gzipped responses - Claude API sometimes returns gzip without Content-Encoding header // This fixes title generation and other non-streaming responses that arrive compressed if len(resp) >= 2 && resp[0] == 0x1f && resp[1] == 0x8b { - gzReader, err := gzip.NewReader(bytes.NewReader(resp)) - if err != nil { - log.Warnf("failed to decompress gzipped Claude response: %v", err) + gzReader, errGzip := gzip.NewReader(bytes.NewReader(resp)) + if errGzip != nil { + log.Warnf("failed to decompress gzipped Claude response: %v", errGzip) } else { - defer gzReader.Close() - if decompressed, err := io.ReadAll(gzReader); err != nil { - log.Warnf("failed to read decompressed Claude response: %v", err) + defer func() { + if errClose := gzReader.Close(); errClose != nil { + log.Warnf("failed to close Claude gzip reader: %v", errClose) + } + }() + decompressed, errRead := io.ReadAll(gzReader) + if errRead != nil { + log.Warnf("failed to read decompressed Claude response: %v", errRead) } else { resp = decompressed } diff --git a/sdk/api/handlers/gemini/gemini_handlers.go b/sdk/api/handlers/gemini/gemini_handlers.go index 2b17a9f2..f2bdb058 100644 --- a/sdk/api/handlers/gemini/gemini_handlers.go +++ b/sdk/api/handlers/gemini/gemini_handlers.go @@ -336,7 +336,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin c.Header("Content-Type", "application/json") alt := h.GetAlt(c) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 5a24c63a..6239c20b 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -48,6 +49,7 @@ const idempotencyKeyMetadataKey = "idempotency_key" const ( defaultStreamingKeepAliveSeconds = 0 defaultStreamingBootstrapRetries = 0 + nonStreamingKeepAliveInterval = 5 * time.Second ) // BuildErrorResponseBody builds an OpenAI-compatible JSON error response body. @@ -293,6 +295,52 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * } } +// StartNonStreamingKeepAlive emits blank lines every 5 seconds while waiting for a non-streaming response. +// It returns a stop function that must be called before writing the final response. +func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context.Context) func() { + if h == nil || h.Cfg == nil || !h.Cfg.NonStreamKeepAlive { + return func() {} + } + if c == nil { + return func() {} + } + flusher, ok := c.Writer.(http.Flusher) + if !ok { + return func() {} + } + if ctx == nil { + ctx = context.Background() + } + + stopChan := make(chan struct{}) + var stopOnce sync.Once + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(nonStreamingKeepAliveInterval) + defer ticker.Stop() + for { + select { + case <-stopChan: + return + case <-ctx.Done(): + return + case <-ticker.C: + _, _ = c.Writer.Write([]byte("\n")) + flusher.Flush() + } + } + }() + + return func() { + stopOnce.Do(func() { + close(stopChan) + }) + wg.Wait() + } +} + // appendAPIResponse preserves any previously captured API response and appends new data. func appendAPIResponse(c *gin.Context, data []byte) { if c == nil || len(data) == 0 { diff --git a/sdk/api/handlers/openai/openai_handlers.go b/sdk/api/handlers/openai/openai_handlers.go index 65936be7..09471ce1 100644 --- a/sdk/api/handlers/openai/openai_handlers.go +++ b/sdk/api/handlers/openai/openai_handlers.go @@ -524,7 +524,9 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context, modelName := gjson.GetBytes(chatCompletionsJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "") + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index b6d7c8f2..31099f81 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -103,20 +103,17 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponse(c *gin.Context, r modelName := gjson.GetBytes(rawJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - defer func() { - cliCancel() - }() + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "") + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) + cliCancel(errMsg.Error) return } _, _ = c.Writer.Write(resp) - return - - // no legacy fallback - + cliCancel() } // handleStreamingResponse handles streaming responses for Gemini models. From 43652d044c5b84117aeaef90390a967e4ee29970 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 13 Jan 2026 03:14:38 +0800 Subject: [PATCH 5/5] refactor(config): replace `nonstream-keepalive` with `nonstream-keepalive-interval` - Updated `SDKConfig` to use `nonstream-keepalive-interval` (seconds) instead of the boolean `nonstream-keepalive`. - Refactored handlers and logic to incorporate the new interval-based configuration. - Updated config diff, tests, and example YAML to reflect the changes. --- config.example.yaml | 4 ++-- internal/config/sdk_config.go | 5 +++-- internal/watcher/diff/config_diff.go | 4 ++-- internal/watcher/diff/config_diff_test.go | 22 +++++++++++----------- sdk/api/handlers/handlers.go | 21 +++++++++++++++++---- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index a0eacc14..3a7e7fbd 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -77,8 +77,8 @@ routing: # When true, enable authentication for the WebSocket API (/v1/ws). ws-auth: false -# When true, emit blank lines every 5s for non-streaming responses to prevent idle timeouts. -nonstream-keepalive: false +# When > 0, emit blank lines every N seconds for non-streaming responses to prevent idle timeouts. +nonstream-keepalive-interval: 0 # Streaming behavior (SSE keep-alives + safe bootstrap retries). # streaming: diff --git a/internal/config/sdk_config.go b/internal/config/sdk_config.go index 4ce35d6a..4d4abc37 100644 --- a/internal/config/sdk_config.go +++ b/internal/config/sdk_config.go @@ -26,8 +26,9 @@ type SDKConfig struct { // Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries). Streaming StreamingConfig `yaml:"streaming" json:"streaming"` - // NonStreamKeepAlive enables emitting blank lines every 5 seconds for non-streaming responses. - NonStreamKeepAlive bool `yaml:"nonstream-keepalive" json:"nonstream-keepalive"` + // NonStreamKeepAliveInterval controls how often blank lines are emitted for non-streaming responses. + // <= 0 disables keep-alives. Value is in seconds. + NonStreamKeepAliveInterval int `yaml:"nonstream-keepalive-interval,omitempty" json:"nonstream-keepalive-interval,omitempty"` } // StreamingConfig holds server streaming behavior configuration. diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index e21d910c..fecbc242 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -54,8 +54,8 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix { changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix)) } - if oldCfg.NonStreamKeepAlive != newCfg.NonStreamKeepAlive { - changes = append(changes, fmt.Sprintf("nonstream-keepalive: %t -> %t", oldCfg.NonStreamKeepAlive, newCfg.NonStreamKeepAlive)) + if oldCfg.NonStreamKeepAliveInterval != newCfg.NonStreamKeepAliveInterval { + changes = append(changes, fmt.Sprintf("nonstream-keepalive-interval: %d -> %d", oldCfg.NonStreamKeepAliveInterval, newCfg.NonStreamKeepAliveInterval)) } // Quota-exceeded behavior diff --git a/internal/watcher/diff/config_diff_test.go b/internal/watcher/diff/config_diff_test.go index 6606c776..82486659 100644 --- a/internal/watcher/diff/config_diff_test.go +++ b/internal/watcher/diff/config_diff_test.go @@ -231,11 +231,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false}, RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"}, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: false, - ProxyURL: "http://old-proxy", - APIKeys: []string{"key-1"}, - ForceModelPrefix: false, - NonStreamKeepAlive: false, + RequestLog: false, + ProxyURL: "http://old-proxy", + APIKeys: []string{"key-1"}, + ForceModelPrefix: false, + NonStreamKeepAliveInterval: 0, }, } newCfg := &config.Config{ @@ -268,11 +268,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { SecretKey: "", }, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: true, - ProxyURL: "http://new-proxy", - APIKeys: []string{" key-1 ", "key-2"}, - ForceModelPrefix: true, - NonStreamKeepAlive: true, + RequestLog: true, + ProxyURL: "http://new-proxy", + APIKeys: []string{" key-1 ", "key-2"}, + ForceModelPrefix: true, + NonStreamKeepAliveInterval: 5, }, } @@ -287,7 +287,7 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy") expectContains(t, details, "ws-auth: false -> true") expectContains(t, details, "force-model-prefix: false -> true") - expectContains(t, details, "nonstream-keepalive: false -> true") + expectContains(t, details, "nonstream-keepalive-interval: 0 -> 5") expectContains(t, details, "quota-exceeded.switch-project: false -> true") expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true") expectContains(t, details, "api-keys count: 1 -> 2") diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 6239c20b..6160b9bd 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -49,7 +49,6 @@ const idempotencyKeyMetadataKey = "idempotency_key" const ( defaultStreamingKeepAliveSeconds = 0 defaultStreamingBootstrapRetries = 0 - nonStreamingKeepAliveInterval = 5 * time.Second ) // BuildErrorResponseBody builds an OpenAI-compatible JSON error response body. @@ -115,6 +114,19 @@ func StreamingKeepAliveInterval(cfg *config.SDKConfig) time.Duration { return time.Duration(seconds) * time.Second } +// NonStreamingKeepAliveInterval returns the keep-alive interval for non-streaming responses. +// Returning 0 disables keep-alives (default when unset). +func NonStreamingKeepAliveInterval(cfg *config.SDKConfig) time.Duration { + seconds := 0 + if cfg != nil { + seconds = cfg.NonStreamKeepAliveInterval + } + if seconds <= 0 { + return 0 + } + return time.Duration(seconds) * time.Second +} + // StreamingBootstrapRetries returns how many times a streaming request may be retried before any bytes are sent. func StreamingBootstrapRetries(cfg *config.SDKConfig) int { retries := defaultStreamingBootstrapRetries @@ -298,10 +310,11 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * // StartNonStreamingKeepAlive emits blank lines every 5 seconds while waiting for a non-streaming response. // It returns a stop function that must be called before writing the final response. func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context.Context) func() { - if h == nil || h.Cfg == nil || !h.Cfg.NonStreamKeepAlive { + if h == nil || c == nil { return func() {} } - if c == nil { + interval := NonStreamingKeepAliveInterval(h.Cfg) + if interval <= 0 { return func() {} } flusher, ok := c.Writer.(http.Flusher) @@ -318,7 +331,7 @@ func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context. wg.Add(1) go func() { defer wg.Done() - ticker := time.NewTicker(nonStreamingKeepAliveInterval) + ticker := time.NewTicker(interval) defer ticker.Stop() for { select {