mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-21 21:56:46 +00:00
fix: align Telnyx stream lifecycle parsing with carrier docs
Two parsing bugs caught by Codex review on the Telnyx Media Streaming
PR:
Lifecycle webhook event names had a stray `call.` prefix that never
matched Telnyx's documented event types. Telnyx surfaces stream
lifecycle as `streaming.started` and `streaming.stopped` (no prefix);
stream errors arrive as `{event:"error"}` JSON frames over the
WebSocket, not as carrier webhooks. Drop the bogus
`call.streaming.failed` case from the webhook parser and add a new
`error` frame kind to the StreamFrameAdapter union so the realtime
handler can log failures instead of silently dropping them.
Telnyx WebSocket frames carry `stream_id` at the top level of the
envelope and `call_control_id` inside the `start` object; the
Telnyx adapter was reading `start.stream_id` (always undefined) and
defaulting `providerCallId` to the constructor-supplied value
regardless of what the carrier sent. Read both fields from the
documented locations and fall back to the constructor providerCallId
only when the carrier frame omits them.
Tests updated to reflect the carrier-documented frame shapes; new
fixture covers `{event:"error"}` round-trip through the adapter.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -404,42 +404,16 @@ describe("TelnyxProvider Media Streaming (PCMU)", () => {
|
||||
expect(body.stream_auth_token).toBe("token-xyz");
|
||||
});
|
||||
|
||||
it("normalizes call.streaming.failed into a non-retryable call.error event", () => {
|
||||
it("silently acknowledges streaming.started and streaming.stopped webhooks", () => {
|
||||
const provider = new TelnyxProvider(
|
||||
{ apiKey: "KEY123", connectionId: "CONN456", publicKey: undefined },
|
||||
{ skipVerification: true },
|
||||
);
|
||||
const rawBody = JSON.stringify({
|
||||
data: {
|
||||
event_type: "call.streaming.failed",
|
||||
id: "evt-1",
|
||||
payload: {
|
||||
call_control_id: "call-control-1",
|
||||
client_state: Buffer.from("call-1").toString("base64"),
|
||||
reason: "destination_unreachable",
|
||||
},
|
||||
},
|
||||
});
|
||||
const result = provider.parseWebhookEvent(createCtx({ rawBody }), {
|
||||
verifiedRequestKey: "key-1",
|
||||
});
|
||||
expect(result.events).toHaveLength(1);
|
||||
const event = result.events[0];
|
||||
if (event?.type !== "call.error") {
|
||||
throw new Error(`expected call.error, got ${event?.type}`);
|
||||
}
|
||||
expect(event.error).toBe("Telnyx streaming failed: destination_unreachable");
|
||||
expect(event.retryable).toBe(false);
|
||||
expect(event.callId).toBe("call-1");
|
||||
expect(event.providerCallId).toBe("call-control-1");
|
||||
});
|
||||
|
||||
it("silently acknowledges call.streaming.started and call.streaming.stopped", () => {
|
||||
const provider = new TelnyxProvider(
|
||||
{ apiKey: "KEY123", connectionId: "CONN456", publicKey: undefined },
|
||||
{ skipVerification: true },
|
||||
);
|
||||
for (const eventType of ["call.streaming.started", "call.streaming.stopped"]) {
|
||||
// Telnyx documents stream lifecycle webhooks as `streaming.started` and
|
||||
// `streaming.stopped` (no `call.` prefix). The bridge tracks its own
|
||||
// lifecycle on the WebSocket; we ack the carrier webhook with 200 and
|
||||
// emit nothing to avoid duplicate signal at the manager.
|
||||
for (const eventType of ["streaming.started", "streaming.stopped"]) {
|
||||
const rawBody = JSON.stringify({
|
||||
data: {
|
||||
event_type: eventType,
|
||||
|
||||
@@ -208,23 +208,17 @@ export class TelnyxProvider implements VoiceCallProvider {
|
||||
digits: data.payload?.digit || "",
|
||||
};
|
||||
|
||||
case "call.streaming.started":
|
||||
case "call.streaming.stopped":
|
||||
// Informational. The realtime bridge tracks its own lifecycle via the
|
||||
// WebSocket; we acknowledge the webhook (200) but skip event emission
|
||||
// to avoid duplicate signal at the manager.
|
||||
case "streaming.started":
|
||||
case "streaming.stopped":
|
||||
// Informational webhook acknowledgement. The realtime bridge tracks
|
||||
// its own lifecycle via the WebSocket; we ack the carrier webhook with
|
||||
// 200 and skip event emission to avoid duplicate signal at the
|
||||
// manager. Telnyx surfaces stream errors as `{event:"error"}` JSON
|
||||
// frames over the WS, not as carrier webhooks, so there is no
|
||||
// matching `streaming.failed` webhook to handle here — see the
|
||||
// `error` branch in `TelnyxStreamFrameAdapter`.
|
||||
return null;
|
||||
|
||||
case "call.streaming.failed": {
|
||||
const reason = typeof data.payload?.reason === "string" ? data.payload.reason : undefined;
|
||||
return {
|
||||
...baseEvent,
|
||||
type: "call.error",
|
||||
error: `Telnyx streaming failed${reason ? `: ${reason}` : ""}`,
|
||||
retryable: false,
|
||||
};
|
||||
}
|
||||
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -475,6 +475,14 @@ export class RealtimeCallHandler {
|
||||
bridge.acknowledgeMark();
|
||||
return;
|
||||
}
|
||||
if (frame.kind === "error") {
|
||||
console.error(
|
||||
`[voice-call] realtime WS error frame providerCallId=${activeCallSid} code=${frame.code ?? "?"} title=${frame.title ?? ""} detail=${frame.detail ?? ""}`,
|
||||
);
|
||||
// Carrier closes the stream after an error frame; let the close
|
||||
// handler tear the bridge down on the resulting WS close.
|
||||
return;
|
||||
}
|
||||
if (frame.kind === "stop") {
|
||||
stopReceived = true;
|
||||
this.closeTelephonyBridge(activeCallSid, bridge, "completed");
|
||||
|
||||
@@ -68,7 +68,31 @@ describe("TwilioStreamFrameAdapter", () => {
|
||||
});
|
||||
|
||||
describe("TelnyxStreamFrameAdapter", () => {
|
||||
it("parses Telnyx start, media, mark, stop with no streamSid", () => {
|
||||
it("parses Telnyx start with top-level stream_id and start.call_control_id", () => {
|
||||
const adapter = new TelnyxStreamFrameAdapter("call-control-id-123");
|
||||
|
||||
// Telnyx start frame: stream_id is top-level, call_control_id lives in start.
|
||||
expect(
|
||||
adapter.parseInbound(
|
||||
JSON.stringify({
|
||||
event: "start",
|
||||
sequence_number: "1",
|
||||
stream_id: "telnyx-stream-7",
|
||||
start: {
|
||||
call_control_id: "v3:carrier-call-id",
|
||||
call_session_id: "session-1",
|
||||
media_format: { encoding: "PCMU", sample_rate: 8000, channels: 1 },
|
||||
},
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
kind: "start",
|
||||
streamId: "telnyx-stream-7",
|
||||
providerCallId: "v3:carrier-call-id",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to the constructor providerCallId when start fields are absent", () => {
|
||||
const adapter = new TelnyxStreamFrameAdapter("call-control-id-123");
|
||||
|
||||
expect(adapter.parseInbound(JSON.stringify({ event: "start", start: {} }))).toEqual({
|
||||
@@ -76,24 +100,16 @@ describe("TelnyxStreamFrameAdapter", () => {
|
||||
streamId: "call-control-id-123",
|
||||
providerCallId: "call-control-id-123",
|
||||
});
|
||||
});
|
||||
|
||||
expect(
|
||||
adapter.parseInbound(
|
||||
JSON.stringify({
|
||||
event: "start",
|
||||
start: { stream_id: "telnyx-stream-7" },
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
kind: "start",
|
||||
streamId: "telnyx-stream-7",
|
||||
providerCallId: "call-control-id-123",
|
||||
});
|
||||
it("parses media, mark, and stop with no streamSid", () => {
|
||||
const adapter = new TelnyxStreamFrameAdapter("call-control-id-123");
|
||||
|
||||
expect(
|
||||
adapter.parseInbound(
|
||||
JSON.stringify({
|
||||
event: "media",
|
||||
stream_id: "telnyx-stream-7",
|
||||
media: { payload: "AAA=", timestamp: 40, track: "inbound_track" },
|
||||
}),
|
||||
),
|
||||
@@ -111,6 +127,29 @@ describe("TelnyxStreamFrameAdapter", () => {
|
||||
expect(adapter.parseInbound(JSON.stringify({ event: "stop" }))).toEqual({ kind: "stop" });
|
||||
});
|
||||
|
||||
it("surfaces Telnyx WS error frames so failures don't get swallowed", () => {
|
||||
const adapter = new TelnyxStreamFrameAdapter("call-control-id-123");
|
||||
|
||||
expect(
|
||||
adapter.parseInbound(
|
||||
JSON.stringify({
|
||||
event: "error",
|
||||
stream_id: "telnyx-stream-7",
|
||||
payload: {
|
||||
code: 100002,
|
||||
title: "WebSocket error",
|
||||
detail: "Unable to decode payload",
|
||||
},
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
kind: "error",
|
||||
code: "100002",
|
||||
title: "WebSocket error",
|
||||
detail: "Unable to decode payload",
|
||||
});
|
||||
});
|
||||
|
||||
it("serializes outbound frames without streamSid", () => {
|
||||
const adapter = new TelnyxStreamFrameAdapter("call-control-id-123");
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ export type StreamFrame =
|
||||
}
|
||||
| { kind: "mark"; name?: string }
|
||||
| { kind: "stop" }
|
||||
| { kind: "error"; code?: string; title?: string; detail?: string }
|
||||
| { kind: "ignored" };
|
||||
|
||||
export interface StreamFrameAdapter {
|
||||
@@ -145,10 +146,17 @@ export class TwilioStreamFrameAdapter implements StreamFrameAdapter {
|
||||
/**
|
||||
* Telnyx Media Streaming frame format (PCMU profile).
|
||||
*
|
||||
* Inbound frames do not carry streamSid; the carrier binds the WS to a call
|
||||
* via the per-call auth token in the upgrade URL. Outbound frames are minimal
|
||||
* envelopes: `{event:"media", media:{payload}}`, `{event:"clear"}`,
|
||||
* `{event:"mark", mark:{name}}`.
|
||||
* Inbound frames carry `stream_id` at the top level of the envelope; the
|
||||
* `start` frame additionally carries `start.call_control_id` (the carrier
|
||||
* call id). The WS is bound to a call via the per-call auth token in the
|
||||
* upgrade URL.
|
||||
*
|
||||
* Stream errors arrive as `{event:"error", payload:{...}}` frames over the
|
||||
* WebSocket — not as a webhook event. The adapter surfaces those as a
|
||||
* dedicated frame kind so the realtime bridge can log and tear down cleanly.
|
||||
*
|
||||
* Outbound frames are minimal envelopes: `{event:"media", media:{payload}}`,
|
||||
* `{event:"clear"}`, `{event:"mark", mark:{name}}`.
|
||||
*
|
||||
* Reference: https://developers.telnyx.com/docs/voice/programmable-voice/media-streaming
|
||||
*/
|
||||
@@ -163,19 +171,21 @@ export class TelnyxStreamFrameAdapter implements StreamFrameAdapter {
|
||||
return { kind: "ignored" };
|
||||
}
|
||||
const event = msg.event;
|
||||
const topLevelStreamId =
|
||||
typeof msg.stream_id === "string" && msg.stream_id ? msg.stream_id : undefined;
|
||||
if (event === "start") {
|
||||
const startData =
|
||||
typeof msg.start === "object" && msg.start !== null
|
||||
? (msg.start as Record<string, unknown>)
|
||||
: undefined;
|
||||
const streamId =
|
||||
typeof startData?.stream_id === "string" && startData.stream_id
|
||||
? startData.stream_id
|
||||
const carrierCallControlId =
|
||||
typeof startData?.call_control_id === "string" && startData.call_control_id
|
||||
? startData.call_control_id
|
||||
: this.providerCallId;
|
||||
return {
|
||||
kind: "start",
|
||||
streamId,
|
||||
providerCallId: this.providerCallId,
|
||||
streamId: topLevelStreamId ?? this.providerCallId,
|
||||
providerCallId: carrierCallControlId,
|
||||
};
|
||||
}
|
||||
if (event === "media") {
|
||||
@@ -205,6 +215,21 @@ export class TelnyxStreamFrameAdapter implements StreamFrameAdapter {
|
||||
if (event === "stop") {
|
||||
return { kind: "stop" };
|
||||
}
|
||||
if (event === "error") {
|
||||
const errorData =
|
||||
typeof msg.payload === "object" && msg.payload !== null
|
||||
? (msg.payload as Record<string, unknown>)
|
||||
: undefined;
|
||||
return {
|
||||
kind: "error",
|
||||
code:
|
||||
typeof errorData?.code === "string" || typeof errorData?.code === "number"
|
||||
? String(errorData.code)
|
||||
: undefined,
|
||||
title: typeof errorData?.title === "string" ? errorData.title : undefined,
|
||||
detail: typeof errorData?.detail === "string" ? errorData.detail : undefined,
|
||||
};
|
||||
}
|
||||
return { kind: "ignored" };
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user