diff --git a/docs/plugins/google-meet.md b/docs/plugins/google-meet.md
index e03a944cae4..2e30e2eb480 100644
--- a/docs/plugins/google-meet.md
+++ b/docs/plugins/google-meet.md
@@ -445,7 +445,7 @@ Enable the Voice Call plugin on the Gateway host, not on the Chrome node:
 ```json5
 {
   plugins: {
-    allow: ["google-meet", "voice-call"],
+    allow: ["google-meet", "voice-call", "google"],
     entries: {
       "google-meet": {
         enabled: true,
@@ -458,8 +458,24 @@ Enable the Voice Call plugin on the Gateway host, not on the Chrome node:
         enabled: true,
         config: {
           provider: "twilio",
+          inboundPolicy: "allowlist",
+          realtime: {
+            enabled: true,
+            provider: "google",
+            instructions: "Join this Google Meet as an OpenClaw agent. Be brief.",
+            toolPolicy: "safe-read-only",
+            providers: {
+              google: {
+                silenceDurationMs: 500,
+                startSensitivity: "high",
+              },
+            },
+          },
         },
       },
+      google: {
+        enabled: true,
+      },
     },
   },
 }
@@ -472,8 +488,12 @@ secrets out of `openclaw.json`:
 export TWILIO_ACCOUNT_SID=AC...
 export TWILIO_AUTH_TOKEN=...
 export TWILIO_FROM_NUMBER=+15550001234
+export GEMINI_API_KEY=...
 ```
 
+If OpenAI is your realtime voice provider, set `realtime.provider: "openai"`,
+enable the OpenAI provider plugin, and export `OPENAI_API_KEY` instead.
+
 Restart or reload the Gateway after enabling `voice-call`; plugin config
 changes do not appear in an already running Gateway process until it reloads.
 
diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md
index 8715d654184..04222a7d177 100644
--- a/docs/plugins/voice-call.md
+++ b/docs/plugins/voice-call.md
@@ -250,6 +250,9 @@ Current runtime behaviour:
 Defaults: API key from `realtime.providers.google.apiKey`, `GEMINI_API_KEY`, or
 `GOOGLE_GENERATIVE_AI_API_KEY`; model
 `gemini-2.5-flash-native-audio-preview-12-2025`; voice `Kore`.
+`sessionResumption` and `contextWindowCompression` are enabled by default so
+longer calls survive reconnects. Tune `silenceDurationMs`, `startSensitivity`,
+and `endSensitivity` for faster turn-taking on telephony audio.
 
 ```json5
 {
@@ -270,6 +273,8 @@ Current runtime behaviour:
         apiKey: "${GEMINI_API_KEY}",
         model: "gemini-2.5-flash-native-audio-preview-12-2025",
         voice: "Kore",
+        silenceDurationMs: 500,
+        startSensitivity: "high",
       },
     },
   },
diff --git a/extensions/google/realtime-voice-provider.test.ts b/extensions/google/realtime-voice-provider.test.ts
index 814d4cf727a..92dedd00e42 100644
--- a/extensions/google/realtime-voice-provider.test.ts
+++ b/extensions/google/realtime-voice-provider.test.ts
@@ -107,6 +107,8 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
       turnCoverage: "only-activity",
       automaticActivityDetectionDisabled: false,
       enableAffectiveDialog: undefined,
+      sessionResumption: undefined,
+      contextWindowCompression: undefined,
       thinkingLevel: undefined,
       thinkingBudget: undefined,
     });
@@ -181,6 +183,8 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
       },
       turnCoverage: "TURN_INCLUDES_ONLY_ACTIVITY",
     },
+    sessionResumption: {},
+    contextWindowCompression: { slidingWindow: {} },
     tools: [
       {
         functionDeclarations: [
@@ -312,6 +316,42 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
     });
   });
 
+  it("can opt out of Google Live session resumption and context compression", async () => {
+    const provider = buildGoogleRealtimeVoiceProvider();
+    const bridge = provider.createBridge({
+      providerConfig: {
+        apiKey: "gemini-key",
+        contextWindowCompression: false,
+        sessionResumption: false,
+      },
+      onAudio: vi.fn(),
+      onClearAudio: vi.fn(),
+    });
+
+    await bridge.connect();
+
+    expect(lastConnectParams().config).not.toHaveProperty("contextWindowCompression");
+    expect(lastConnectParams().config).not.toHaveProperty("sessionResumption");
+  });
+
+  it("captures Google Live resumption handles and reuses them on reconnect", async () => {
+    const provider = buildGoogleRealtimeVoiceProvider();
+    const bridge = provider.createBridge({
+      providerConfig: { apiKey: "gemini-key" },
+      onAudio: vi.fn(),
+      onClearAudio: vi.fn(),
+    });
+
+    await bridge.connect();
+    lastConnectParams().callbacks.onmessage({
+      sessionResumptionUpdate: { resumable: true, newHandle: "resume-1" },
+    });
+
+    await bridge.connect();
+
+    expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" });
+  });
+
   it("waits for setup completion before draining audio and firing ready", async () => {
     const provider = buildGoogleRealtimeVoiceProvider();
     const onReady = vi.fn();
diff --git a/extensions/google/realtime-voice-provider.ts b/extensions/google/realtime-voice-provider.ts
index 4afed379c03..4514b0990a0 100644
--- a/extensions/google/realtime-voice-provider.ts
+++ b/extensions/google/realtime-voice-provider.ts
@@ -1,20 +1,20 @@
 import { randomUUID } from "node:crypto";
-import {
+import type {
   ActivityHandling,
   Behavior,
   EndSensitivity,
+  FunctionDeclaration,
+  FunctionResponse,
   FunctionResponseScheduling,
+  LiveConnectConfig,
+  LiveServerContent,
+  LiveServerMessage,
+  LiveServerToolCall,
   Modality,
+  RealtimeInputConfig,
   StartSensitivity,
+  ThinkingConfig,
   TurnCoverage,
-  type FunctionDeclaration,
-  type FunctionResponse,
-  type LiveConnectConfig,
-  type LiveServerContent,
-  type LiveServerMessage,
-  type LiveServerToolCall,
-  type RealtimeInputConfig,
-  type ThinkingConfig,
 } from "@google/genai";
 import type { OpenClawConfig } from "openclaw/plugin-sdk/provider-onboard";
 import type {
@@ -47,7 +47,7 @@
 const GOOGLE_REALTIME_BROWSER_API_VERSION = "v1alpha";
 const GOOGLE_REALTIME_BROWSER_WEBSOCKET_URL =
   "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained";
 const MAX_PENDING_AUDIO_CHUNKS = 320;
-const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 700;
+const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 500;
 const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000;
 const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000;
"wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained"; const MAX_PENDING_AUDIO_CHUNKS = 320; -const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 700; +const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 500; const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000; const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000; @@ -70,6 +70,8 @@ type GoogleRealtimeVoiceProviderConfig = { turnCoverage?: GoogleRealtimeTurnCoverage; automaticActivityDetectionDisabled?: boolean; enableAffectiveDialog?: boolean; + sessionResumption?: boolean; + contextWindowCompression?: boolean; thinkingLevel?: GoogleRealtimeThinkingLevel; thinkingBudget?: number; }; @@ -90,6 +92,8 @@ type GoogleRealtimeLiveConfig = { turnCoverage?: GoogleRealtimeTurnCoverage; automaticActivityDetectionDisabled?: boolean; enableAffectiveDialog?: boolean; + sessionResumption?: boolean; + contextWindowCompression?: boolean; thinkingLevel?: GoogleRealtimeThinkingLevel; thinkingBudget?: number; }; @@ -209,6 +213,8 @@ function normalizeProviderConfig( turnCoverage: asTurnCoverage(raw?.turnCoverage), automaticActivityDetectionDisabled: asBoolean(raw?.automaticActivityDetectionDisabled), enableAffectiveDialog: asBoolean(raw?.enableAffectiveDialog), + sessionResumption: asBoolean(raw?.sessionResumption), + contextWindowCompression: asBoolean(raw?.contextWindowCompression), thinkingLevel: asThinkingLevel(raw?.thinkingLevel), thinkingBudget: asFiniteNumber(raw?.thinkingBudget), }; @@ -223,9 +229,9 @@ function mapStartSensitivity( ): StartSensitivity | undefined { switch (value) { case "high": - return StartSensitivity.START_SENSITIVITY_HIGH; + return "START_SENSITIVITY_HIGH" as StartSensitivity; case "low": - return StartSensitivity.START_SENSITIVITY_LOW; + return "START_SENSITIVITY_LOW" as StartSensitivity; default: return undefined; } @@ -236,9 +242,9 @@ function mapEndSensitivity( ): EndSensitivity | undefined { switch (value) { case "high": - return EndSensitivity.END_SENSITIVITY_HIGH; + return "END_SENSITIVITY_HIGH" as EndSensitivity; case "low": - return EndSensitivity.END_SENSITIVITY_LOW; + return "END_SENSITIVITY_LOW" as EndSensitivity; default: return undefined; } @@ -249,9 +255,9 @@ function mapActivityHandling( ): ActivityHandling | undefined { switch (value) { case "no-interruption": - return ActivityHandling.NO_INTERRUPTION; + return "NO_INTERRUPTION" as ActivityHandling; case "start-of-activity-interrupts": - return ActivityHandling.START_OF_ACTIVITY_INTERRUPTS; + return "START_OF_ACTIVITY_INTERRUPTS" as ActivityHandling; default: return undefined; } @@ -260,11 +266,11 @@ function mapActivityHandling( function mapTurnCoverage(value: GoogleRealtimeTurnCoverage | undefined): TurnCoverage | undefined { switch (value) { case "only-activity": - return TurnCoverage.TURN_INCLUDES_ONLY_ACTIVITY; + return "TURN_INCLUDES_ONLY_ACTIVITY" as TurnCoverage; case "all-input": - return TurnCoverage.TURN_INCLUDES_ALL_INPUT; + return "TURN_INCLUDES_ALL_INPUT" as TurnCoverage; case "audio-activity-and-all-video": - return TurnCoverage.TURN_INCLUDES_AUDIO_ACTIVITY_AND_ALL_VIDEO; + return "TURN_INCLUDES_AUDIO_ACTIVITY_AND_ALL_VIDEO" as TurnCoverage; default: return undefined; } @@ -316,7 +322,7 @@ function buildFunctionDeclarations(tools: RealtimeVoiceTool[] | undefined): Func parametersJsonSchema: tool.parameters, }; if (tool.name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { - declaration.behavior = Behavior.NON_BLOCKING; + declaration.behavior = "NON_BLOCKING" as Behavior; } 
@@ -325,7 +331,7 @@ function buildFunctionDeclarations(tools: RealtimeVoiceTool[] | undefined): Func
 function buildGoogleLiveConnectConfig(config: GoogleRealtimeLiveConfig): LiveConnectConfig {
   const functionDeclarations = buildFunctionDeclarations(config.tools);
   return {
-    responseModalities: [Modality.AUDIO],
+    responseModalities: ["AUDIO" as Modality],
     ...(typeof config.temperature === "number" && config.temperature > 0
       ? { temperature: config.temperature }
       : {}),
@@ -359,7 +365,7 @@ function buildBrowserInitialSetup(model: string) {
     setup: {
       model: toGoogleModelResource(model),
       generationConfig: {
-        responseModalities: [Modality.AUDIO],
+        responseModalities: ["AUDIO" as Modality],
       },
       inputAudioTranscription: {},
       outputAudioTranscription: {},
@@ -403,6 +409,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
   private audioStreamEnded = false;
   private pendingFunctionNames = new Map();
   private readonly audioFormat: RealtimeVoiceAudioFormat;
+  private resumptionHandle: string | undefined;
 
   constructor(private readonly config: GoogleRealtimeVoiceBridgeConfig) {
     this.audioFormat = config.audioFormat ?? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ;
@@ -425,7 +432,17 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
 
     this.session = (await ai.live.connect({
       model: this.config.model ?? GOOGLE_REALTIME_DEFAULT_MODEL,
-      config: buildGoogleLiveConnectConfig(this.config),
+      config: {
+        ...buildGoogleLiveConnectConfig(this.config),
+        ...(this.config.sessionResumption === false
+          ? {}
+          : {
+              sessionResumption: this.resumptionHandle ? { handle: this.resumptionHandle } : {},
+            }),
+        ...(this.config.contextWindowCompression === false
+          ? {}
+          : { contextWindowCompression: { slidingWindow: {} } }),
+      },
       callbacks: {
         onopen: () => {
           this.connected = true;
@@ -548,7 +565,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
         : { output: result },
     };
     if (isConsultTool) {
-      functionResponse.scheduling = FunctionResponseScheduling.WHEN_IDLE;
+      functionResponse.scheduling = "WHEN_IDLE" as FunctionResponseScheduling;
       if (options?.willContinue === true) {
         functionResponse.willContinue = true;
       }
@@ -607,6 +624,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
   }
 
   private handleMessage(message: LiveServerMessage): void {
+    this.captureSessionLifecycle(message);
     if (message.setupComplete) {
       this.handleSetupComplete();
     }
@@ -618,6 +636,20 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
     }
   }
 
+  private captureSessionLifecycle(message: LiveServerMessage): void {
+    const raw = message as unknown as {
+      goAway?: { timeLeft?: string };
+      sessionResumptionUpdate?: { newHandle?: string; resumable?: boolean };
+    };
+    const update = raw.sessionResumptionUpdate;
+    if (update?.resumable && update.newHandle) {
+      this.resumptionHandle = update.newHandle;
+    }
+    if (raw.goAway?.timeLeft) {
+      this.config.onError?.(new Error(`Google Live session goAway: ${raw.goAway.timeLeft}`));
+    }
+  }
+
   private handleSetupComplete(): void {
     this.sessionConfigured = true;
     for (const chunk of this.pendingAudio.splice(0)) {
@@ -784,6 +816,8 @@ export function buildGoogleRealtimeVoiceProvider(): RealtimeVoiceProviderPlugin
       turnCoverage: config.turnCoverage,
       automaticActivityDetectionDisabled: config.automaticActivityDetectionDisabled,
       enableAffectiveDialog: config.enableAffectiveDialog,
+      sessionResumption: config.sessionResumption,
+      contextWindowCompression: config.contextWindowCompression,
       thinkingLevel: config.thinkingLevel,
       thinkingBudget: config.thinkingBudget,
     });
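A minimal sketch of how the new provider options fit together, mirroring the tests above. The import path and field values are illustrative; a real host builds `providerConfig` from the plugin config shown in the docs rather than by hand:

```ts
import { buildGoogleRealtimeVoiceProvider } from "./realtime-voice-provider.js";

// Sketch only: both new booleans default on; pass false to opt out.
const bridge = buildGoogleRealtimeVoiceProvider().createBridge({
  providerConfig: {
    apiKey: process.env.GEMINI_API_KEY ?? "",
    silenceDurationMs: 500,          // shorter end-of-turn silence for telephony audio
    startSensitivity: "high",        // maps to START_SENSITIVITY_HIGH
    sessionResumption: true,         // false omits sessionResumption from the Live config
    contextWindowCompression: true,  // false omits { slidingWindow: {} }
  },
  onAudio: () => {},
  onClearAudio: () => {},
});
await bridge.connect();
```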
diff --git a/extensions/voice-call/index.ts b/extensions/voice-call/index.ts
index a97c779e740..b82e500f953 100644
--- a/extensions/voice-call/index.ts
+++ b/extensions/voice-call/index.ts
@@ -589,9 +589,9 @@ export default definePluginEntry({
         respondError(respond, "to required", ErrorCodes.INVALID_REQUEST);
         return;
       }
-      const rt = await ensureRuntime();
       const mode =
         params?.mode === "notify" || params?.mode === "conversation" ? params.mode : undefined;
+      const rt = await ensureRuntime();
       await initiateCallAndRespond({
         rt,
         respond,
diff --git a/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts b/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts
new file mode 100644
index 00000000000..f87a7e78f99
--- /dev/null
+++ b/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts
@@ -0,0 +1,86 @@
+import { afterEach, describe, expect, it, vi } from "vitest";
+import {
+  RealtimeMulawSpeechStartDetector,
+  RealtimeTwilioAudioPacer,
+  calculateMulawRms,
+} from "./realtime-audio-pacer.js";
+
+describe("RealtimeTwilioAudioPacer", () => {
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("paces realtime audio as 20ms telephony frames before marks", async () => {
+    vi.useFakeTimers();
+    const sent: unknown[] = [];
+    const pacer = new RealtimeTwilioAudioPacer({
+      streamSid: "MZ-test",
+      sendJson: (message) => {
+        sent.push(message);
+        return true;
+      },
+    });
+
+    pacer.sendAudio(Buffer.alloc(320, 0x7f));
+    pacer.sendMark("audio-1");
+
+    expect(sent).toHaveLength(1);
+    expect(
+      Buffer.from((sent[0] as { media: { payload: string } }).media.payload, "base64"),
+    ).toHaveLength(160);
+
+    await vi.advanceTimersByTimeAsync(20);
+    expect(sent).toHaveLength(2);
+    expect(
+      Buffer.from((sent[1] as { media: { payload: string } }).media.payload, "base64"),
+    ).toHaveLength(160);
+
+    await vi.advanceTimersByTimeAsync(20);
+    expect(sent[2]).toEqual({
+      event: "mark",
+      streamSid: "MZ-test",
+      mark: { name: "audio-1" },
+    });
+  });
+
+  it("clears queued audio immediately", async () => {
+    vi.useFakeTimers();
+    const sent: unknown[] = [];
+    const pacer = new RealtimeTwilioAudioPacer({
+      streamSid: "MZ-test",
+      sendJson: (message) => {
+        sent.push(message);
+        return true;
+      },
+    });
+
+    pacer.sendAudio(Buffer.alloc(480, 0x7f));
+    pacer.clearAudio();
+    await vi.advanceTimersByTimeAsync(100);
+
+    expect(sent).toHaveLength(2);
+    expect(sent[1]).toEqual({ event: "clear", streamSid: "MZ-test" });
+  });
+});
+
+describe("RealtimeMulawSpeechStartDetector", () => {
+  it("detects a speech start after consecutive loud chunks and resets after quiet", () => {
+    const detector = new RealtimeMulawSpeechStartDetector({
+      requiredLoudChunks: 2,
+      requiredQuietChunks: 2,
+      rmsThreshold: 0.02,
+    });
+    const silence = Buffer.alloc(160, 0xff);
+    const speech = Buffer.alloc(160, 0x00);
+
+    expect(calculateMulawRms(silence)).toBeLessThan(0.02);
+    expect(calculateMulawRms(speech)).toBeGreaterThan(0.02);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(speech)).toBe(true);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(silence)).toBe(false);
+    expect(detector.accept(silence)).toBe(false);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(speech)).toBe(true);
+  });
+});
diff --git a/extensions/voice-call/src/webhook/realtime-audio-pacer.ts b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts
new file mode 100644
index 00000000000..ad3efa08a6d
--- /dev/null
+++ b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts
@@ -0,0 +1,176 @@
+import { mulawToPcm } from "openclaw/plugin-sdk/realtime-voice";
+
+const TELEPHONY_SAMPLE_RATE = 8_000;
+const TELEPHONY_CHUNK_BYTES = 160;
+const TELEPHONY_CHUNK_MS = 20;
+const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02;
+const DEFAULT_REQUIRED_LOUD_CHUNKS = 2;
+const DEFAULT_REQUIRED_QUIET_CHUNKS = 10;
+
+type RealtimeTwilioAudioQueueItem =
+  | {
+      chunk: Buffer;
+      durationMs: number;
+      type: "audio";
+    }
+  | {
+      name: string;
+      type: "mark";
+    };
+
+export type RealtimeTwilioAudioPacerSendJson = (message: unknown) => boolean;
+
+export class RealtimeTwilioAudioPacer {
+  private queue: RealtimeTwilioAudioQueueItem[] = [];
+  private timer: ReturnType<typeof setTimeout> | null = null;
+  private closed = false;
+
+  constructor(
+    private readonly params: {
+      sendJson: RealtimeTwilioAudioPacerSendJson;
+      streamSid: string;
+    },
+  ) {}
+
+  sendAudio(muLaw: Buffer): void {
+    if (this.closed || muLaw.length === 0) {
+      return;
+    }
+    for (let offset = 0; offset < muLaw.length; offset += TELEPHONY_CHUNK_BYTES) {
+      const chunk = Buffer.from(muLaw.subarray(offset, offset + TELEPHONY_CHUNK_BYTES));
+      this.queue.push({
+        type: "audio",
+        chunk,
+        durationMs: Math.max(1, Math.round((chunk.length / TELEPHONY_SAMPLE_RATE) * 1000)),
+      });
+    }
+    this.ensurePump();
+  }
+
+  sendMark(name: string): void {
+    if (this.closed || !name) {
+      return;
+    }
+    this.queue.push({ type: "mark", name });
+    this.ensurePump();
+  }
+
+  clearAudio(): void {
+    if (this.closed) {
+      return;
+    }
+    this.clearTimer();
+    this.queue = [];
+    this.params.sendJson({ event: "clear", streamSid: this.params.streamSid });
+  }
+
+  close(): void {
+    this.closed = true;
+    this.clearTimer();
+    this.queue = [];
+  }
+
+  private clearTimer(): void {
+    if (!this.timer) {
+      return;
+    }
+    clearTimeout(this.timer);
+    this.timer = null;
+  }
+
+  private ensurePump(): void {
+    if (!this.timer) {
+      this.pump();
+    }
+  }
+
+  private pump(): void {
+    this.timer = null;
+    if (this.closed) {
+      return;
+    }
+    const item = this.queue.shift();
+    if (!item) {
+      return;
+    }
+
+    let delayMs = 0;
+    let sent = true;
+    if (item.type === "audio") {
+      sent = this.params.sendJson({
+        event: "media",
+        streamSid: this.params.streamSid,
+        media: { payload: item.chunk.toString("base64") },
+      });
+      delayMs = item.durationMs || TELEPHONY_CHUNK_MS;
+    } else {
+      sent = this.params.sendJson({
+        event: "mark",
+        streamSid: this.params.streamSid,
+        mark: { name: item.name },
+      });
+    }
+
+    if (!sent) {
+      this.queue = [];
+      return;
+    }
+    if (this.queue.length > 0) {
+      this.timer = setTimeout(() => this.pump(), delayMs);
+    }
+  }
+}
+
+export function calculateMulawRms(muLaw: Buffer): number {
+  if (muLaw.length === 0) {
+    return 0;
+  }
+  const pcm = mulawToPcm(muLaw);
+  const samples = Math.floor(pcm.length / 2);
+  if (samples === 0) {
+    return 0;
+  }
+  let sum = 0;
+  for (let i = 0; i < samples; i += 1) {
+    const normalized = pcm.readInt16LE(i * 2) / 32768;
+    sum += normalized * normalized;
+  }
+  return Math.sqrt(sum / samples);
+}
+
+export class RealtimeMulawSpeechStartDetector {
+  private loudChunks = 0;
+  private quietChunks = DEFAULT_REQUIRED_QUIET_CHUNKS;
+  private speaking = false;
+
+  constructor(
+    private readonly params: {
+      requiredLoudChunks?: number;
+      requiredQuietChunks?: number;
+      rmsThreshold?: number;
+    } = {},
+  ) {}
+
+  accept(muLaw: Buffer): boolean {
+    const rms = calculateMulawRms(muLaw);
+    const threshold = this.params.rmsThreshold ?? DEFAULT_SPEECH_RMS_THRESHOLD;
+    if (rms >= threshold) {
+      this.quietChunks = 0;
+      this.loudChunks += 1;
+      const requiredLoudChunks = this.params.requiredLoudChunks ?? DEFAULT_REQUIRED_LOUD_CHUNKS;
+      if (!this.speaking && this.loudChunks >= requiredLoudChunks) {
+        this.speaking = true;
+        return true;
+      }
+      return false;
+    }
+
+    this.loudChunks = 0;
+    this.quietChunks += 1;
+    const requiredQuietChunks = this.params.requiredQuietChunks ?? DEFAULT_REQUIRED_QUIET_CHUNKS;
+    if (this.quietChunks >= requiredQuietChunks) {
+      this.speaking = false;
+    }
+    return false;
+  }
+}
diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts
index 98793eeccbd..9d2961c1588 100644
--- a/extensions/voice-call/src/webhook/realtime-handler.ts
+++ b/extensions/voice-call/src/webhook/realtime-handler.ts
@@ -16,6 +16,10 @@ import type { CallManager } from "../manager.js";
 import type { VoiceCallProvider } from "../providers/base.js";
 import type { CallRecord, NormalizedEvent } from "../types.js";
 import type { WebhookResponsePayload } from "../webhook.types.js";
+import {
+  RealtimeMulawSpeechStartDetector,
+  RealtimeTwilioAudioPacer,
+} from "./realtime-audio-pacer.js";
 
 export type ToolHandlerContext = {
   partialUserTranscript?: string;
@@ -29,6 +33,7 @@ export type ToolHandlerFn = (
 const STREAM_TOKEN_TTL_MS = 30_000;
 const DEFAULT_HOST = "localhost:8443";
 const MAX_REALTIME_MESSAGE_BYTES = 256 * 1024;
+const MAX_REALTIME_WS_BUFFERED_BYTES = 1024 * 1024;
 
 function normalizePath(pathname: string): string {
   const trimmed = pathname.trim();
@@ -179,7 +184,8 @@
           ? (msg.media as Record<string, unknown>)
           : undefined;
       if (msg.event === "media" && typeof mediaData?.payload === "string") {
-        bridge.sendAudio(Buffer.from(mediaData.payload, "base64"));
+        const audio = Buffer.from(mediaData.payload, "base64");
+        bridge.sendAudio(audio);
         if (typeof mediaData.timestamp === "number") {
           bridge.setMediaTimestamp(mediaData.timestamp);
         } else if (typeof mediaData.timestamp === "string") {
@@ -278,7 +284,24 @@
       this.endCallInManager(callSid, callId, reason);
     };
 
-    const bridge = createRealtimeVoiceBridgeSession({
+    const sendJson = (message: unknown): boolean => {
+      if (ws.readyState !== WebSocket.OPEN) {
+        return false;
+      }
+      if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
+        ws.close(1013, "Backpressure: send buffer exceeded");
+        return false;
+      }
+      ws.send(JSON.stringify(message));
+      if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
+        ws.close(1013, "Backpressure: send buffer exceeded");
+        return false;
+      }
+      return true;
+    };
+    const audioPacer = new RealtimeTwilioAudioPacer({ streamSid, sendJson });
+    const speechDetector = new RealtimeMulawSpeechStartDetector();
+    const session = createRealtimeVoiceBridgeSession({
       provider: this.realtimeProvider,
       providerConfig: this.providerConfig,
       instructions: this.config.instructions,
@@ -288,19 +311,13 @@
       audioSink: {
         isOpen: () => ws.readyState === WebSocket.OPEN,
         sendAudio: (muLaw) => {
-          ws.send(
-            JSON.stringify({
-              event: "media",
-              streamSid,
-              media: { payload: muLaw.toString("base64") },
-            }),
-          );
+          audioPacer.sendAudio(muLaw);
         },
         clearAudio: () => {
-          ws.send(JSON.stringify({ event: "clear", streamSid }));
+          audioPacer.clearAudio();
         },
         sendMark: (markName) => {
-          ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } }));
+          audioPacer.sendMark(markName);
         },
       },
       onTranscript: (role, text, isFinal) => {
@@ -367,24 +384,32 @@
         });
       },
     });
-    this.activeBridgesByCallId.set(callId, bridge);
-    this.activeBridgesByCallId.set(callSid, bridge);
-    const closeBridge = bridge.close.bind(bridge);
-    bridge.close = () => {
+    this.activeBridgesByCallId.set(callId, session);
+    this.activeBridgesByCallId.set(callSid, session);
+    const sendAudioToSession = session.sendAudio.bind(session);
+    session.sendAudio = (audio) => {
+      if (speechDetector.accept(audio)) {
+        audioPacer.clearAudio();
+      }
+      sendAudioToSession(audio);
+    };
+    const closeSession = session.close.bind(session);
+    session.close = () => {
       this.activeBridgesByCallId.delete(callId);
       this.activeBridgesByCallId.delete(callSid);
       this.partialUserTranscriptsByCallId.delete(callId);
-      closeBridge();
+      audioPacer.close();
+      closeSession();
     };
-    bridge.connect().catch((error: Error) => {
+    session.connect().catch((error: Error) => {
       console.error("[voice-call] Failed to connect realtime bridge:", error);
-      bridge.close();
+      session.close();
       emitCallEnd("error");
       ws.close(1011, "Failed to connect");
     });
-    return bridge;
+    return session;
   }
 
   private registerCallInManager(
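Taken together, the webhook handler now pushes model audio through the pacer and treats a detected speech start as barge-in. A rough sketch of that flow using only the two classes added above; the Twilio WebSocket and bridge plumbing are simplified to a placeholder `sendJson`:

```ts
import {
  RealtimeMulawSpeechStartDetector,
  RealtimeTwilioAudioPacer,
} from "./realtime-audio-pacer.js";

// Placeholder for the handler's backpressure-aware sender; returning false stops pacing.
declare const sendJson: (message: unknown) => boolean;

const pacer = new RealtimeTwilioAudioPacer({ streamSid: "MZ-example", sendJson });
const detector = new RealtimeMulawSpeechStartDetector();

// Model -> caller: queued mu-law audio goes out as 160-byte (20 ms) frames, then the mark.
function onModelAudio(muLaw: Buffer, markName: string): void {
  pacer.sendAudio(muLaw);
  pacer.sendMark(markName);
}

// Caller -> model: two consecutive loud chunks count as a speech start (barge-in),
// so any audio still queued for playback is cleared before forwarding to the bridge.
function onCallerAudio(muLaw: Buffer): void {
  if (detector.accept(muLaw)) {
    pacer.clearAudio();
  }
  // ...then forward muLaw to the realtime bridge, as the handler does.
}
```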