From 704170ccf369710f8dfda782cbb95944cdab7fca Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Wed, 9 Apr 2025 11:34:27 +0200 Subject: [PATCH] Logs for https://github.com/QuentinFuxa/WhisperLiveKit/issues/110 https://github.com/QuentinFuxa/WhisperLiveKit/issues/106 https://github.com/QuentinFuxa/WhisperLiveKit/issues/90 https://github.com/QuentinFuxa/WhisperLiveKit/issues/87 https://github.com/QuentinFuxa/WhisperLiveKit/issues/81 https://github.com/QuentinFuxa/WhisperLiveKit/issues/2 --- whisperlivekit/audio_processor.py | 140 ++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 19 deletions(-) diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 68ee571..0f2d490 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -6,7 +6,6 @@ import math import logging import traceback from datetime import timedelta -from typing import List, Dict, Any from whisperlivekit.timed_objects import ASRToken from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory from whisperlivekit.core import WhisperLiveKit @@ -39,7 +38,10 @@ class AudioProcessor: self.bytes_per_sample = 2 self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz - + self.last_ffmpeg_activity = time() + self.ffmpeg_health_check_interval = 5 + self.ffmpeg_max_idle_time = 10 + # State management self.tokens = [] self.buffer_transcription = "" @@ -78,14 +80,50 @@ class AudioProcessor: async def restart_ffmpeg(self): """Restart the FFmpeg process after failure.""" + logger.warning("Restarting FFmpeg process...") + if self.ffmpeg_process: try: - self.ffmpeg_process.kill() - await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) + # we check if process is still running + if self.ffmpeg_process.poll() is None: + logger.info("Terminating existing FFmpeg process") + self.ffmpeg_process.stdin.close() + self.ffmpeg_process.terminate() + + # wait for termination with timeout + try: + await asyncio.wait_for( + asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait), + timeout=5.0 + ) + except asyncio.TimeoutError: + logger.warning("FFmpeg process did not terminate, killing forcefully") + self.ffmpeg_process.kill() + await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) except Exception as e: - logger.warning(f"Error killing FFmpeg process: {e}") + logger.error(f"Error during FFmpeg process termination: {e}") + logger.error(traceback.format_exc()) + + # we start new process + try: + logger.info("Starting new FFmpeg process") self.ffmpeg_process = self.start_ffmpeg_decoder() self.pcm_buffer = bytearray() + self.last_ffmpeg_activity = time() + logger.info("FFmpeg process restarted successfully") + except Exception as e: + logger.error(f"Failed to restart FFmpeg process: {e}") + logger.error(traceback.format_exc()) + # try again after 5s + await asyncio.sleep(5) + try: + self.ffmpeg_process = self.start_ffmpeg_decoder() + self.pcm_buffer = bytearray() + self.last_ffmpeg_activity = time() + logger.info("FFmpeg process restarted successfully on second attempt") + except Exception as e2: + logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}") + logger.critical(traceback.format_exc()) async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep): """Thread-safe update of transcription with new data.""" @@ -154,21 +192,33 @@ class AudioProcessor: while True: try: - # Calculate buffer size based on elapsed time - elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec + current_time = time() + elapsed_time = math.floor((current_time - beg) * 10) / 10 buffer_size = max(int(32000 * elapsed_time), 4096) - beg = time() + beg = current_time - # Read chunk with timeout + # Detect idle state much more quickly + if current_time - self.last_ffmpeg_activity > self.ffmpeg_max_idle_time: + logger.warning(f"FFmpeg process idle for {current_time - self.last_ffmpeg_activity:.2f}s. Restarting...") + await self.restart_ffmpeg() + beg = time() + self.last_ffmpeg_activity = time() + continue + + # Reduce timeout for reading from FFmpeg try: chunk = await asyncio.wait_for( loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size), - timeout=15.0 + timeout=5.0 # Shorter timeout (5 seconds instead of 15) ) + if chunk: + self.last_ffmpeg_activity = time() + except asyncio.TimeoutError: logger.warning("FFmpeg read timeout. Restarting...") await self.restart_ffmpeg() beg = time() + self.last_ffmpeg_activity = time() continue if not chunk: @@ -366,7 +416,7 @@ class AudioProcessor: logger.warning(f"Exception in results_formatter: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") await asyncio.sleep(0.5) # Back off on error - + async def create_tasks(self): """Create and start processing tasks.""" @@ -378,6 +428,35 @@ class AudioProcessor: tasks.append(asyncio.create_task(self.diarization_processor(self.diarization))) tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader())) + + # Monitor overall system health + async def watchdog(): + while True: + try: + await asyncio.sleep(10) # Check every 10 seconds instead of 60 + + current_time = time() + # Check for stalled tasks + for i, task in enumerate(tasks): + if task.done(): + exc = task.exception() if task.done() else None + task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}" + logger.error(f"{task_name} unexpectedly completed with exception: {exc}") + + # Check for FFmpeg process health with shorter thresholds + ffmpeg_idle_time = current_time - self.last_ffmpeg_activity + if ffmpeg_idle_time > 15: # 15 seconds instead of 180 + logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention") + + # Force restart after 30 seconds of inactivity (instead of 600) + if ffmpeg_idle_time > 30: + logger.error("FFmpeg idle for too long, forcing restart") + await self.restart_ffmpeg() + + except Exception as e: + logger.error(f"Error in watchdog task: {e}") + + tasks.append(asyncio.create_task(watchdog())) self.tasks = tasks return self.results_formatter() @@ -399,11 +478,34 @@ class AudioProcessor: async def process_audio(self, message): """Process incoming audio data.""" - try: - self.ffmpeg_process.stdin.write(message) - self.ffmpeg_process.stdin.flush() - except (BrokenPipeError, AttributeError) as e: - logger.warning(f"Error writing to FFmpeg: {e}. Restarting...") - await self.restart_ffmpeg() - self.ffmpeg_process.stdin.write(message) - self.ffmpeg_process.stdin.flush() \ No newline at end of file + retry_count = 0 + max_retries = 3 + + # Log periodic heartbeats showing ongoing audio proc + current_time = time() + if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10: + logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago") + self._last_heartbeat = current_time + + while retry_count < max_retries: + try: + if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None: + logger.warning("FFmpeg process not available, restarting...") + await self.restart_ffmpeg() + + self.ffmpeg_process.stdin.write(message) + self.ffmpeg_process.stdin.flush() + self.last_ffmpeg_activity = time() # Update activity timestamp + return + + except (BrokenPipeError, AttributeError, OSError) as e: + retry_count += 1 + logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...") + + if retry_count < max_retries: + await self.restart_ffmpeg() + await asyncio.sleep(0.5) # Shorter pause between retries + else: + logger.error("Maximum retries reached for FFmpeg process") + await self.restart_ffmpeg() + return \ No newline at end of file