refactor(executor, handlers): replace channel-based streams with StreamResult for consistency

- Updated `ExecuteStream` functions in executors to use `StreamResult` instead of channels.
- Enhanced upstream header handling in OpenAI handlers.
- Improved maintainability and alignment across executors and handlers.
This commit is contained in:
Luis Pater
2026-02-19 22:07:14 +08:00
parent ca2174ea48
commit 2b8c466e88
5 changed files with 37 additions and 20 deletions

View File

@@ -535,12 +535,13 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponseViaResponses(c *gin.Context
modelName := gjson.GetBytes(rawJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c))
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c))
if errMsg != nil {
h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error)
return
}
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
converted := convertResponsesObjectToChatCompletion(cliCtx, modelName, originalChatJSON, rawJSON, resp)
if converted == nil {
h.WriteErrorResponse(c, &interfaces.ErrorMessage{
@@ -644,7 +645,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r
modelName := gjson.GetBytes(rawJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c))
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c))
var param any
setSSEHeaders := func() {
@@ -675,6 +676,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r
case chunk, ok := <-dataChan:
if !ok {
setSSEHeaders()
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
flusher.Flush()
cliCancel(nil)
@@ -682,6 +684,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, r
}
setSSEHeaders()
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
writeConvertedResponsesChunk(c, cliCtx, modelName, originalChatJSON, rawJSON, chunk, &param)
flusher.Flush()

View File

@@ -182,12 +182,13 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponseViaChat(c *gin.Con
modelName := gjson.GetBytes(chatJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "")
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "")
if errMsg != nil {
h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error)
return
}
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
var param any
converted := responsesconverter.ConvertOpenAIChatCompletionsResponseToOpenAIResponsesNonStream(cliCtx, modelName, originalResponsesJSON, originalResponsesJSON, resp, &param)
if converted == "" {
@@ -298,7 +299,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex
modelName := gjson.GetBytes(chatJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "")
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "")
var param any
setSSEHeaders := func() {
@@ -328,6 +329,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex
case chunk, ok := <-dataChan:
if !ok {
setSSEHeaders()
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
_, _ = c.Writer.Write([]byte("\n"))
flusher.Flush()
cliCancel(nil)
@@ -335,6 +337,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Contex
}
setSSEHeaders()
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
writeChatAsResponsesChunk(c, cliCtx, modelName, originalResponsesJSON, chunk, &param)
flusher.Flush()