From 2dd974ade0644f5f73b5682a915d8e73f3b7e45c Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Mon, 5 May 2025 09:30:18 +0200 Subject: [PATCH] Add support for PyAudioWPatch audio input on Windows - Updated README.md to include installation instructions for PyAudioWPatch. - Modified setup.py to add PyAudioWPatch as an optional dependency. - Enhanced audio_processor.py to initialize and handle PyAudioWPatch for system audio capture. - Updated basic_server.py to manage audio input modes and integrate PyAudioWPatch processing. - Refactored core.py to include audio input argument parsing. --- README.md | 23 +++- setup.py | 1 + whisperlivekit/audio_processor.py | 201 ++++++++++++++++++++++++++---- whisperlivekit/basic_server.py | 142 +++++++++++++++++---- whisperlivekit/core.py | 54 +++++--- 5 files changed, 348 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 1a0c1e5..695cc66 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,9 @@ pip install whisperlivekit[whisper] # Original Whisper pip install whisperlivekit[whisper-timestamped] # Improved timestamps pip install whisperlivekit[mlx-whisper] # Apple Silicon optimization pip install whisperlivekit[openai] # OpenAI API + +# System audio capture (Windows only) +pip install whisperlivekit[pyaudiowpatch] # Use PyAudioWPatch for system audio loopback ``` ### 🎹 Pyannote Models Setup @@ -139,6 +142,9 @@ whisperlivekit-server --model tiny.en # Advanced configuration with diarization whisperlivekit-server --host 0.0.0.0 --port 8000 --model medium --diarization --language auto + +# Using PyAudioWPatch for system audio input (Windows only) +whisperlivekit-server --model tiny.en --audio-input pyaudiowpatch ``` ### Python API Integration (Backend) @@ -209,6 +215,7 @@ WhisperLiveKit offers extensive configuration options: | `--no-vad` | Disable Voice Activity Detection | `False` | | `--buffer_trimming` | Buffer trimming strategy (`sentence` or `segment`) | `segment` | | `--warmup-file` | Audio file path for model warmup | `jfk.wav` | +| `--audio-input` | Source of audio (`websocket` or `pyaudiowpatch`) | `websocket` | | `--ssl-certfile` | Path to the SSL certificate file (for HTTPS support) | `None` | | `--ssl-keyfile` | Path to the SSL private key file (for HTTPS support) | `None` | @@ -218,12 +225,16 @@ WhisperLiveKit offers extensive configuration options: WhisperLiveKit in Action

-1. **Audio Capture**: Browser's MediaRecorder API captures audio in webm/opus format -2. **Streaming**: Audio chunks are sent to the server via WebSocket -3. **Processing**: Server decodes audio with FFmpeg and streams into Whisper for transcription -4. **Real-time Output**: - - Partial transcriptions appear immediately in light gray (the 'aperçu') - - Finalized text appears in normal color +1. **Audio Input**: + - **WebSocket (Default)**: Browser's MediaRecorder API captures audio (webm/opus), streams via WebSocket. + - **PyAudioWPatch (Windows Only)**: Captures system audio output directly using WASAPI loopback. Requires `--audio-input pyaudiowpatch`. +2. **Processing**: + - **WebSocket**: Server decodes webm/opus audio with FFmpeg. + - **PyAudioWPatch**: Server receives raw PCM audio directly. + - Audio is streamed into Whisper for transcription. +3. **Real-time Output**: + - Partial transcriptions appear immediately in light gray (the 'aperçu'). + - Finalized text appears in normal color. - (When enabled) Different speakers are identified and highlighted ## 🚀 Deployment Guide diff --git a/setup.py b/setup.py index 2c95878..0baec49 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,7 @@ setup( "whisper-timestamped": ["whisper-timestamped"], "mlx-whisper": ["mlx-whisper"], "openai": ["openai"], + "pyaudiowpatch": ["PyAudioWPatch"], }, package_data={ 'whisperlivekit': ['web/*.html'], diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 2fc5ce5..a43911a 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -2,6 +2,14 @@ import asyncio import numpy as np import ffmpeg from time import time, sleep +import platform # To check OS + +try: + import pyaudiowpatch as pyaudio + PYAUDIOWPATCH_AVAILABLE = True +except ImportError: + pyaudio = None + PYAUDIOWPATCH_AVAILABLE = False import math import logging import traceback @@ -13,7 +21,6 @@ from whisperlivekit.core import WhisperLiveKit # Set up logging once logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) def format_time(seconds: float) -> str: """Format seconds as HH:MM:SS.""" @@ -58,18 +65,80 @@ class AudioProcessor: self.asr = models.asr self.tokenizer = models.tokenizer self.diarization = models.diarization - self.ffmpeg_process = self.start_ffmpeg_decoder() self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.pcm_buffer = bytearray() + self.ffmpeg_process = None + self.pyaudio_instance = None + self.pyaudio_stream = None + + # Initialize audio input based on args + if self.args.audio_input == "websocket": + self.ffmpeg_process = self.start_ffmpeg_decoder() + elif self.args.audio_input == "pyaudiowpatch": + if not PYAUDIOWPATCH_AVAILABLE: + logger.error("PyAudioWPatch selected but not installed. Please install it: pip install whisperlivekit[pyaudiowpatch]") + raise ImportError("PyAudioWPatch not found.") + if platform.system() != "Windows": + logger.error("PyAudioWPatch is only supported on Windows.") + raise OSError("PyAudioWPatch requires Windows.") + self.initialize_pyaudiowpatch() + else: + raise ValueError(f"Unsupported audio input type: {self.args.audio_input}") # Initialize transcription engine if enabled if self.args.transcription: self.online = online_factory(self.args, models.asr, models.tokenizer) + def initialize_pyaudiowpatch(self): + """Initialize PyAudioWPatch for audio input.""" + logger.info("Initializing PyAudioWPatch...") + try: + self.pyaudio_instance = pyaudio.PyAudio() + # Find the default WASAPI loopback device + wasapi_info = self.pyaudio_instance.get_host_api_info_by_type(pyaudio.paWASAPI) + default_speakers = self.pyaudio_instance.get_device_info_by_index(wasapi_info["defaultOutputDevice"]) + + if not default_speakers["isLoopbackDevice"]: + for loopback in self.pyaudio_instance.get_loopback_device_info_generator(): + if default_speakers["name"] in loopback["name"]: + default_speakers = loopback + break + else: + logger.error("Default loopback output device not found.") + raise OSError("Default loopback output device not found.") + + logger.info(f"Using loopback device: {default_speakers['name']}") + self.pyaudio_stream = self.pyaudio_instance.open( + format=pyaudio.paInt16, + channels=default_speakers["maxInputChannels"], + rate=int(default_speakers["defaultSampleRate"]), + input=True, + input_device_index=default_speakers["index"], + frames_per_buffer=int(self.sample_rate * self.args.min_chunk_size) + ) + self.sample_rate = int(default_speakers["defaultSampleRate"]) + self.channels = default_speakers["maxInputChannels"] + self.samples_per_sec = int(self.sample_rate * self.args.min_chunk_size) + self.bytes_per_sample = 2 + self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample + logger.info(f"PyAudioWPatch initialized with {self.channels} channels and {self.sample_rate} Hz sample rate.") + + except Exception as e: + logger.error(f"Failed to initialize PyAudioWPatch: {e}") + logger.error(traceback.format_exc()) + if self.pyaudio_instance: + self.pyaudio_instance.terminate() + raise + def convert_pcm_to_float(self, pcm_buffer): """Convert PCM buffer in s16le format to normalized NumPy array.""" - return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0 + if isinstance(pcm_buffer, (bytes, bytearray)): + return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0 + else: + logger.error(f"Invalid buffer type for PCM conversion: {type(pcm_buffer)}") + return np.array([], dtype=np.float32) + def start_ffmpeg_decoder(self): """Start FFmpeg process for WebM to PCM conversion.""" @@ -125,6 +194,45 @@ class AudioProcessor: logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}") logger.critical(traceback.format_exc()) + async def pyaudiowpatch_reader(self): + """Read audio data from PyAudioWPatch stream and process it.""" + logger.info("Starting PyAudioWPatch reader task.") + loop = asyncio.get_event_loop() + + while True: + try: + chunk = await loop.run_in_executor( + None, + self.pyaudio_stream.read, + int(self.sample_rate * self.args.min_chunk_size), + False + ) + + if not chunk: + logger.info("PyAudioWPatch stream closed or read empty chunk.") + await asyncio.sleep(0.1) + continue + + pcm_array = self.convert_pcm_to_float(chunk) + + if self.args.diarization and self.diarization_queue: + await self.diarization_queue.put(pcm_array.copy()) + + if self.args.transcription and self.transcription_queue: + await self.transcription_queue.put(pcm_array.copy()) + + except OSError as e: + logger.error(f"PyAudioWPatch stream error: {e}") + logger.error(traceback.format_exc()) + break + except Exception as e: + logger.error(f"Exception in pyaudiowpatch_reader: {e}") + logger.error(traceback.format_exc()) + await asyncio.sleep(1) # Wait before retrying or breaking + break + logger.info("PyAudioWPatch reader task finished.") + + async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep): """Thread-safe update of transcription with new data.""" async with self.lock: @@ -411,12 +519,15 @@ class AudioProcessor: tasks = [] if self.args.transcription and self.online: tasks.append(asyncio.create_task(self.transcription_processor())) - + if self.args.diarization and self.diarization: - tasks.append(asyncio.create_task(self.diarization_processor(self.diarization))) - - tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader())) - + tasks.append(asyncio.create_task(self.diarization_processor(self.diarization))) # Corrected indentation + + if self.args.audio_input == "websocket": + tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader())) + elif self.args.audio_input == "pyaudiowpatch": + tasks.append(asyncio.create_task(self.pyaudiowpatch_reader())) + # Monitor overall system health async def watchdog(): while True: @@ -431,18 +542,23 @@ class AudioProcessor: task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}" logger.error(f"{task_name} unexpectedly completed with exception: {exc}") - # Check for FFmpeg process health with shorter thresholds - ffmpeg_idle_time = current_time - self.last_ffmpeg_activity - if ffmpeg_idle_time > 15: # 15 seconds instead of 180 - logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention") - - # Force restart after 30 seconds of inactivity (instead of 600) - if ffmpeg_idle_time > 30: - logger.error("FFmpeg idle for too long, forcing restart") - await self.restart_ffmpeg() - + if self.args.audio_input == "websocket": + ffmpeg_idle_time = current_time - self.last_ffmpeg_activity + if ffmpeg_idle_time > 15: # 15 seconds instead of 180 + logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention") + + # Force restart after 30 seconds of inactivity (instead of 600) + if ffmpeg_idle_time > 30: + logger.error("FFmpeg idle for too long, forcing restart") + await self.restart_ffmpeg() + + elif self.args.audio_input == "pyaudiowpatch": + if self.pyaudio_stream and not self.pyaudio_stream.is_active(): + logger.warning("PyAudioWPatch stream is not active. Attempting to restart or handle.") + except Exception as e: logger.error(f"Error in watchdog task: {e}") + logger.error(traceback.format_exc()) tasks.append(asyncio.create_task(watchdog())) self.tasks = tasks @@ -456,10 +572,22 @@ class AudioProcessor: try: await asyncio.gather(*self.tasks, return_exceptions=True) - self.ffmpeg_process.stdin.close() - self.ffmpeg_process.wait() + if self.args.audio_input == "websocket" and self.ffmpeg_process: + if self.ffmpeg_process.stdin: + self.ffmpeg_process.stdin.close() + if self.ffmpeg_process.poll() is None: + self.ffmpeg_process.wait() + elif self.args.audio_input == "pyaudiowpatch": + if self.pyaudio_stream: + self.pyaudio_stream.stop_stream() + self.pyaudio_stream.close() + logger.info("PyAudioWPatch stream closed.") + if self.pyaudio_instance: + self.pyaudio_instance.terminate() + logger.info("PyAudioWPatch instance terminated.") except Exception as e: logger.warning(f"Error during cleanup: {e}") + logger.warning(traceback.format_exc()) if self.args.diarization and hasattr(self, 'diarization'): self.diarization.close() @@ -474,14 +602,37 @@ class AudioProcessor: if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10: logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago") self._last_heartbeat = current_time - + + if self.args.audio_input != "websocket": + # logger.debug("Audio input is not WebSocket, skipping process_audio.") + return # Do nothing if input is not WebSocket + while retry_count < max_retries: try: - if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None: - logger.warning("FFmpeg process not available, restarting...") + + if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None: + logger.warning("FFmpeg process not running or unavailable, attempting restart...") await self.restart_ffmpeg() - - loop = asyncio.get_running_loop() + + if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None: + logger.error("FFmpeg restart failed or process terminated immediately.") + # maybe raise an error or break after retries + await asyncio.sleep(1) + retry_count += 1 + continue + + # Ensure stdin is available + if not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.stdin.closed: + logger.warning("FFmpeg stdin is not available or closed. Restarting...") + await self.restart_ffmpeg() + if not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.stdin.closed: + logger.error("FFmpeg stdin still unavailable after restart.") + await asyncio.sleep(1) + retry_count += 1 + continue + + + loop = asyncio.get_running_loop() try: await asyncio.wait_for( loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)), diff --git a/whisperlivekit/basic_server.py b/whisperlivekit/basic_server.py index 87e2409..e6480f5 100644 --- a/whisperlivekit/basic_server.py +++ b/whisperlivekit/basic_server.py @@ -3,27 +3,47 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse from fastapi.middleware.cors import CORSMiddleware -from whisperlivekit import WhisperLiveKit, parse_args +from whisperlivekit import WhisperLiveKit, get_parsed_args from whisperlivekit.audio_processor import AudioProcessor import asyncio import logging import os, sys -import argparse logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.getLogger().setLevel(logging.WARNING) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -kit = None - @asynccontextmanager async def lifespan(app: FastAPI): - global kit + logger.info("Starting up...") kit = WhisperLiveKit() + app.state.kit = kit + logger.info(f"Audio Input mode: {kit.args.audio_input}") + + audio_processor = AudioProcessor() + app.state.audio_processor = audio_processor + app.state.results_generator = None # Initialize + + if kit.args.audio_input == "pyaudiowpatch": + logger.info("Starting PyAudioWPatch processing tasks...") + try: + app.state.results_generator = await audio_processor.create_tasks() + except Exception as e: + logger.critical(f"Failed to start PyAudioWPatch processing: {e}", exc_info=True) + else: + logger.info("WebSocket input mode selected. Processing will start on client connection.") + yield + logger.info("Shutting down...") + if hasattr(app.state, 'audio_processor') and app.state.audio_processor: + logger.info("Cleaning up AudioProcessor...") + await app.state.audio_processor.cleanup() + logger.info("Shutdown complete.") + + app = FastAPI(lifespan=lifespan) app.add_middleware( CORSMiddleware, @@ -36,10 +56,10 @@ app.add_middleware( @app.get("/") async def get(): - return HTMLResponse(kit.web_interface()) + return HTMLResponse(app.state.kit.web_interface()) -async def handle_websocket_results(websocket, results_generator): +async def handle_websocket_results(websocket: WebSocket, results_generator): """Consumes results from the audio processor and sends them via WebSocket.""" try: async for response in results_generator: @@ -50,37 +70,109 @@ async def handle_websocket_results(websocket, results_generator): @app.websocket("/asr") async def websocket_endpoint(websocket: WebSocket): - audio_processor = AudioProcessor() - await websocket.accept() - logger.info("WebSocket connection opened.") - - results_generator = await audio_processor.create_tasks() - websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator)) + logger.info("WebSocket connection accepted.") + + audio_processor = app.state.audio_processor + kit_args = app.state.kit.args + results_generator = None + websocket_task = None + receive_task = None try: - while True: - message = await websocket.receive_bytes() - await audio_processor.process_audio(message) + if kit_args.audio_input == "websocket": + logger.info("WebSocket mode: Starting processing tasks for this connection.") + results_generator = await audio_processor.create_tasks() + websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator)) + + async def receive_audio(): + try: + while True: + message = await websocket.receive_bytes() + await audio_processor.process_audio(message) + except WebSocketDisconnect: + logger.info("WebSocket disconnected by client (receive_audio).") + except Exception as e: + logger.error(f"Error receiving audio: {e}", exc_info=True) + finally: + logger.debug("Receive audio task finished.") + + + receive_task = asyncio.create_task(receive_audio()) + done, pending = await asyncio.wait( + {websocket_task, receive_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() # Cancel the other task + + elif kit_args.audio_input == "pyaudiowpatch": + logger.info("PyAudioWPatch mode: Streaming existing results.") + results_generator = app.state.results_generator + if results_generator is None: + logger.error("PyAudioWPatch results generator not available. Was startup successful?") + await websocket.close(code=1011, reason="Server error: Audio processing not started.") + return + + websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator)) + await websocket_task + + else: + logger.error(f"Unsupported audio input mode configured: {kit_args.audio_input}") + await websocket.close(code=1011, reason="Server configuration error.") + except WebSocketDisconnect: - logger.warning("WebSocket disconnected.") + logger.info("WebSocket disconnected by client.") + except Exception as e: + logger.error(f"Error in WebSocket endpoint: {e}", exc_info=True) + # Attempt to close gracefully + try: + await websocket.close(code=1011, reason=f"Server error: {e}") + except Exception: + pass # Ignore errors during close after another error finally: - websocket_task.cancel() - await audio_processor.cleanup() - logger.info("WebSocket endpoint cleaned up.") + logger.info("Cleaning up WebSocket connection...") + if websocket_task and not websocket_task.done(): + websocket_task.cancel() + if receive_task and not receive_task.done(): + receive_task.cancel() + + if kit_args.audio_input == "websocket": + pass + + logger.info("WebSocket connection closed.") def main(): """Entry point for the CLI command.""" import uvicorn - - args = parse_args() - + + # Get the globally parsed arguments + args = get_parsed_args() + + # Set logger level based on args + log_level_name = args.log_level.upper() + # Ensure the level name is valid for the logging module + numeric_level = getattr(logging, log_level_name, None) + if not isinstance(numeric_level, int): + logging.warning(f"Invalid log level: {args.log_level}. Defaulting to INFO.") + numeric_level = logging.INFO + logging.getLogger().setLevel(numeric_level) # Set root logger level + # Set our specific logger level too + logger.setLevel(numeric_level) + logger.info(f"Log level set to: {log_level_name}") + + # Determine uvicorn log level (map CRITICAL to critical, etc.) + uvicorn_log_level = log_level_name.lower() + if uvicorn_log_level == "debug": # Uvicorn uses 'trace' for more verbose than debug + uvicorn_log_level = "trace" + + uvicorn_kwargs = { "app": "whisperlivekit.basic_server:app", "host":args.host, - "port":args.port, + "port":args.port, "reload": False, - "log_level": "info", + "log_level": uvicorn_log_level, "lifespan": "on", } diff --git a/whisperlivekit/core.py b/whisperlivekit/core.py index d6f90cf..b05cb26 100644 --- a/whisperlivekit/core.py +++ b/whisperlivekit/core.py @@ -1,10 +1,13 @@ +import sys +from argparse import Namespace, ArgumentParser try: from whisperlivekit.whisper_streaming_custom.whisper_online import backend_factory, warmup_asr except ImportError: - from .whisper_streaming_custom.whisper_online import backend_factory, warmup_asr -from argparse import Namespace, ArgumentParser + if '.' not in sys.path: + sys.path.insert(0, '.') + from whisperlivekit.whisper_streaming_custom.whisper_online import backend_factory, warmup_asr -def parse_args(): +def _parse_args_internal(): parser = ArgumentParser(description="Whisper FastAPI Online Server") parser.add_argument( "--host", @@ -130,38 +133,55 @@ def parse_args(): help="Set the log level", default="DEBUG", ) + parser.add_argument( + "--audio-input", + type=str, + default="websocket", + choices=["websocket", "pyaudiowpatch"], + help="Source of the audio input. 'websocket' expects audio via WebSocket (default). 'pyaudiowpatch' uses PyAudioWPatch to capture system audio output.", + ) parser.add_argument("--ssl-certfile", type=str, help="Path to the SSL certificate file.", default=None) parser.add_argument("--ssl-keyfile", type=str, help="Path to the SSL private key file.", default=None) args = parser.parse_args() - + args.transcription = not args.no_transcription - args.vad = not args.no_vad + args.vad = not args.no_vad delattr(args, 'no_transcription') delattr(args, 'no_vad') - + return args +_cli_args = _parse_args_internal() + +def get_parsed_args() -> Namespace: + """Returns the globally parsed command-line arguments.""" + return _cli_args + +# --- WhisperLiveKit Class --- class WhisperLiveKit: _instance = None _initialized = False - - def __new__(cls, *args, **kwargs): + + def __new__(cls, args: Namespace = None, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance - - def __init__(self, **kwargs): + + def __init__(self, args: Namespace = None, **kwargs): + """ + Initializes WhisperLiveKit. + + Args: + args (Namespace, optional): Pre-parsed arguments. If None, uses globally parsed args. + Defaults to None. + **kwargs: Additional keyword arguments (currently not used directly but captured). + """ if WhisperLiveKit._initialized: return - - default_args = vars(parse_args()) - - merged_args = {**default_args, **kwargs} - - self.args = Namespace(**merged_args) - + + self.args = args if args is not None else get_parsed_args() self.asr = None self.tokenizer = None self.diarization = None