mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-03-07 22:33:36 +00:00
increase timeout from 2 to 20s for ffmpeg stdin flush and writing
This commit is contained in:
@@ -595,77 +595,56 @@ class AudioProcessor:
|
||||
|
||||
async def process_audio(self, message):
|
||||
"""Process incoming audio data."""
|
||||
# If already stopping or stdin is closed, ignore further audio, especially residual chunks.
|
||||
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
|
||||
logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
|
||||
if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
|
||||
return
|
||||
|
||||
if not message: # primary signal to start stopping
|
||||
if not message:
|
||||
logger.info("Empty audio message received, initiating stop sequence.")
|
||||
self.is_stopping = True
|
||||
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
logger.info("FFmpeg stdin closed due to primary stop signal.")
|
||||
logger.info("FFmpeg stdin closed due to stop signal.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
|
||||
return
|
||||
|
||||
retry_count = 0
|
||||
max_retries = 3
|
||||
|
||||
# Log periodic heartbeats showing ongoing audio proc
|
||||
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
|
||||
logger.warning("AudioProcessor is stopping or stdin is closed. Ignoring incoming audio.")
|
||||
return
|
||||
|
||||
current_time = time()
|
||||
if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10:
|
||||
if not hasattr(self, '_last_heartbeat') or current_time - getattr(self, '_last_heartbeat', 0) >= 10:
|
||||
logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago")
|
||||
self._last_heartbeat = current_time
|
||||
|
||||
while retry_count < max_retries:
|
||||
|
||||
for attempt in range(3):
|
||||
try:
|
||||
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
|
||||
logger.warning("FFmpeg process not available, restarting...")
|
||||
if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None:
|
||||
logger.warning("FFmpeg process not available or has terminated. Restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.write, message),
|
||||
timeout=20.0
|
||||
)
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
|
||||
timeout=20.0
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)),
|
||||
timeout=2.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg write operation timed out, restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
retry_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
|
||||
timeout=2.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg flush operation timed out, restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
retry_count += 1
|
||||
continue
|
||||
|
||||
self.last_ffmpeg_activity = time()
|
||||
return
|
||||
|
||||
|
||||
except asyncio.TimeoutError as e:
|
||||
logger.warning(f"FFmpeg operation timed out: {e}. Attempt {attempt + 1}/3.")
|
||||
if attempt < 2:
|
||||
await self.restart_ffmpeg()
|
||||
else:
|
||||
logger.error("FFmpeg operations failed after multiple retries due to timeouts.")
|
||||
except (BrokenPipeError, AttributeError, OSError) as e:
|
||||
retry_count += 1
|
||||
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
|
||||
|
||||
if retry_count < max_retries:
|
||||
logger.warning(f"Error writing to FFmpeg: {e}. Attempt {attempt + 1}/3.")
|
||||
if attempt < 2:
|
||||
await self.restart_ffmpeg()
|
||||
await asyncio.sleep(0.5)
|
||||
else:
|
||||
logger.error("Maximum retries reached for FFmpeg process")
|
||||
await self.restart_ffmpeg()
|
||||
return
|
||||
logger.error("Maximum retries reached for FFmpeg write. Giving up on this chunk.")
|
||||
Reference in New Issue
Block a user