mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-03-21 16:40:35 +00:00
Solves stdin flushes blocking IOhttps://github.com/QuentinFuxa/WhisperLiveKit/issues/110
https://github.com/QuentinFuxa/WhisperLiveKit/issues/106 https://github.com/QuentinFuxa/WhisperLiveKit/issues/90 https://github.com/QuentinFuxa/WhisperLiveKit/issues/87 https://github.com/QuentinFuxa/WhisperLiveKit/issues/81 https://github.com/QuentinFuxa/WhisperLiveKit/issues/2
This commit is contained in:
@@ -205,22 +205,10 @@ class AudioProcessor:
|
||||
self.last_ffmpeg_activity = time()
|
||||
continue
|
||||
|
||||
# Reduce timeout for reading from FFmpeg
|
||||
try:
|
||||
chunk = await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
|
||||
timeout=5.0 # Shorter timeout (5 seconds instead of 15)
|
||||
)
|
||||
if chunk:
|
||||
self.last_ffmpeg_activity = time()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg read timeout. Restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
beg = time()
|
||||
chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size)
|
||||
if chunk:
|
||||
self.last_ffmpeg_activity = time()
|
||||
continue
|
||||
|
||||
|
||||
if not chunk:
|
||||
logger.info("FFmpeg stdout closed.")
|
||||
break
|
||||
@@ -233,7 +221,7 @@ class AudioProcessor:
|
||||
self.convert_pcm_to_float(self.pcm_buffer).copy()
|
||||
)
|
||||
|
||||
# Process when we have enough data
|
||||
# Process when enough data
|
||||
if len(self.pcm_buffer) >= self.bytes_per_sec:
|
||||
if len(self.pcm_buffer) > self.max_bytes_per_sec:
|
||||
logger.warning(
|
||||
@@ -492,19 +480,40 @@ class AudioProcessor:
|
||||
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
|
||||
logger.warning("FFmpeg process not available, restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
|
||||
self.ffmpeg_process.stdin.write(message)
|
||||
self.ffmpeg_process.stdin.flush()
|
||||
self.last_ffmpeg_activity = time() # Update activity timestamp
|
||||
return
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)),
|
||||
timeout=2.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg write operation timed out, restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
retry_count += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
|
||||
timeout=2.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("FFmpeg flush operation timed out, restarting...")
|
||||
await self.restart_ffmpeg()
|
||||
retry_count += 1
|
||||
continue
|
||||
|
||||
self.last_ffmpeg_activity = time()
|
||||
return
|
||||
|
||||
except (BrokenPipeError, AttributeError, OSError) as e:
|
||||
retry_count += 1
|
||||
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
|
||||
|
||||
if retry_count < max_retries:
|
||||
await self.restart_ffmpeg()
|
||||
await asyncio.sleep(0.5) # Shorter pause between retries
|
||||
await asyncio.sleep(0.5)
|
||||
else:
|
||||
logger.error("Maximum retries reached for FFmpeg process")
|
||||
await self.restart_ffmpeg()
|
||||
|
||||
Reference in New Issue
Block a user