Refactor websocket logging and error handling

- Introduced new logging functions for websocket requests, handshakes, errors, and responses in `logging_helpers.go`.
- Updated `CodexWebsocketsExecutor` to utilize the new logging functions for improved clarity and consistency in websocket operations.
- Modified the handling of websocket upgrade rejections to log relevant metadata.
- Changed the request body key to a timeline body key in `openai_responses_websocket.go` to better reflect its purpose.
- Enhanced tests to verify the correct logging of websocket events and responses, including disconnect events and error handling scenarios.
This commit is contained in:
hkfires
2026-04-02 17:30:51 +08:00
parent 4f99bc54f1
commit 34339f61ee
8 changed files with 911 additions and 120 deletions

View File

@@ -219,7 +219,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
}
wsReqBody := buildCodexWebsocketRequestBody(body)
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
wsReqLog := helps.UpstreamRequestLog{
URL: wsURL,
Method: "WEBSOCKET",
Headers: wsHeaders.Clone(),
@@ -229,16 +229,14 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
}
helps.RecordAPIWebsocketRequest(ctx, e.cfg, wsReqLog)
conn, respHS, errDial := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
if respHS != nil {
helps.RecordAPIResponseMetadata(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
}
if errDial != nil {
bodyErr := websocketHandshakeBody(respHS)
if len(bodyErr) > 0 {
helps.AppendAPIResponseChunk(ctx, e.cfg, bodyErr)
if respHS != nil {
helps.RecordAPIWebsocketUpgradeRejection(ctx, e.cfg, websocketUpgradeRequestLog(wsReqLog), respHS.StatusCode, respHS.Header.Clone(), bodyErr)
}
if respHS != nil && respHS.StatusCode == http.StatusUpgradeRequired {
return e.CodexExecutor.Execute(ctx, auth, req, opts)
@@ -246,9 +244,12 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
if respHS != nil && respHS.StatusCode > 0 {
return resp, statusErr{code: respHS.StatusCode, msg: string(bodyErr)}
}
helps.RecordAPIResponseError(ctx, e.cfg, errDial)
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial", errDial)
return resp, errDial
}
if respHS != nil {
helps.RecordAPIWebsocketHandshake(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
}
closeHTTPResponseBody(respHS, "codex websockets executor: close handshake response body error")
if sess == nil {
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
@@ -281,7 +282,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
connRetry, _, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
if errDialRetry == nil && connRetry != nil {
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
helps.RecordAPIWebsocketRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: wsURL,
Method: "WEBSOCKET",
Headers: wsHeaders.Clone(),
@@ -297,15 +298,15 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
wsReqBody = wsReqBodyRetry
} else {
e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry)
helps.RecordAPIResponseError(ctx, e.cfg, errSendRetry)
helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry)
return resp, errSendRetry
}
} else {
helps.RecordAPIResponseError(ctx, e.cfg, errDialRetry)
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
return resp, errDialRetry
}
} else {
helps.RecordAPIResponseError(ctx, e.cfg, errSend)
helps.RecordAPIWebsocketError(ctx, e.cfg, "send", errSend)
return resp, errSend
}
}
@@ -316,7 +317,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
}
msgType, payload, errRead := readCodexWebsocketMessage(ctx, sess, conn, readCh)
if errRead != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errRead)
helps.RecordAPIWebsocketError(ctx, e.cfg, "read", errRead)
return resp, errRead
}
if msgType != websocket.TextMessage {
@@ -325,7 +326,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "unexpected_binary", err)
}
helps.RecordAPIResponseError(ctx, e.cfg, err)
helps.RecordAPIWebsocketError(ctx, e.cfg, "unexpected_binary", err)
return resp, err
}
continue
@@ -335,13 +336,13 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
if len(payload) == 0 {
continue
}
helps.AppendAPIResponseChunk(ctx, e.cfg, payload)
helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload)
if wsErr, ok := parseCodexWebsocketError(payload); ok {
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "upstream_error", wsErr)
}
helps.RecordAPIResponseError(ctx, e.cfg, wsErr)
helps.RecordAPIWebsocketError(ctx, e.cfg, "upstream_error", wsErr)
return resp, wsErr
}
@@ -413,7 +414,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
}
wsReqBody := buildCodexWebsocketRequestBody(body)
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
wsReqLog := helps.UpstreamRequestLog{
URL: wsURL,
Method: "WEBSOCKET",
Headers: wsHeaders.Clone(),
@@ -423,18 +424,18 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
}
helps.RecordAPIWebsocketRequest(ctx, e.cfg, wsReqLog)
conn, respHS, errDial := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
var upstreamHeaders http.Header
if respHS != nil {
upstreamHeaders = respHS.Header.Clone()
helps.RecordAPIResponseMetadata(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
}
if errDial != nil {
bodyErr := websocketHandshakeBody(respHS)
if len(bodyErr) > 0 {
helps.AppendAPIResponseChunk(ctx, e.cfg, bodyErr)
if respHS != nil {
helps.RecordAPIWebsocketUpgradeRejection(ctx, e.cfg, websocketUpgradeRequestLog(wsReqLog), respHS.StatusCode, respHS.Header.Clone(), bodyErr)
}
if respHS != nil && respHS.StatusCode == http.StatusUpgradeRequired {
return e.CodexExecutor.ExecuteStream(ctx, auth, req, opts)
@@ -442,12 +443,15 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
if respHS != nil && respHS.StatusCode > 0 {
return nil, statusErr{code: respHS.StatusCode, msg: string(bodyErr)}
}
helps.RecordAPIResponseError(ctx, e.cfg, errDial)
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial", errDial)
if sess != nil {
sess.reqMu.Unlock()
}
return nil, errDial
}
if respHS != nil {
helps.RecordAPIWebsocketHandshake(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
}
closeHTTPResponseBody(respHS, "codex websockets executor: close handshake response body error")
if sess == nil {
@@ -461,20 +465,20 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
}
if errSend := writeCodexWebsocketMessage(sess, conn, wsReqBody); errSend != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errSend)
helps.RecordAPIWebsocketError(ctx, e.cfg, "send", errSend)
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "send_error", errSend)
// Retry once with a new websocket connection for the same execution session.
connRetry, _, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
if errDialRetry != nil || connRetry == nil {
helps.RecordAPIResponseError(ctx, e.cfg, errDialRetry)
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
sess.clearActive(readCh)
sess.reqMu.Unlock()
return nil, errDialRetry
}
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
helps.RecordAPIWebsocketRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: wsURL,
Method: "WEBSOCKET",
Headers: wsHeaders.Clone(),
@@ -486,7 +490,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
AuthValue: authValue,
})
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errSendRetry)
helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry)
e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry)
sess.clearActive(readCh)
sess.reqMu.Unlock()
@@ -552,7 +556,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
}
terminateReason = "read_error"
terminateErr = errRead
helps.RecordAPIResponseError(ctx, e.cfg, errRead)
helps.RecordAPIWebsocketError(ctx, e.cfg, "read", errRead)
reporter.PublishFailure(ctx)
_ = send(cliproxyexecutor.StreamChunk{Err: errRead})
return
@@ -562,7 +566,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
err = fmt.Errorf("codex websockets executor: unexpected binary message")
terminateReason = "unexpected_binary"
terminateErr = err
helps.RecordAPIResponseError(ctx, e.cfg, err)
helps.RecordAPIWebsocketError(ctx, e.cfg, "unexpected_binary", err)
reporter.PublishFailure(ctx)
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "unexpected_binary", err)
@@ -577,12 +581,12 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
if len(payload) == 0 {
continue
}
helps.AppendAPIResponseChunk(ctx, e.cfg, payload)
helps.AppendAPIWebsocketResponse(ctx, e.cfg, payload)
if wsErr, ok := parseCodexWebsocketError(payload); ok {
terminateReason = "upstream_error"
terminateErr = wsErr
helps.RecordAPIResponseError(ctx, e.cfg, wsErr)
helps.RecordAPIWebsocketError(ctx, e.cfg, "upstream_error", wsErr)
reporter.PublishFailure(ctx)
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "upstream_error", wsErr)
@@ -1022,6 +1026,24 @@ func encodeCodexWebsocketAsSSE(payload []byte) []byte {
return line
}
func websocketUpgradeRequestLog(info helps.UpstreamRequestLog) helps.UpstreamRequestLog {
upgradeInfo := info
upgradeInfo.URL = helps.WebsocketUpgradeRequestURL(info.URL)
upgradeInfo.Method = http.MethodGet
upgradeInfo.Body = nil
upgradeInfo.Headers = info.Headers.Clone()
if upgradeInfo.Headers == nil {
upgradeInfo.Headers = make(http.Header)
}
if strings.TrimSpace(upgradeInfo.Headers.Get("Connection")) == "" {
upgradeInfo.Headers.Set("Connection", "Upgrade")
}
if strings.TrimSpace(upgradeInfo.Headers.Get("Upgrade")) == "" {
upgradeInfo.Headers.Set("Upgrade", "websocket")
}
return upgradeInfo
}
func websocketHandshakeBody(resp *http.Response) []byte {
if resp == nil || resp.Body == nil {
return nil