From ee448a37e959deb0aae094d8882b3599a2aab2f1 Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Tue, 16 Sep 2025 23:51:00 +0200 Subject: [PATCH] when pcm-input is set, the frontend uses AudioWorklet --- whisperlivekit/audio_processor.py | 119 +++++++++++++- whisperlivekit/basic_server.py | 5 + whisperlivekit/ffmpeg_manager.py | 195 +++++++++++++++++++++++ whisperlivekit/parse_args.py | 2 +- whisperlivekit/web/live_transcription.js | 95 +++++++---- 5 files changed, 382 insertions(+), 34 deletions(-) create mode 100644 whisperlivekit/ffmpeg_manager.py diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index fd9307b..999ddd1 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -8,6 +8,7 @@ from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, Sta from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory from whisperlivekit.silero_vad_iterator import FixedVADIterator from whisperlivekit.results_formater import format_output +from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState # Set up logging once logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) @@ -48,7 +49,7 @@ class AudioProcessor: self.bytes_per_sample = 2 self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz - self.is_pcm_input = True + self.is_pcm_input = self.args.pcm_input self.debug = False # State management @@ -74,7 +75,21 @@ class AudioProcessor: self.vac = FixedVADIterator(models.vac_model) else: self.vac = None - + + self.ffmpeg_manager = None + self.ffmpeg_reader_task = None + self._ffmpeg_error = None + + if not self.is_pcm_input: + self.ffmpeg_manager = FFmpegManager( + sample_rate=self.sample_rate, + channels=self.channels + ) + async def handle_ffmpeg_error(error_type: 
str): + logger.error(f"FFmpeg error: {error_type}") + self._ffmpeg_error = error_type + self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error + self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.translation_queue = asyncio.Queue() if self.args.target_language else None @@ -155,6 +170,56 @@ class AudioProcessor: self.end_buffer = self.end_attributed_speaker = 0 self.beg_loop = time() + async def ffmpeg_stdout_reader(self): + """Read audio data from FFmpeg stdout and process it into the PCM pipeline.""" + beg = time() + while True: + try: + if self.is_stopping: + logger.info("Stopping ffmpeg_stdout_reader due to stopping flag.") + break + + state = await self.ffmpeg_manager.get_state() if self.ffmpeg_manager else FFmpegState.STOPPED + if state == FFmpegState.FAILED: + logger.error("FFmpeg is in FAILED state, cannot read data") + break + elif state == FFmpegState.STOPPED: + logger.info("FFmpeg is stopped") + break + elif state != FFmpegState.RUNNING: + await asyncio.sleep(0.1) + continue + + current_time = time() + elapsed_time = max(0.0, current_time - beg) + buffer_size = max(int(32000 * elapsed_time), 4096) # dynamic read + beg = current_time + + chunk = await self.ffmpeg_manager.read_data(buffer_size) + if not chunk: + # No data currently available + await asyncio.sleep(0.05) + continue + + self.pcm_buffer.extend(chunk) + await self.handle_pcm_data() + + except asyncio.CancelledError: + logger.info("ffmpeg_stdout_reader cancelled.") + break + except Exception as e: + logger.warning(f"Exception in ffmpeg_stdout_reader: {e}") + logger.debug(f"Traceback: {traceback.format_exc()}") + await asyncio.sleep(0.2) + + logger.info("FFmpeg stdout processing finished. 
Signaling downstream processors if needed.") + if self.args.transcription and self.transcription_queue: + await self.transcription_queue.put(SENTINEL) + if self.args.diarization and self.diarization_queue: + await self.diarization_queue.put(SENTINEL) + if self.args.target_language and self.translation_queue: + await self.translation_queue.put(SENTINEL) + async def transcription_processor(self): """Process audio chunks for transcription.""" cumulative_pcm_duration_stream_time = 0.0 @@ -337,6 +402,16 @@ class AudioProcessor: """Format processing results for output.""" while True: try: + # If FFmpeg error occurred, notify front-end + if self._ffmpeg_error: + yield FrontData( + status="error", + error=f"FFmpeg error: {self._ffmpeg_error}" + ) + self._ffmpeg_error = None + await asyncio.sleep(1) + continue + # Get current state state = await self.get_current_state() @@ -412,6 +487,21 @@ class AudioProcessor: self.all_tasks_for_cleanup = [] processing_tasks_for_watchdog = [] + # If using FFmpeg (non-PCM input), start it and spawn stdout reader + if not self.is_pcm_input: + success = await self.ffmpeg_manager.start() + if not success: + logger.error("Failed to start FFmpeg manager") + async def error_generator(): + yield FrontData( + status="error", + error="FFmpeg failed to start. Please check that FFmpeg is installed." 
+ ) + return error_generator() + self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader()) + self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task) + processing_tasks_for_watchdog.append(self.ffmpeg_reader_task) + if self.args.transcription and self.online: self.transcription_task = asyncio.create_task(self.transcription_processor()) self.all_tasks_for_cleanup.append(self.transcription_task) @@ -466,7 +556,14 @@ class AudioProcessor: if created_tasks: await asyncio.gather(*created_tasks, return_exceptions=True) logger.info("All processing tasks cancelled or finished.") - if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'): + + if not self.is_pcm_input and self.ffmpeg_manager: + try: + await self.ffmpeg_manager.stop() + logger.info("FFmpeg manager stopped.") + except Exception as e: + logger.warning(f"Error stopping FFmpeg manager: {e}") + if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'): self.diarization.close() logger.info("AudioProcessor cleanup complete.") @@ -480,10 +577,13 @@ class AudioProcessor: if not message: logger.info("Empty audio message received, initiating stop sequence.") self.is_stopping = True - + if self.transcription_queue: await self.transcription_queue.put(SENTINEL) + if not self.is_pcm_input and self.ffmpeg_manager: + await self.ffmpeg_manager.stop() + return if self.is_stopping: @@ -493,6 +593,17 @@ class AudioProcessor: if self.is_pcm_input: self.pcm_buffer.extend(message) await self.handle_pcm_data() + else: + if not self.ffmpeg_manager: + logger.error("FFmpeg manager not initialized for non-PCM input.") + return + success = await self.ffmpeg_manager.write_data(message) + if not success: + ffmpeg_state = await self.ffmpeg_manager.get_state() + if ffmpeg_state == FFmpegState.FAILED: + logger.error("FFmpeg is in FAILED state, cannot process audio") + else: + logger.warning("Failed to write audio data to FFmpeg") async def
handle_pcm_data(self): # Process when enough data diff --git a/whisperlivekit/basic_server.py b/whisperlivekit/basic_server.py index cd65829..c246021 100644 --- a/whisperlivekit/basic_server.py +++ b/whisperlivekit/basic_server.py @@ -72,6 +72,11 @@ async def websocket_endpoint(websocket: WebSocket): ) await websocket.accept() logger.info("WebSocket connection opened.") + + try: + await websocket.send_json({"type": "config", "useAudioWorklet": bool(args.pcm_input)}) + except Exception as e: + logger.warning(f"Failed to send config to client: {e}") results_generator = await audio_processor.create_tasks() websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator)) diff --git a/whisperlivekit/ffmpeg_manager.py b/whisperlivekit/ffmpeg_manager.py new file mode 100644 index 0000000..1122fdd --- /dev/null +++ b/whisperlivekit/ffmpeg_manager.py @@ -0,0 +1,195 @@ +import asyncio +import logging +from enum import Enum +from typing import Optional, Callable +import contextlib + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +ERROR_INSTALL_INSTRUCTIONS = """ +FFmpeg is not installed or not found in your system's PATH. +Please install FFmpeg to enable audio processing. + +Installation instructions: + +# Ubuntu/Debian: +sudo apt update && sudo apt install ffmpeg + +# macOS (using Homebrew): +brew install ffmpeg + +# Windows: +# 1. Download the latest static build from https://ffmpeg.org/download.html +# 2. Extract the archive (e.g., to C:\\FFmpeg). +# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable. + +After installation, please restart the application. 
+""" + +class FFmpegState(Enum): + STOPPED = "stopped" + STARTING = "starting" + RUNNING = "running" + RESTARTING = "restarting" + FAILED = "failed" + +class FFmpegManager: + def __init__(self, sample_rate: int = 16000, channels: int = 1): + self.sample_rate = sample_rate + self.channels = channels + + self.process: Optional[asyncio.subprocess.Process] = None + self._stderr_task: Optional[asyncio.Task] = None + + self.on_error_callback: Optional[Callable[[str], None]] = None + + self.state = FFmpegState.STOPPED + self._state_lock = asyncio.Lock() + + async def start(self) -> bool: + async with self._state_lock: + if self.state != FFmpegState.STOPPED: + logger.warning(f"FFmpeg already running in state: {self.state}") + return False + self.state = FFmpegState.STARTING + + try: + cmd = [ + "ffmpeg", + "-hide_banner", + "-loglevel", "error", + "-i", "pipe:0", + "-f", "s16le", + "-acodec", "pcm_s16le", + "-ac", str(self.channels), + "-ar", str(self.sample_rate), + "pipe:1" + ] + + self.process = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + self._stderr_task = asyncio.create_task(self._drain_stderr()) + + async with self._state_lock: + self.state = FFmpegState.RUNNING + + logger.info("FFmpeg started.") + return True + + except FileNotFoundError: + logger.error(ERROR_INSTALL_INSTRUCTIONS) + async with self._state_lock: + self.state = FFmpegState.FAILED + if self.on_error_callback: + await self.on_error_callback("ffmpeg_not_found") + return False + + except Exception as e: + logger.error(f"Error starting FFmpeg: {e}") + async with self._state_lock: + self.state = FFmpegState.FAILED + if self.on_error_callback: + await self.on_error_callback("start_failed") + return False + + async def stop(self): + async with self._state_lock: + if self.state == FFmpegState.STOPPED: + return + self.state = FFmpegState.STOPPED + + if self.process: + if self.process.stdin and not 
self.process.stdin.is_closing(): + self.process.stdin.close() + await self.process.stdin.wait_closed() + await self.process.wait() + self.process = None + + if self._stderr_task: + self._stderr_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._stderr_task + + logger.info("FFmpeg stopped.") + + async def write_data(self, data: bytes) -> bool: + async with self._state_lock: + if self.state != FFmpegState.RUNNING: + logger.warning(f"Cannot write, FFmpeg state: {self.state}") + return False + + try: + self.process.stdin.write(data) + await self.process.stdin.drain() + return True + except Exception as e: + logger.error(f"Error writing to FFmpeg: {e}") + if self.on_error_callback: + await self.on_error_callback("write_error") + return False + + async def read_data(self, size: int) -> Optional[bytes]: + async with self._state_lock: + if self.state != FFmpegState.RUNNING: + logger.warning(f"Cannot read, FFmpeg state: {self.state}") + return None + + try: + data = await asyncio.wait_for( + self.process.stdout.read(size), + timeout=20.0 + ) + return data + except asyncio.TimeoutError: + logger.warning("FFmpeg read timeout.") + return None + except Exception as e: + logger.error(f"Error reading from FFmpeg: {e}") + if self.on_error_callback: + await self.on_error_callback("read_error") + return None + + async def get_state(self) -> FFmpegState: + async with self._state_lock: + return self.state + + async def restart(self) -> bool: + async with self._state_lock: + if self.state == FFmpegState.RESTARTING: + logger.warning("Restart already in progress.") + return False + self.state = FFmpegState.RESTARTING + + logger.info("Restarting FFmpeg...") + + try: + await self.stop() + await asyncio.sleep(1) # short delay before restarting + return await self.start() + except Exception as e: + logger.error(f"Error during FFmpeg restart: {e}") + async with self._state_lock: + self.state = FFmpegState.FAILED + if self.on_error_callback: + await 
self.on_error_callback("restart_failed") + return False + + async def _drain_stderr(self): + try: + while True: + if not self.process or not self.process.stderr: + break + line = await self.process.stderr.readline() + if not line: + break + logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}") + except asyncio.CancelledError: + logger.info("FFmpeg stderr drain task cancelled.") + except Exception as e: + logger.error(f"Error draining FFmpeg stderr: {e}") diff --git a/whisperlivekit/parse_args.py b/whisperlivekit/parse_args.py index 55d4173..30071c0 100644 --- a/whisperlivekit/parse_args.py +++ b/whisperlivekit/parse_args.py @@ -177,7 +177,7 @@ def parse_args(): "--pcm-input", action="store_true", default=False, - help="If set, raw PCM (s16le) data is expected as input and FFmpeg will be bypassed." + help="If set, raw PCM (s16le) data is expected as input and FFmpeg will be bypassed. Frontend will use AudioWorklet/PCM input from the browser instead of MediaRecorder/FFmpeg on the server." 
) # SimulStreaming-specific arguments simulstreaming_group = parser.add_argument_group('SimulStreaming arguments (only used with --backend simulstreaming)') diff --git a/whisperlivekit/web/live_transcription.js b/whisperlivekit/web/live_transcription.js index e649912..714e6a8 100644 --- a/whisperlivekit/web/live_transcription.js +++ b/whisperlivekit/web/live_transcription.js @@ -22,6 +22,9 @@ let lastReceivedData = null; let lastSignature = null; let availableMicrophones = []; let selectedMicrophoneId = null; +let serverUseAudioWorklet = null; +let configReadyResolve; +const configReady = new Promise((r) => (configReadyResolve = r)); waveCanvas.width = 60 * (window.devicePixelRatio || 1); waveCanvas.height = 30 * (window.devicePixelRatio || 1); @@ -228,6 +231,14 @@ function setupWebSocket() { websocket.onmessage = (event) => { const data = JSON.parse(event.data); + if (data.type === "config") { + serverUseAudioWorklet = !!data.useAudioWorklet; + statusText.textContent = serverUseAudioWorklet + ? "Connected. Using AudioWorklet (PCM)." + : "Connected. 
Using MediaRecorder (WebM)."; + if (configReadyResolve) configReadyResolve(); + return; + } if (data.type === "ready_to_stop") { console.log("Ready to stop received, finalizing display and closing WebSocket."); @@ -459,38 +470,54 @@ async function startRecording() { microphone = audioContext.createMediaStreamSource(stream); microphone.connect(analyser); - if (!audioContext.audioWorklet) { - throw new Error("AudioWorklet is not supported in this browser"); - } - await audioContext.audioWorklet.addModule("/web/pcm_worklet.js"); - workletNode = new AudioWorkletNode(audioContext, "pcm-forwarder", { numberOfInputs: 1, numberOfOutputs: 0, channelCount: 1 }); - microphone.connect(workletNode); - - recorderWorker = new Worker("/web/recorder_worker.js"); - recorderWorker.postMessage({ - command: "init", - config: { - sampleRate: audioContext.sampleRate, - }, - }); - - recorderWorker.onmessage = (e) => { - if (websocket && websocket.readyState === WebSocket.OPEN) { - websocket.send(e.data.buffer); + if (serverUseAudioWorklet) { + if (!audioContext.audioWorklet) { + throw new Error("AudioWorklet is not supported in this browser"); } - }; + await audioContext.audioWorklet.addModule("/web/pcm_worklet.js"); + workletNode = new AudioWorkletNode(audioContext, "pcm-forwarder", { numberOfInputs: 1, numberOfOutputs: 0, channelCount: 1 }); + microphone.connect(workletNode); - workletNode.port.onmessage = (e) => { - const data = e.data; - const ab = data instanceof ArrayBuffer ? 
data : data.buffer; - recorderWorker.postMessage( - { - command: "record", - buffer: ab, + recorderWorker = new Worker("/web/recorder_worker.js"); + recorderWorker.postMessage({ + command: "init", + config: { + sampleRate: audioContext.sampleRate, }, - [ab] - ); - }; + }); + + recorderWorker.onmessage = (e) => { + if (websocket && websocket.readyState === WebSocket.OPEN) { + websocket.send(e.data.buffer); + } + }; + + workletNode.port.onmessage = (e) => { + const data = e.data; + const ab = data instanceof ArrayBuffer ? data : data.buffer; + recorderWorker.postMessage( + { + command: "record", + buffer: ab, + }, + [ab] + ); + }; + } else { + try { + recorder = new MediaRecorder(stream, { mimeType: "audio/webm" }); + } catch (e) { + recorder = new MediaRecorder(stream); + } + recorder.ondataavailable = (e) => { + if (websocket && websocket.readyState === WebSocket.OPEN) { + if (e.data && e.data.size > 0) { + websocket.send(e.data); + } + } + }; + recorder.start(chunkDuration); + } startTime = Date.now(); timerInterval = setInterval(updateTimer, 1000); @@ -528,6 +555,14 @@ async function stopRecording() { statusText.textContent = "Recording stopped. Processing final audio..."; } + if (recorder) { + try { + recorder.stop(); + } catch (e) { + } + recorder = null; + } + if (recorderWorker) { recorderWorker.terminate(); recorderWorker = null; @@ -586,9 +621,11 @@ async function toggleRecording() { console.log("Connecting to WebSocket"); try { if (websocket && websocket.readyState === WebSocket.OPEN) { + await configReady; await startRecording(); } else { await setupWebSocket(); + await configReady; await startRecording(); } } catch (err) {