diff --git a/config.example.yaml b/config.example.yaml index a0eacc14..3a7e7fbd 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -77,8 +77,8 @@ routing: # When true, enable authentication for the WebSocket API (/v1/ws). ws-auth: false -# When true, emit blank lines every 5s for non-streaming responses to prevent idle timeouts. -nonstream-keepalive: false +# When > 0, emit blank lines every N seconds for non-streaming responses to prevent idle timeouts. +nonstream-keepalive-interval: 0 # Streaming behavior (SSE keep-alives + safe bootstrap retries). # streaming: diff --git a/internal/config/sdk_config.go b/internal/config/sdk_config.go index 4ce35d6a..4d4abc37 100644 --- a/internal/config/sdk_config.go +++ b/internal/config/sdk_config.go @@ -26,8 +26,9 @@ type SDKConfig struct { // Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries). Streaming StreamingConfig `yaml:"streaming" json:"streaming"` - // NonStreamKeepAlive enables emitting blank lines every 5 seconds for non-streaming responses. - NonStreamKeepAlive bool `yaml:"nonstream-keepalive" json:"nonstream-keepalive"` + // NonStreamKeepAliveInterval controls how often blank lines are emitted for non-streaming responses. + // <= 0 disables keep-alives. Value is in seconds. + NonStreamKeepAliveInterval int `yaml:"nonstream-keepalive-interval,omitempty" json:"nonstream-keepalive-interval,omitempty"` } // StreamingConfig holds server streaming behavior configuration. diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index e21d910c..fecbc242 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -54,8 +54,8 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix { changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix)) } - if oldCfg.NonStreamKeepAlive != newCfg.NonStreamKeepAlive { - changes = append(changes, fmt.Sprintf("nonstream-keepalive: %t -> %t", oldCfg.NonStreamKeepAlive, newCfg.NonStreamKeepAlive)) + if oldCfg.NonStreamKeepAliveInterval != newCfg.NonStreamKeepAliveInterval { + changes = append(changes, fmt.Sprintf("nonstream-keepalive-interval: %d -> %d", oldCfg.NonStreamKeepAliveInterval, newCfg.NonStreamKeepAliveInterval)) } // Quota-exceeded behavior diff --git a/internal/watcher/diff/config_diff_test.go b/internal/watcher/diff/config_diff_test.go index 6606c776..82486659 100644 --- a/internal/watcher/diff/config_diff_test.go +++ b/internal/watcher/diff/config_diff_test.go @@ -231,11 +231,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false}, RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"}, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: false, - ProxyURL: "http://old-proxy", - APIKeys: []string{"key-1"}, - ForceModelPrefix: false, - NonStreamKeepAlive: false, + RequestLog: false, + ProxyURL: "http://old-proxy", + APIKeys: []string{"key-1"}, + ForceModelPrefix: false, + NonStreamKeepAliveInterval: 0, }, } newCfg := &config.Config{ @@ -268,11 +268,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { SecretKey: "", }, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: true, - ProxyURL: "http://new-proxy", - APIKeys: []string{" key-1 ", "key-2"}, - ForceModelPrefix: true, - NonStreamKeepAlive: true, + RequestLog: true, + ProxyURL: "http://new-proxy", + APIKeys: []string{" key-1 ", "key-2"}, + ForceModelPrefix: true, + NonStreamKeepAliveInterval: 5, }, } @@ -287,7 +287,7 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy") expectContains(t, details, "ws-auth: false -> true") expectContains(t, details, "force-model-prefix: false -> true") - expectContains(t, details, "nonstream-keepalive: false -> true") + expectContains(t, details, "nonstream-keepalive-interval: 0 -> 5") expectContains(t, details, "quota-exceeded.switch-project: false -> true") expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true") expectContains(t, details, "api-keys count: 1 -> 2") diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 6239c20b..6160b9bd 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -49,7 +49,6 @@ const idempotencyKeyMetadataKey = "idempotency_key" const ( defaultStreamingKeepAliveSeconds = 0 defaultStreamingBootstrapRetries = 0 - nonStreamingKeepAliveInterval = 5 * time.Second ) // BuildErrorResponseBody builds an OpenAI-compatible JSON error response body. @@ -115,6 +114,19 @@ func StreamingKeepAliveInterval(cfg *config.SDKConfig) time.Duration { return time.Duration(seconds) * time.Second } +// NonStreamingKeepAliveInterval returns the keep-alive interval for non-streaming responses. +// Returning 0 disables keep-alives (default when unset). +func NonStreamingKeepAliveInterval(cfg *config.SDKConfig) time.Duration { + seconds := 0 + if cfg != nil { + seconds = cfg.NonStreamKeepAliveInterval + } + if seconds <= 0 { + return 0 + } + return time.Duration(seconds) * time.Second +} + // StreamingBootstrapRetries returns how many times a streaming request may be retried before any bytes are sent. func StreamingBootstrapRetries(cfg *config.SDKConfig) int { retries := defaultStreamingBootstrapRetries @@ -298,10 +310,11 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * // StartNonStreamingKeepAlive emits blank lines every 5 seconds while waiting for a non-streaming response. // It returns a stop function that must be called before writing the final response. func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context.Context) func() { - if h == nil || h.Cfg == nil || !h.Cfg.NonStreamKeepAlive { + if h == nil || c == nil { return func() {} } - if c == nil { + interval := NonStreamingKeepAliveInterval(h.Cfg) + if interval <= 0 { return func() {} } flusher, ok := c.Writer.(http.Flusher) @@ -318,7 +331,7 @@ func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context. wg.Add(1) go func() { defer wg.Done() - ticker := time.NewTicker(nonStreamingKeepAliveInterval) + ticker := time.NewTicker(interval) defer ticker.Stop() for { select {