mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-04-28 09:30:05 +00:00
ffmpeg is managed in a thread in FFmpegManager to prevent the all from crashing when an error occurs
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
import numpy as np
|
||||
import ffmpeg
|
||||
from time import time, sleep
|
||||
import math
|
||||
import logging
|
||||
@@ -9,6 +8,7 @@ from datetime import timedelta
|
||||
from whisperlivekit.timed_objects import ASRToken
|
||||
from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory
|
||||
from whisperlivekit.core import TranscriptionEngine
|
||||
from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState
|
||||
|
||||
# Set up logging once
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
@@ -64,7 +64,19 @@ class AudioProcessor:
|
||||
self.asr = models.asr
|
||||
self.tokenizer = models.tokenizer
|
||||
self.diarization = models.diarization
|
||||
self.ffmpeg_process = self.start_ffmpeg_decoder()
|
||||
|
||||
self.ffmpeg_manager = FFmpegManager(
|
||||
sample_rate=self.sample_rate,
|
||||
channels=self.channels
|
||||
)
|
||||
|
||||
async def handle_ffmpeg_error(error_type: str):
|
||||
logger.error(f"FFmpeg error: {error_type}")
|
||||
self._ffmpeg_error = error_type
|
||||
|
||||
self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error
|
||||
self._ffmpeg_error = None
|
||||
|
||||
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
||||
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
||||
self.pcm_buffer = bytearray()
|
||||
@@ -84,83 +96,6 @@ class AudioProcessor:
|
||||
"""Convert PCM buffer in s16le format to normalized NumPy array."""
|
||||
return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0
|
||||
|
||||
def start_ffmpeg_decoder(self):
|
||||
"""Start FFmpeg process for WebM to PCM conversion."""
|
||||
try:
|
||||
return (ffmpeg.input("pipe:0", format="webm")
|
||||
.output("pipe:1", format="s16le", acodec="pcm_s16le",
|
||||
ac=self.channels, ar=str(self.sample_rate))
|
||||
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True))
|
||||
except FileNotFoundError:
|
||||
error = """
|
||||
FFmpeg is not installed or not found in your system's PATH.
|
||||
Please install FFmpeg to enable audio processing.
|
||||
|
||||
Installation instructions:
|
||||
|
||||
# Ubuntu/Debian:
|
||||
sudo apt update && sudo apt install ffmpeg
|
||||
|
||||
# macOS (using Homebrew):
|
||||
brew install ffmpeg
|
||||
|
||||
# Windows:
|
||||
# 1. Download the latest static build from https://ffmpeg.org/download.html
|
||||
# 2. Extract the archive (e.g., to C:\\FFmpeg).
|
||||
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
|
||||
|
||||
After installation, please restart the application.
|
||||
"""
|
||||
logger.error(error)
|
||||
raise FileNotFoundError(error)
|
||||
|
||||
async def restart_ffmpeg(self):
|
||||
"""Restart the FFmpeg process after failure."""
|
||||
logger.warning("Restarting FFmpeg process...")
|
||||
|
||||
if self.ffmpeg_process:
|
||||
try:
|
||||
# we check if process is still running
|
||||
if self.ffmpeg_process.poll() is None:
|
||||
logger.info("Terminating existing FFmpeg process")
|
||||
self.ffmpeg_process.stdin.close()
|
||||
self.ffmpeg_process.terminate()
|
||||
|
||||
# wait for termination with timeout
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait),
|
||||
timeout=5.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg process did not terminate, killing forcefully")
|
||||
self.ffmpeg_process.kill()
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
|
||||
except Exception as e:
|
||||
logger.error(f"Error during FFmpeg process termination: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
# we start new process
|
||||
try:
|
||||
logger.info("Starting new FFmpeg process")
|
||||
self.ffmpeg_process = self.start_ffmpeg_decoder()
|
||||
self.pcm_buffer = bytearray()
|
||||
self.last_ffmpeg_activity = time()
|
||||
logger.info("FFmpeg process restarted successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restart FFmpeg process: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
# try again after 5s
|
||||
await asyncio.sleep(5)
|
||||
try:
|
||||
self.ffmpeg_process = self.start_ffmpeg_decoder()
|
||||
self.pcm_buffer = bytearray()
|
||||
self.last_ffmpeg_activity = time()
|
||||
logger.info("FFmpeg process restarted successfully on second attempt")
|
||||
except Exception as e2:
|
||||
logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}")
|
||||
logger.critical(traceback.format_exc())
|
||||
|
||||
async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep):
|
||||
"""Thread-safe update of transcription with new data."""
|
||||
async with self.lock:
|
||||
@@ -223,23 +158,35 @@ class AudioProcessor:
|
||||
|
||||
async def ffmpeg_stdout_reader(self):
|
||||
"""Read audio data from FFmpeg stdout and process it."""
|
||||
loop = asyncio.get_event_loop()
|
||||
beg = time()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Check if FFmpeg is running
|
||||
state = await self.ffmpeg_manager.get_state()
|
||||
if state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, cannot read data")
|
||||
break
|
||||
elif state != FFmpegState.RUNNING:
|
||||
logger.warning(f"FFmpeg is in {state} state, waiting...")
|
||||
await asyncio.sleep(0.5)
|
||||
continue
|
||||
|
||||
current_time = time()
|
||||
elapsed_time = math.floor((current_time - beg) * 10) / 10
|
||||
buffer_size = max(int(32000 * elapsed_time), 4096)
|
||||
beg = current_time
|
||||
|
||||
chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size)
|
||||
if chunk:
|
||||
self.last_ffmpeg_activity = time()
|
||||
chunk = await self.ffmpeg_manager.read_data(buffer_size)
|
||||
|
||||
if not chunk:
|
||||
logger.info("FFmpeg stdout closed, no more data to read.")
|
||||
break
|
||||
if self.is_stopping:
|
||||
logger.info("FFmpeg stdout closed, stopping.")
|
||||
break
|
||||
else:
|
||||
# No data available, but not stopping - FFmpeg might be restarting
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
self.pcm_buffer.extend(chunk)
|
||||
|
||||
@@ -272,7 +219,12 @@ class AudioProcessor:
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
break
|
||||
# Try to recover by waiting a bit
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Check if we should exit
|
||||
if self.is_stopping:
|
||||
break
|
||||
|
||||
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
|
||||
if self.args.transcription and self.transcription_queue:
|
||||
@@ -393,6 +345,21 @@ class AudioProcessor:
|
||||
"""Format processing results for output."""
|
||||
while True:
|
||||
try:
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED and self._ffmpeg_error:
|
||||
yield {
|
||||
"status": "error",
|
||||
"error": f"FFmpeg error: {self._ffmpeg_error}",
|
||||
"lines": [],
|
||||
"buffer_transcription": "",
|
||||
"buffer_diarization": "",
|
||||
"remaining_time_transcription": 0,
|
||||
"remaining_time_diarization": 0
|
||||
}
|
||||
self._ffmpeg_error = None
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
# Get current state
|
||||
state = await self.get_current_state()
|
||||
tokens = state["tokens"]
|
||||
@@ -509,6 +476,21 @@ class AudioProcessor:
|
||||
self.all_tasks_for_cleanup = []
|
||||
processing_tasks_for_watchdog = []
|
||||
|
||||
success = await self.ffmpeg_manager.start()
|
||||
if not success:
|
||||
logger.error("Failed to start FFmpeg manager")
|
||||
async def error_generator():
|
||||
yield {
|
||||
"status": "error",
|
||||
"error": "FFmpeg failed to start. Please check that FFmpeg is installed.",
|
||||
"lines": [],
|
||||
"buffer_transcription": "",
|
||||
"buffer_diarization": "",
|
||||
"remaining_time_transcription": 0,
|
||||
"remaining_time_diarization": 0
|
||||
}
|
||||
return error_generator()
|
||||
|
||||
if self.args.transcription and self.online:
|
||||
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
||||
self.all_tasks_for_cleanup.append(self.transcription_task)
|
||||
@@ -534,8 +516,7 @@ class AudioProcessor:
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
current_time = time()
|
||||
|
||||
|
||||
for i, task in enumerate(tasks_to_monitor):
|
||||
if task.done():
|
||||
exc = task.exception()
|
||||
@@ -545,12 +526,15 @@ class AudioProcessor:
|
||||
else:
|
||||
logger.info(f"{task_name} completed normally.")
|
||||
|
||||
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
|
||||
if ffmpeg_idle_time > 10:
|
||||
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
|
||||
if ffmpeg_idle_time > 15 and not self.is_stopping:
|
||||
logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
|
||||
await self.restart_ffmpeg()
|
||||
# Check FFmpeg status through the manager
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, notifying results formatter")
|
||||
# FFmpeg manager will handle its own recovery
|
||||
elif ffmpeg_state == FFmpegState.STOPPED and not self.is_stopping:
|
||||
logger.warning("FFmpeg unexpectedly stopped, attempting restart")
|
||||
await self.ffmpeg_manager.restart()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Watchdog task cancelled.")
|
||||
break
|
||||
@@ -559,7 +543,7 @@ class AudioProcessor:
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up resources when processing is complete."""
|
||||
logger.info("Starting cleanup of AudioProcessor resources.")
|
||||
logger.info("Starting cleanup of AudioProcessor resources.")
|
||||
for task in self.all_tasks_for_cleanup:
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
@@ -568,26 +552,8 @@ class AudioProcessor:
|
||||
if created_tasks:
|
||||
await asyncio.gather(*created_tasks, return_exceptions=True)
|
||||
logger.info("All processing tasks cancelled or finished.")
|
||||
|
||||
if self.ffmpeg_process:
|
||||
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
|
||||
|
||||
# Wait for ffmpeg process to terminate
|
||||
if self.ffmpeg_process.poll() is None: # Check if process is still running
|
||||
logger.info("Waiting for FFmpeg process to terminate...")
|
||||
try:
|
||||
# Run wait in executor to avoid blocking async loop
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
|
||||
except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
|
||||
logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
|
||||
self.ffmpeg_process.kill()
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
|
||||
logger.info("FFmpeg process terminated.")
|
||||
|
||||
await self.ffmpeg_manager.stop()
|
||||
logger.info("FFmpeg manager stopped.")
|
||||
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
||||
self.diarization.close()
|
||||
logger.info("AudioProcessor cleanup complete.")
|
||||
@@ -598,53 +564,18 @@ class AudioProcessor:
|
||||
if not message:
|
||||
logger.info("Empty audio message received, initiating stop sequence.")
|
||||
self.is_stopping = True
|
||||
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
logger.info("FFmpeg stdin closed due to stop signal.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
|
||||
# Signal FFmpeg manager to stop accepting data
|
||||
await self.ffmpeg_manager.stop()
|
||||
return
|
||||
|
||||
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
|
||||
logger.warning("AudioProcessor is stopping or stdin is closed. Ignoring incoming audio.")
|
||||
if self.is_stopping:
|
||||
logger.warning("AudioProcessor is stopping. Ignoring incoming audio.")
|
||||
return
|
||||
|
||||
current_time = time()
|
||||
if not hasattr(self, '_last_heartbeat') or current_time - getattr(self, '_last_heartbeat', 0) >= 10:
|
||||
logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago")
|
||||
self._last_heartbeat = current_time
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None:
|
||||
logger.warning("FFmpeg process not available or has terminated. Restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.write, message),
|
||||
timeout=20.0
|
||||
)
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
|
||||
timeout=20.0
|
||||
)
|
||||
|
||||
self.last_ffmpeg_activity = time()
|
||||
return
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
logger.warning(f"FFmpeg operation timed out: {e}. Attempt {attempt + 1}/3.")
|
||||
if attempt < 2:
|
||||
await self.restart_ffmpeg()
|
||||
else:
|
||||
logger.error("FFmpeg operations failed after multiple retries due to timeouts.")
|
||||
except (BrokenPipeError, AttributeError, OSError) as e:
|
||||
logger.warning(f"Error writing to FFmpeg: {e}. Attempt {attempt + 1}/3.")
|
||||
if attempt < 2:
|
||||
await self.restart_ffmpeg()
|
||||
await asyncio.sleep(0.5)
|
||||
else:
|
||||
logger.error("Maximum retries reached for FFmpeg write. Giving up on this chunk.")
|
||||
success = await self.ffmpeg_manager.write_data(message)
|
||||
if not success:
|
||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
||||
if ffmpeg_state == FFmpegState.FAILED:
|
||||
logger.error("FFmpeg is in FAILED state, cannot process audio")
|
||||
else:
|
||||
logger.warning("Failed to write audio data to FFmpeg")
|
||||
|
||||
Reference in New Issue
Block a user