diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 0f2d490..2fc5ce5 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -205,22 +205,10 @@ class AudioProcessor: self.last_ffmpeg_activity = time() continue - # Reduce timeout for reading from FFmpeg - try: - chunk = await asyncio.wait_for( - loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size), - timeout=5.0 # Shorter timeout (5 seconds instead of 15) - ) - if chunk: - self.last_ffmpeg_activity = time() - - except asyncio.TimeoutError: - logger.warning("FFmpeg read timeout. Restarting...") - await self.restart_ffmpeg() - beg = time() + chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size) + if chunk: self.last_ffmpeg_activity = time() - continue - + if not chunk: logger.info("FFmpeg stdout closed.") break @@ -233,7 +221,7 @@ class AudioProcessor: self.convert_pcm_to_float(self.pcm_buffer).copy() ) - # Process when we have enough data + # Process when enough data if len(self.pcm_buffer) >= self.bytes_per_sec: if len(self.pcm_buffer) > self.max_bytes_per_sec: logger.warning( @@ -492,19 +480,40 @@ class AudioProcessor: if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None: logger.warning("FFmpeg process not available, restarting...") await self.restart_ffmpeg() - - self.ffmpeg_process.stdin.write(message) - self.ffmpeg_process.stdin.flush() - self.last_ffmpeg_activity = time() # Update activity timestamp - return + loop = asyncio.get_running_loop() + try: + await asyncio.wait_for( + loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)), + timeout=2.0 + ) + except asyncio.TimeoutError: + logger.warning("FFmpeg write operation timed out, restarting...") + await self.restart_ffmpeg() + retry_count += 1 + continue + + try: + await asyncio.wait_for( + loop.run_in_executor(None, self.ffmpeg_process.stdin.flush), + timeout=2.0 + ) + except asyncio.TimeoutError: + logger.warning("FFmpeg flush operation timed out, restarting...") + await self.restart_ffmpeg() + retry_count += 1 + continue + + self.last_ffmpeg_activity = time() + return + except (BrokenPipeError, AttributeError, OSError) as e: retry_count += 1 logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...") if retry_count < max_retries: await self.restart_ffmpeg() - await asyncio.sleep(0.5) # Shorter pause between retries + await asyncio.sleep(0.5) else: logger.error("Maximum retries reached for FFmpeg process") await self.restart_ffmpeg()