From 774cee036be37acf13dd1836713714fae6901592 Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Mon, 30 Jun 2025 18:28:50 +0200 Subject: [PATCH] increase timeout from 2 to 20s for ffmpeg stdin flush and writing --- whisperlivekit/audio_processor.py | 85 ++++++++++++------------------- 1 file changed, 32 insertions(+), 53 deletions(-) diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index b8b2086..9bf097a 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -595,77 +595,56 @@ class AudioProcessor: async def process_audio(self, message): """Process incoming audio data.""" - # If already stopping or stdin is closed, ignore further audio, especially residual chunks. - if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed): - logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).") - if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: - logger.info("Received empty message while already in stopping state; ensuring stdin is closed.") - try: - self.ffmpeg_process.stdin.close() - except Exception as e: - logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}") - return - - if not message: # primary signal to start stopping + if not message: logger.info("Empty audio message received, initiating stop sequence.") self.is_stopping = True if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: try: self.ffmpeg_process.stdin.close() - logger.info("FFmpeg stdin closed due to primary stop signal.") + logger.info("FFmpeg stdin closed due to stop signal.") except Exception as e: logger.warning(f"Error closing ffmpeg stdin on stop: {e}") return - retry_count = 0 - max_retries = 3 - - # Log periodic heartbeats showing ongoing audio proc + if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed): + logger.warning("AudioProcessor is stopping or stdin is closed. Ignoring incoming audio.") + return + current_time = time() - if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10: + if not hasattr(self, '_last_heartbeat') or current_time - getattr(self, '_last_heartbeat', 0) >= 10: logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago") self._last_heartbeat = current_time - - while retry_count < max_retries: + + for attempt in range(3): try: - if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None: - logger.warning("FFmpeg process not available, restarting...") + if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None: + logger.warning("FFmpeg process not available or has terminated. Restarting...") await self.restart_ffmpeg() + + loop = asyncio.get_running_loop() + + await asyncio.wait_for( + loop.run_in_executor(None, self.ffmpeg_process.stdin.write, message), + timeout=20.0 + ) + await asyncio.wait_for( + loop.run_in_executor(None, self.ffmpeg_process.stdin.flush), + timeout=20.0 + ) - loop = asyncio.get_running_loop() - try: - await asyncio.wait_for( - loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)), - timeout=2.0 - ) - except asyncio.TimeoutError: - logger.warning("FFmpeg write operation timed out, restarting...") - await self.restart_ffmpeg() - retry_count += 1 - continue - - try: - await asyncio.wait_for( - loop.run_in_executor(None, self.ffmpeg_process.stdin.flush), - timeout=2.0 - ) - except asyncio.TimeoutError: - logger.warning("FFmpeg flush operation timed out, restarting...") - await self.restart_ffmpeg() - retry_count += 1 - continue - self.last_ffmpeg_activity = time() return - + + except asyncio.TimeoutError as e: + logger.warning(f"FFmpeg operation timed out: {e}. Attempt {attempt + 1}/3.") + if attempt < 2: + await self.restart_ffmpeg() + else: + logger.error("FFmpeg operations failed after multiple retries due to timeouts.") except (BrokenPipeError, AttributeError, OSError) as e: - retry_count += 1 - logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...") - - if retry_count < max_retries: + logger.warning(f"Error writing to FFmpeg: {e}. Attempt {attempt + 1}/3.") + if attempt < 2: await self.restart_ffmpeg() await asyncio.sleep(0.5) else: - logger.error("Maximum retries reached for FFmpeg process") - await self.restart_ffmpeg() - return + logger.error("Maximum retries reached for FFmpeg write. Giving up on this chunk.") \ No newline at end of file