Quentin Fuxa
2025-04-09 11:34:27 +02:00
parent 23e41f993f
commit 704170ccf3

View File

@@ -6,7 +6,6 @@ import math
import logging
import traceback
from datetime import timedelta
from typing import List, Dict, Any
from whisperlivekit.timed_objects import ASRToken
from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory
from whisperlivekit.core import WhisperLiveKit
@@ -39,7 +38,10 @@ class AudioProcessor:
self.bytes_per_sample = 2
self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
self.last_ffmpeg_activity = time()
self.ffmpeg_health_check_interval = 5
self.ffmpeg_max_idle_time = 10
# State management
self.tokens = []
self.buffer_transcription = ""
@@ -78,14 +80,50 @@ class AudioProcessor:
async def restart_ffmpeg(self):
"""Restart the FFmpeg process after failure."""
logger.warning("Restarting FFmpeg process...")
if self.ffmpeg_process:
try:
self.ffmpeg_process.kill()
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
# we check if process is still running
if self.ffmpeg_process.poll() is None:
logger.info("Terminating existing FFmpeg process")
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.terminate()
# wait for termination with timeout
try:
await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait),
timeout=5.0
)
except asyncio.TimeoutError:
logger.warning("FFmpeg process did not terminate, killing forcefully")
self.ffmpeg_process.kill()
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait)
except Exception as e:
logger.warning(f"Error killing FFmpeg process: {e}")
logger.error(f"Error during FFmpeg process termination: {e}")
logger.error(traceback.format_exc())
# we start new process
try:
logger.info("Starting new FFmpeg process")
self.ffmpeg_process = self.start_ffmpeg_decoder()
self.pcm_buffer = bytearray()
self.last_ffmpeg_activity = time()
logger.info("FFmpeg process restarted successfully")
except Exception as e:
logger.error(f"Failed to restart FFmpeg process: {e}")
logger.error(traceback.format_exc())
# try again after 5s
await asyncio.sleep(5)
try:
self.ffmpeg_process = self.start_ffmpeg_decoder()
self.pcm_buffer = bytearray()
self.last_ffmpeg_activity = time()
logger.info("FFmpeg process restarted successfully on second attempt")
except Exception as e2:
logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}")
logger.critical(traceback.format_exc())
async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep):
"""Thread-safe update of transcription with new data."""
@@ -154,21 +192,33 @@ class AudioProcessor:
while True:
try:
# Calculate buffer size based on elapsed time
elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec
current_time = time()
elapsed_time = math.floor((current_time - beg) * 10) / 10
buffer_size = max(int(32000 * elapsed_time), 4096)
beg = time()
beg = current_time
# Read chunk with timeout
# Detect idle state much more quickly
if current_time - self.last_ffmpeg_activity > self.ffmpeg_max_idle_time:
logger.warning(f"FFmpeg process idle for {current_time - self.last_ffmpeg_activity:.2f}s. Restarting...")
await self.restart_ffmpeg()
beg = time()
self.last_ffmpeg_activity = time()
continue
# Reduce timeout for reading from FFmpeg
try:
chunk = await asyncio.wait_for(
loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
timeout=15.0
timeout=5.0 # Shorter timeout (5 seconds instead of 15)
)
if chunk:
self.last_ffmpeg_activity = time()
except asyncio.TimeoutError:
logger.warning("FFmpeg read timeout. Restarting...")
await self.restart_ffmpeg()
beg = time()
self.last_ffmpeg_activity = time()
continue
if not chunk:
@@ -366,7 +416,7 @@ class AudioProcessor:
logger.warning(f"Exception in results_formatter: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
await asyncio.sleep(0.5) # Back off on error
async def create_tasks(self):
"""Create and start processing tasks."""
@@ -378,6 +428,35 @@ class AudioProcessor:
tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
# Monitor overall system health
async def watchdog():
while True:
try:
await asyncio.sleep(10) # Check every 10 seconds instead of 60
current_time = time()
# Check for stalled tasks
for i, task in enumerate(tasks):
if task.done():
exc = task.exception() if task.done() else None
task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
# Check for FFmpeg process health with shorter thresholds
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
if ffmpeg_idle_time > 15: # 15 seconds instead of 180
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
# Force restart after 30 seconds of inactivity (instead of 600)
if ffmpeg_idle_time > 30:
logger.error("FFmpeg idle for too long, forcing restart")
await self.restart_ffmpeg()
except Exception as e:
logger.error(f"Error in watchdog task: {e}")
tasks.append(asyncio.create_task(watchdog()))
self.tasks = tasks
return self.results_formatter()
@@ -399,11 +478,34 @@ class AudioProcessor:
async def process_audio(self, message):
"""Process incoming audio data."""
try:
self.ffmpeg_process.stdin.write(message)
self.ffmpeg_process.stdin.flush()
except (BrokenPipeError, AttributeError) as e:
logger.warning(f"Error writing to FFmpeg: {e}. Restarting...")
await self.restart_ffmpeg()
self.ffmpeg_process.stdin.write(message)
self.ffmpeg_process.stdin.flush()
retry_count = 0
max_retries = 3
# Log periodic heartbeats showing ongoing audio proc
current_time = time()
if not hasattr(self, '_last_heartbeat') or current_time - self._last_heartbeat >= 10:
logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago")
self._last_heartbeat = current_time
while retry_count < max_retries:
try:
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
logger.warning("FFmpeg process not available, restarting...")
await self.restart_ffmpeg()
self.ffmpeg_process.stdin.write(message)
self.ffmpeg_process.stdin.flush()
self.last_ffmpeg_activity = time() # Update activity timestamp
return
except (BrokenPipeError, AttributeError, OSError) as e:
retry_count += 1
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
if retry_count < max_retries:
await self.restart_ffmpeg()
await asyncio.sleep(0.5) # Shorter pause between retries
else:
logger.error("Maximum retries reached for FFmpeg process")
await self.restart_ffmpeg()
return