From 148e5467b546ff69f581f11b770e0d163b60c98b Mon Sep 17 00:00:00 2001 From: Rudra Date: Tue, 12 May 2026 14:44:01 +0100 Subject: [PATCH] fix: align Telnyx stream lifecycle parsing with carrier docs Two parsing bugs caught by Codex review on the Telnyx Media Streaming PR: Lifecycle webhook event names had a stray `call.` prefix that never matched Telnyx's documented event types. Telnyx surfaces stream lifecycle as `streaming.started` and `streaming.stopped` (no prefix); stream errors arrive as `{event:"error"}` JSON frames over the WebSocket, not as carrier webhooks. Drop the bogus `call.streaming.failed` case from the webhook parser and add a new `error` frame kind to the StreamFrameAdapter union so the realtime handler can log failures instead of silently dropping them. Telnyx WebSocket frames carry `stream_id` at the top level of the envelope and `call_control_id` inside the `start` object; the Telnyx adapter was reading `start.stream_id` (always undefined) and defaulting `providerCallId` to the constructor-supplied value regardless of what the carrier sent. Read both fields from the documented locations and fall back to the constructor providerCallId only when the carrier frame omits them. Tests updated to reflect the carrier-documented frame shapes; new fixture covers `{event:"error"}` round-trip through the adapter. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../voice-call/src/providers/telnyx.test.ts | 38 ++--------- extensions/voice-call/src/providers/telnyx.ts | 24 +++---- .../src/webhook/realtime-handler.ts | 8 +++ .../src/webhook/stream-frame-adapter.test.ts | 65 +++++++++++++++---- .../src/webhook/stream-frame-adapter.ts | 43 +++++++++--- 5 files changed, 109 insertions(+), 69 deletions(-) diff --git a/extensions/voice-call/src/providers/telnyx.test.ts b/extensions/voice-call/src/providers/telnyx.test.ts index 7e68617b1d3..b328765adc7 100644 --- a/extensions/voice-call/src/providers/telnyx.test.ts +++ b/extensions/voice-call/src/providers/telnyx.test.ts @@ -404,42 +404,16 @@ describe("TelnyxProvider Media Streaming (PCMU)", () => { expect(body.stream_auth_token).toBe("token-xyz"); }); - it("normalizes call.streaming.failed into a non-retryable call.error event", () => { + it("silently acknowledges streaming.started and streaming.stopped webhooks", () => { const provider = new TelnyxProvider( { apiKey: "KEY123", connectionId: "CONN456", publicKey: undefined }, { skipVerification: true }, ); - const rawBody = JSON.stringify({ - data: { - event_type: "call.streaming.failed", - id: "evt-1", - payload: { - call_control_id: "call-control-1", - client_state: Buffer.from("call-1").toString("base64"), - reason: "destination_unreachable", - }, - }, - }); - const result = provider.parseWebhookEvent(createCtx({ rawBody }), { - verifiedRequestKey: "key-1", - }); - expect(result.events).toHaveLength(1); - const event = result.events[0]; - if (event?.type !== "call.error") { - throw new Error(`expected call.error, got ${event?.type}`); - } - expect(event.error).toBe("Telnyx streaming failed: destination_unreachable"); - expect(event.retryable).toBe(false); - expect(event.callId).toBe("call-1"); - expect(event.providerCallId).toBe("call-control-1"); - }); - - it("silently acknowledges call.streaming.started and call.streaming.stopped", () => { - const provider = new TelnyxProvider( - { apiKey: "KEY123", connectionId: "CONN456", publicKey: undefined }, - { skipVerification: true }, - ); - for (const eventType of ["call.streaming.started", "call.streaming.stopped"]) { + // Telnyx documents stream lifecycle webhooks as `streaming.started` and + // `streaming.stopped` (no `call.` prefix). The bridge tracks its own + // lifecycle on the WebSocket; we ack the carrier webhook with 200 and + // emit nothing to avoid duplicate signal at the manager. + for (const eventType of ["streaming.started", "streaming.stopped"]) { const rawBody = JSON.stringify({ data: { event_type: eventType, diff --git a/extensions/voice-call/src/providers/telnyx.ts b/extensions/voice-call/src/providers/telnyx.ts index 32446bed7f6..eb7f6b4395a 100644 --- a/extensions/voice-call/src/providers/telnyx.ts +++ b/extensions/voice-call/src/providers/telnyx.ts @@ -208,23 +208,17 @@ export class TelnyxProvider implements VoiceCallProvider { digits: data.payload?.digit || "", }; - case "call.streaming.started": - case "call.streaming.stopped": - // Informational. The realtime bridge tracks its own lifecycle via the - // WebSocket; we acknowledge the webhook (200) but skip event emission - // to avoid duplicate signal at the manager. + case "streaming.started": + case "streaming.stopped": + // Informational webhook acknowledgement. The realtime bridge tracks + // its own lifecycle via the WebSocket; we ack the carrier webhook with + // 200 and skip event emission to avoid duplicate signal at the + // manager. Telnyx surfaces stream errors as `{event:"error"}` JSON + // frames over the WS, not as carrier webhooks, so there is no + // matching `streaming.failed` webhook to handle here — see the + // `error` branch in `TelnyxStreamFrameAdapter`. return null; - case "call.streaming.failed": { - const reason = typeof data.payload?.reason === "string" ? data.payload.reason : undefined; - return { - ...baseEvent, - type: "call.error", - error: `Telnyx streaming failed${reason ? `: ${reason}` : ""}`, - retryable: false, - }; - } - default: return null; } diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index b274a35c6b0..86ece787f69 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -475,6 +475,14 @@ export class RealtimeCallHandler { bridge.acknowledgeMark(); return; } + if (frame.kind === "error") { + console.error( + `[voice-call] realtime WS error frame providerCallId=${activeCallSid} code=${frame.code ?? "?"} title=${frame.title ?? ""} detail=${frame.detail ?? ""}`, + ); + // Carrier closes the stream after an error frame; let the close + // handler tear the bridge down on the resulting WS close. + return; + } if (frame.kind === "stop") { stopReceived = true; this.closeTelephonyBridge(activeCallSid, bridge, "completed"); diff --git a/extensions/voice-call/src/webhook/stream-frame-adapter.test.ts b/extensions/voice-call/src/webhook/stream-frame-adapter.test.ts index c012d6bbdf5..3a41a2e6528 100644 --- a/extensions/voice-call/src/webhook/stream-frame-adapter.test.ts +++ b/extensions/voice-call/src/webhook/stream-frame-adapter.test.ts @@ -68,7 +68,31 @@ describe("TwilioStreamFrameAdapter", () => { }); describe("TelnyxStreamFrameAdapter", () => { - it("parses Telnyx start, media, mark, stop with no streamSid", () => { + it("parses Telnyx start with top-level stream_id and start.call_control_id", () => { + const adapter = new TelnyxStreamFrameAdapter("call-control-id-123"); + + // Telnyx start frame: stream_id is top-level, call_control_id lives in start. + expect( + adapter.parseInbound( + JSON.stringify({ + event: "start", + sequence_number: "1", + stream_id: "telnyx-stream-7", + start: { + call_control_id: "v3:carrier-call-id", + call_session_id: "session-1", + media_format: { encoding: "PCMU", sample_rate: 8000, channels: 1 }, + }, + }), + ), + ).toEqual({ + kind: "start", + streamId: "telnyx-stream-7", + providerCallId: "v3:carrier-call-id", + }); + }); + + it("falls back to the constructor providerCallId when start fields are absent", () => { const adapter = new TelnyxStreamFrameAdapter("call-control-id-123"); expect(adapter.parseInbound(JSON.stringify({ event: "start", start: {} }))).toEqual({ @@ -76,24 +100,16 @@ describe("TelnyxStreamFrameAdapter", () => { streamId: "call-control-id-123", providerCallId: "call-control-id-123", }); + }); - expect( - adapter.parseInbound( - JSON.stringify({ - event: "start", - start: { stream_id: "telnyx-stream-7" }, - }), - ), - ).toEqual({ - kind: "start", - streamId: "telnyx-stream-7", - providerCallId: "call-control-id-123", - }); + it("parses media, mark, and stop with no streamSid", () => { + const adapter = new TelnyxStreamFrameAdapter("call-control-id-123"); expect( adapter.parseInbound( JSON.stringify({ event: "media", + stream_id: "telnyx-stream-7", media: { payload: "AAA=", timestamp: 40, track: "inbound_track" }, }), ), @@ -111,6 +127,29 @@ describe("TelnyxStreamFrameAdapter", () => { expect(adapter.parseInbound(JSON.stringify({ event: "stop" }))).toEqual({ kind: "stop" }); }); + it("surfaces Telnyx WS error frames so failures don't get swallowed", () => { + const adapter = new TelnyxStreamFrameAdapter("call-control-id-123"); + + expect( + adapter.parseInbound( + JSON.stringify({ + event: "error", + stream_id: "telnyx-stream-7", + payload: { + code: 100002, + title: "WebSocket error", + detail: "Unable to decode payload", + }, + }), + ), + ).toEqual({ + kind: "error", + code: "100002", + title: "WebSocket error", + detail: "Unable to decode payload", + }); + }); + it("serializes outbound frames without streamSid", () => { const adapter = new TelnyxStreamFrameAdapter("call-control-id-123"); diff --git a/extensions/voice-call/src/webhook/stream-frame-adapter.ts b/extensions/voice-call/src/webhook/stream-frame-adapter.ts index 2aa3ec28d23..abc7fa9bba5 100644 --- a/extensions/voice-call/src/webhook/stream-frame-adapter.ts +++ b/extensions/voice-call/src/webhook/stream-frame-adapter.ts @@ -23,6 +23,7 @@ export type StreamFrame = } | { kind: "mark"; name?: string } | { kind: "stop" } + | { kind: "error"; code?: string; title?: string; detail?: string } | { kind: "ignored" }; export interface StreamFrameAdapter { @@ -145,10 +146,17 @@ export class TwilioStreamFrameAdapter implements StreamFrameAdapter { /** * Telnyx Media Streaming frame format (PCMU profile). * - * Inbound frames do not carry streamSid; the carrier binds the WS to a call - * via the per-call auth token in the upgrade URL. Outbound frames are minimal - * envelopes: `{event:"media", media:{payload}}`, `{event:"clear"}`, - * `{event:"mark", mark:{name}}`. + * Inbound frames carry `stream_id` at the top level of the envelope; the + * `start` frame additionally carries `start.call_control_id` (the carrier + * call id). The WS is bound to a call via the per-call auth token in the + * upgrade URL. + * + * Stream errors arrive as `{event:"error", payload:{...}}` frames over the + * WebSocket — not as a webhook event. The adapter surfaces those as a + * dedicated frame kind so the realtime bridge can log and tear down cleanly. + * + * Outbound frames are minimal envelopes: `{event:"media", media:{payload}}`, + * `{event:"clear"}`, `{event:"mark", mark:{name}}`. * * Reference: https://developers.telnyx.com/docs/voice/programmable-voice/media-streaming */ @@ -163,19 +171,21 @@ export class TelnyxStreamFrameAdapter implements StreamFrameAdapter { return { kind: "ignored" }; } const event = msg.event; + const topLevelStreamId = + typeof msg.stream_id === "string" && msg.stream_id ? msg.stream_id : undefined; if (event === "start") { const startData = typeof msg.start === "object" && msg.start !== null ? (msg.start as Record) : undefined; - const streamId = - typeof startData?.stream_id === "string" && startData.stream_id - ? startData.stream_id + const carrierCallControlId = + typeof startData?.call_control_id === "string" && startData.call_control_id + ? startData.call_control_id : this.providerCallId; return { kind: "start", - streamId, - providerCallId: this.providerCallId, + streamId: topLevelStreamId ?? this.providerCallId, + providerCallId: carrierCallControlId, }; } if (event === "media") { @@ -205,6 +215,21 @@ export class TelnyxStreamFrameAdapter implements StreamFrameAdapter { if (event === "stop") { return { kind: "stop" }; } + if (event === "error") { + const errorData = + typeof msg.payload === "object" && msg.payload !== null + ? (msg.payload as Record) + : undefined; + return { + kind: "error", + code: + typeof errorData?.code === "string" || typeof errorData?.code === "number" + ? String(errorData.code) + : undefined, + title: typeof errorData?.title === "string" ? errorData.title : undefined, + detail: typeof errorData?.detail === "string" ? errorData.detail : undefined, + }; + } return { kind: "ignored" }; }