diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 9bf097a..f3238f1 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -1,6 +1,5 @@ import asyncio import numpy as np -import ffmpeg from time import time, sleep import math import logging @@ -9,6 +8,7 @@ from datetime import timedelta from whisperlivekit.timed_objects import ASRToken from whisperlivekit.whisper_streaming_custom.whisper_online import online_factory from whisperlivekit.core import TranscriptionEngine +from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState # Set up logging once logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") @@ -64,7 +64,19 @@ class AudioProcessor: self.asr = models.asr self.tokenizer = models.tokenizer self.diarization = models.diarization - self.ffmpeg_process = self.start_ffmpeg_decoder() + + self.ffmpeg_manager = FFmpegManager( + sample_rate=self.sample_rate, + channels=self.channels + ) + + async def handle_ffmpeg_error(error_type: str): + logger.error(f"FFmpeg error: {error_type}") + self._ffmpeg_error = error_type + + self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error + self._ffmpeg_error = None + self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.pcm_buffer = bytearray() @@ -84,83 +96,6 @@ class AudioProcessor: """Convert PCM buffer in s16le format to normalized NumPy array.""" return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0 - def start_ffmpeg_decoder(self): - """Start FFmpeg process for WebM to PCM conversion.""" - try: - return (ffmpeg.input("pipe:0", format="webm") - .output("pipe:1", format="s16le", acodec="pcm_s16le", - ac=self.channels, ar=str(self.sample_rate)) - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)) - except FileNotFoundError: - error = """ - FFmpeg is not installed or not found in your system's PATH. - Please install FFmpeg to enable audio processing. - - Installation instructions: - - # Ubuntu/Debian: - sudo apt update && sudo apt install ffmpeg - - # macOS (using Homebrew): - brew install ffmpeg - - # Windows: - # 1. Download the latest static build from https://ffmpeg.org/download.html - # 2. Extract the archive (e.g., to C:\\FFmpeg). - # 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable. - - After installation, please restart the application. - """ - logger.error(error) - raise FileNotFoundError(error) - - async def restart_ffmpeg(self): - """Restart the FFmpeg process after failure.""" - logger.warning("Restarting FFmpeg process...") - - if self.ffmpeg_process: - try: - # we check if process is still running - if self.ffmpeg_process.poll() is None: - logger.info("Terminating existing FFmpeg process") - self.ffmpeg_process.stdin.close() - self.ffmpeg_process.terminate() - - # wait for termination with timeout - try: - await asyncio.wait_for( - asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait), - timeout=5.0 - ) - except asyncio.TimeoutError: - logger.warning("FFmpeg process did not terminate, killing forcefully") - self.ffmpeg_process.kill() - await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) - except Exception as e: - logger.error(f"Error during FFmpeg process termination: {e}") - logger.error(traceback.format_exc()) - - # we start new process - try: - logger.info("Starting new FFmpeg process") - self.ffmpeg_process = self.start_ffmpeg_decoder() - self.pcm_buffer = bytearray() - self.last_ffmpeg_activity = time() - logger.info("FFmpeg process restarted successfully") - except Exception as e: - logger.error(f"Failed to restart FFmpeg process: {e}") - logger.error(traceback.format_exc()) - # try again after 5s - await asyncio.sleep(5) - try: - self.ffmpeg_process = self.start_ffmpeg_decoder() - self.pcm_buffer = bytearray() - self.last_ffmpeg_activity = time() - logger.info("FFmpeg process restarted successfully on second attempt") - except Exception as e2: - logger.critical(f"Failed to restart FFmpeg process on second attempt: {e2}") - logger.critical(traceback.format_exc()) - async def update_transcription(self, new_tokens, buffer, end_buffer, full_transcription, sep): """Thread-safe update of transcription with new data.""" async with self.lock: @@ -223,23 +158,35 @@ class AudioProcessor: async def ffmpeg_stdout_reader(self): """Read audio data from FFmpeg stdout and process it.""" - loop = asyncio.get_event_loop() beg = time() while True: try: + # Check if FFmpeg is running + state = await self.ffmpeg_manager.get_state() + if state == FFmpegState.FAILED: + logger.error("FFmpeg is in FAILED state, cannot read data") + break + elif state != FFmpegState.RUNNING: + logger.warning(f"FFmpeg is in {state} state, waiting...") + await asyncio.sleep(0.5) + continue + current_time = time() elapsed_time = math.floor((current_time - beg) * 10) / 10 buffer_size = max(int(32000 * elapsed_time), 4096) beg = current_time - chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size) - if chunk: - self.last_ffmpeg_activity = time() + chunk = await self.ffmpeg_manager.read_data(buffer_size) if not chunk: - logger.info("FFmpeg stdout closed, no more data to read.") - break + if self.is_stopping: + logger.info("FFmpeg stdout closed, stopping.") + break + else: + # No data available, but not stopping - FFmpeg might be restarting + await asyncio.sleep(0.1) + continue self.pcm_buffer.extend(chunk) @@ -272,7 +219,12 @@ class AudioProcessor: except Exception as e: logger.warning(f"Exception in ffmpeg_stdout_reader: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") - break + # Try to recover by waiting a bit + await asyncio.sleep(1) + + # Check if we should exit + if self.is_stopping: + break logger.info("FFmpeg stdout processing finished. Signaling downstream processors.") if self.args.transcription and self.transcription_queue: @@ -393,6 +345,21 @@ class AudioProcessor: """Format processing results for output.""" while True: try: + ffmpeg_state = await self.ffmpeg_manager.get_state() + if ffmpeg_state == FFmpegState.FAILED and self._ffmpeg_error: + yield { + "status": "error", + "error": f"FFmpeg error: {self._ffmpeg_error}", + "lines": [], + "buffer_transcription": "", + "buffer_diarization": "", + "remaining_time_transcription": 0, + "remaining_time_diarization": 0 + } + self._ffmpeg_error = None + await asyncio.sleep(1) + continue + # Get current state state = await self.get_current_state() tokens = state["tokens"] @@ -509,6 +476,21 @@ class AudioProcessor: self.all_tasks_for_cleanup = [] processing_tasks_for_watchdog = [] + success = await self.ffmpeg_manager.start() + if not success: + logger.error("Failed to start FFmpeg manager") + async def error_generator(): + yield { + "status": "error", + "error": "FFmpeg failed to start. Please check that FFmpeg is installed.", + "lines": [], + "buffer_transcription": "", + "buffer_diarization": "", + "remaining_time_transcription": 0, + "remaining_time_diarization": 0 + } + return error_generator() + if self.args.transcription and self.online: self.transcription_task = asyncio.create_task(self.transcription_processor()) self.all_tasks_for_cleanup.append(self.transcription_task) @@ -534,8 +516,7 @@ class AudioProcessor: while True: try: await asyncio.sleep(10) - current_time = time() - + for i, task in enumerate(tasks_to_monitor): if task.done(): exc = task.exception() @@ -545,12 +526,15 @@ class AudioProcessor: else: logger.info(f"{task_name} completed normally.") - ffmpeg_idle_time = current_time - self.last_ffmpeg_activity - if ffmpeg_idle_time > 10: - logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.") - if ffmpeg_idle_time > 15 and not self.is_stopping: - logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.") - await self.restart_ffmpeg() + # Check FFmpeg status through the manager + ffmpeg_state = await self.ffmpeg_manager.get_state() + if ffmpeg_state == FFmpegState.FAILED: + logger.error("FFmpeg is in FAILED state, notifying results formatter") + # FFmpeg manager will handle its own recovery + elif ffmpeg_state == FFmpegState.STOPPED and not self.is_stopping: + logger.warning("FFmpeg unexpectedly stopped, attempting restart") + await self.ffmpeg_manager.restart() + except asyncio.CancelledError: logger.info("Watchdog task cancelled.") break @@ -559,7 +543,7 @@ class AudioProcessor: async def cleanup(self): """Clean up resources when processing is complete.""" - logger.info("Starting cleanup of AudioProcessor resources.") + logger.info("Starting cleanup of AudioProcessor resources.") for task in self.all_tasks_for_cleanup: if task and not task.done(): task.cancel() @@ -568,26 +552,8 @@ class AudioProcessor: if created_tasks: await asyncio.gather(*created_tasks, return_exceptions=True) logger.info("All processing tasks cancelled or finished.") - - if self.ffmpeg_process: - if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: - try: - self.ffmpeg_process.stdin.close() - except Exception as e: - logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}") - - # Wait for ffmpeg process to terminate - if self.ffmpeg_process.poll() is None: # Check if process is still running - logger.info("Waiting for FFmpeg process to terminate...") - try: - # Run wait in executor to avoid blocking async loop - await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout - except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor - logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}") - self.ffmpeg_process.kill() - await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill - logger.info("FFmpeg process terminated.") - + await self.ffmpeg_manager.stop() + logger.info("FFmpeg manager stopped.") if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'): self.diarization.close() logger.info("AudioProcessor cleanup complete.") @@ -598,53 +564,18 @@ class AudioProcessor: if not message: logger.info("Empty audio message received, initiating stop sequence.") self.is_stopping = True - if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: - try: - self.ffmpeg_process.stdin.close() - logger.info("FFmpeg stdin closed due to stop signal.") - except Exception as e: - logger.warning(f"Error closing ffmpeg stdin on stop: {e}") + # Signal FFmpeg manager to stop accepting data + await self.ffmpeg_manager.stop() return - if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed): - logger.warning("AudioProcessor is stopping or stdin is closed. Ignoring incoming audio.") + if self.is_stopping: + logger.warning("AudioProcessor is stopping. Ignoring incoming audio.") return - current_time = time() - if not hasattr(self, '_last_heartbeat') or current_time - getattr(self, '_last_heartbeat', 0) >= 10: - logger.debug(f"Processing audio chunk, last FFmpeg activity: {current_time - self.last_ffmpeg_activity:.2f}s ago") - self._last_heartbeat = current_time - - for attempt in range(3): - try: - if not self.ffmpeg_process or self.ffmpeg_process.poll() is not None: - logger.warning("FFmpeg process not available or has terminated. Restarting...") - await self.restart_ffmpeg() - - loop = asyncio.get_running_loop() - - await asyncio.wait_for( - loop.run_in_executor(None, self.ffmpeg_process.stdin.write, message), - timeout=20.0 - ) - await asyncio.wait_for( - loop.run_in_executor(None, self.ffmpeg_process.stdin.flush), - timeout=20.0 - ) - - self.last_ffmpeg_activity = time() - return - - except asyncio.TimeoutError as e: - logger.warning(f"FFmpeg operation timed out: {e}. Attempt {attempt + 1}/3.") - if attempt < 2: - await self.restart_ffmpeg() - else: - logger.error("FFmpeg operations failed after multiple retries due to timeouts.") - except (BrokenPipeError, AttributeError, OSError) as e: - logger.warning(f"Error writing to FFmpeg: {e}. Attempt {attempt + 1}/3.") - if attempt < 2: - await self.restart_ffmpeg() - await asyncio.sleep(0.5) - else: - logger.error("Maximum retries reached for FFmpeg write. Giving up on this chunk.") \ No newline at end of file + success = await self.ffmpeg_manager.write_data(message) + if not success: + ffmpeg_state = await self.ffmpeg_manager.get_state() + if ffmpeg_state == FFmpegState.FAILED: + logger.error("FFmpeg is in FAILED state, cannot process audio") + else: + logger.warning("Failed to write audio data to FFmpeg") diff --git a/whisperlivekit/ffmpeg_manager.py b/whisperlivekit/ffmpeg_manager.py new file mode 100644 index 0000000..0a3552b --- /dev/null +++ b/whisperlivekit/ffmpeg_manager.py @@ -0,0 +1,467 @@ +import asyncio +import ffmpeg +import logging +import time +import traceback +from typing import Optional, Callable +from concurrent.futures import ThreadPoolExecutor +from enum import Enum + +logger = logging.getLogger(__name__) + +class FFmpegState(Enum): + STOPPED = "stopped" + STARTING = "starting" + RUNNING = "running" + RESTARTING = "restarting" + FAILED = "failed" + +class CircuitBreaker: + """to prevent endless restart loops.""" + def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.failure_count = 0 + self.last_failure_time = 0 + self.is_open = False + + def record_success(self): + self.failure_count = 0 + self.is_open = False + + def record_failure(self): + self.failure_count += 1 + self.last_failure_time = time.time() + + if self.failure_count >= self.failure_threshold: + self.is_open = True + logger.error(f"Circuit breaker opened after {self.failure_count} failures") + + def can_attempt(self) -> bool: + if not self.is_open: + return True + + if time.time() - self.last_failure_time > self.recovery_timeout: + logger.info("Circuit breaker recovery timeout reached, attempting reset") + self.is_open = False + self.failure_count = 0 + return True + + return False + +class FFmpegManager: + + def __init__(self, sample_rate: int = 16000, channels: int = 1, + max_retries: int = 3, restart_delay: float = 1.0): + self.sample_rate = sample_rate + self.channels = channels + self.max_retries = max_retries + self.restart_delay = restart_delay + + self.process: Optional[object] = None + self.state = FFmpegState.STOPPED + self.state_lock = asyncio.Lock() + + self.circuit_breaker = CircuitBreaker() + self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="ffmpeg") + + self._write_queue = asyncio.Queue(maxsize=100) + self._write_task: Optional[asyncio.Task] = None + self._monitor_task: Optional[asyncio.Task] = None + + self.last_activity = time.time() + self.on_data_callback: Optional[Callable] = None + self.on_error_callback: Optional[Callable] = None + + self._restart_lock = asyncio.Lock() + self._restart_in_progress = False + + async def start(self): + async with self.state_lock: + if self.state != FFmpegState.STOPPED: + logger.warning(f"Cannot start FFmpeg, current state: {self.state}") + return False + + self.state = FFmpegState.STARTING + + try: + if not self.circuit_breaker.can_attempt(): + logger.error("CB is open, cannot start FFmpeg") + async with self.state_lock: + self.state = FFmpegState.FAILED + return False + + # Start FFmpeg process + success = await self._start_process() + if not success: + self.circuit_breaker.record_failure() + async with self.state_lock: + self.state = FFmpegState.FAILED + return False + + self.circuit_breaker.record_success() + + # Start background tasks + self._write_task = asyncio.create_task(self._write_worker()) + self._monitor_task = asyncio.create_task(self._monitor_health()) + + async with self.state_lock: + self.state = FFmpegState.RUNNING + + logger.info("FFmpeg manager started ok") + return True + + except Exception as e: + logger.error(f"Failed to start FFmpeg manager: {e}") + logger.error(traceback.format_exc()) + self.circuit_breaker.record_failure() + async with self.state_lock: + self.state = FFmpegState.FAILED + return False + + async def stop(self): + logger.info("Stopping FFmpeg manager") + + async with self.state_lock: + if self.state == FFmpegState.STOPPED: + return + self.state = FFmpegState.STOPPED + + # Cancel background tasks + if self._write_task and not self._write_task.done(): + self._write_task.cancel() + try: + await self._write_task + except asyncio.CancelledError: + pass + + if self._monitor_task and not self._monitor_task.done(): + self._monitor_task.cancel() + try: + await self._monitor_task + except asyncio.CancelledError: + pass + + await self._stop_process() + while not self._write_queue.empty(): + try: + self._write_queue.get_nowait() + except asyncio.QueueEmpty: + break + + logger.info("FFmpeg manager stopped") + + async def write_data(self, data: bytes) -> bool: + current_state = await self.get_state() + if current_state != FFmpegState.RUNNING: + logger.warning(f"Cannot write data, FFmpeg state: {current_state}") + return False + + try: + # Use nowait to avoid blocking + self._write_queue.put_nowait(data) + return True + except asyncio.QueueFull: + logger.warning("FFmpeg write queue is full, dropping data") + if self.on_error_callback: + await self.on_error_callback("write_queue_full") + return False + + async def read_data(self, size: int) -> Optional[bytes]: + """Read data from FFmpeg stdout (non-blocking).""" + if not self.process or self.process.poll() is not None: + return None + + try: + loop = asyncio.get_event_loop() + data = await asyncio.wait_for( + loop.run_in_executor(self.executor, self.process.stdout.read, size), + timeout=5.0 + ) + + if data: + self.last_activity = time.time() + + return data + + except asyncio.TimeoutError: + logger.warning("FFmpeg read timeout") + return None + except Exception as e: + logger.error(f"Error reading from FFmpeg: {e}") + return None + + async def get_state(self) -> FFmpegState: + """Get the current FFmpeg state.""" + async with self.state_lock: + return self.state + + async def restart(self, is_external_kill: bool = False) -> bool: + """Restart the FFmpeg process.""" + async with self._restart_lock: + if self._restart_in_progress: + logger.info("Restart already in progress, skipping") + return False + + self._restart_in_progress = True + + try: + logger.warning("Restarting FFmpeg process") + + async with self.state_lock: + if self.state == FFmpegState.RESTARTING: + logger.warning("FFmpeg is already restarting") + return False + + prev_state = self.state + self.state = FFmpegState.RESTARTING + + if is_external_kill: + logger.warning("External kill detected, resetting circuit breaker. (Check the exit code) ") + self.circuit_breaker.failure_count = 0 + self.circuit_breaker.is_open = False + + logger.warning("Clearing write queue") + while not self._write_queue.empty(): + try: + self._write_queue.get_nowait() + except asyncio.QueueEmpty: + break + + await self._stop_process() + await asyncio.sleep(self.restart_delay) + for attempt in range(self.max_retries): + if not self.circuit_breaker.can_attempt(): + logger.error("Circuit breaker is open, cannot restart") + async with self.state_lock: + self.state = FFmpegState.FAILED + return False + + success = await self._start_process() + if success: + self.circuit_breaker.record_success() + async with self.state_lock: + self.state = FFmpegState.RUNNING + logger.info("FFmpeg restarted successfully") + return True + + self.circuit_breaker.record_failure() + + if attempt < self.max_retries - 1: + delay = self.restart_delay * (2 ** attempt) # test + logger.warning(f"Restart attempt {attempt + 1} failed, waiting {delay}s") + await asyncio.sleep(delay) + + logger.error("Failed to restart FFmpeg after all attempts") + async with self.state_lock: + self.state = FFmpegState.FAILED + + if self.on_error_callback: + await self.on_error_callback("restart_failed") + + return False + + finally: + async with self._restart_lock: + self._restart_in_progress = False + + async def _start_process(self) -> bool: + """Start the FFmpeg process.""" + try: + self.process = ( + ffmpeg.input("pipe:0", format="webm") + .output("pipe:1", format="s16le", acodec="pcm_s16le", + ac=self.channels, ar=str(self.sample_rate)) + .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + ) + + await asyncio.sleep(0.1) + if self.process.poll() is not None: + logger.error("FFmpeg process died immediately after starting") + return False + + self.last_activity = time.time() + logger.info("FFmpeg process started successfully") + return True + + except FileNotFoundError: + error = """ + FFmpeg is not installed or not found in your system's PATH. + Please install FFmpeg to enable audio processing. + + Installation instructions: + + # Ubuntu/Debian: + sudo apt update && sudo apt install ffmpeg + + # macOS (using Homebrew): + brew install ffmpeg + + # Windows: + # 1. Download the latest static build from https://ffmpeg.org/download.html + # 2. Extract the archive (e.g., to C:\\FFmpeg). + # 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable. + + After installation, please restart the application. + """ + logger.error(error) + return False + except Exception as e: + logger.error(f"Failed to start FFmpeg process: {e}") + logger.error(traceback.format_exc()) + return False + + async def _stop_process(self): + """Stop the FFmpeg process gracefully.""" + if not self.process: + return + + try: + # Close stdin + if self.process.stdin and not self.process.stdin.closed: + loop = asyncio.get_event_loop() + await loop.run_in_executor(self.executor, self.process.stdin.close) + + # Wait for process to terminate + if self.process.poll() is None: + try: + await asyncio.wait_for( + asyncio.get_event_loop().run_in_executor( + self.executor, self.process.wait + ), + timeout=5.0 + ) + except asyncio.TimeoutError: + logger.warning("FFmpeg did not terminate gracefully, killing") + self.process.kill() + await asyncio.get_event_loop().run_in_executor( + self.executor, self.process.wait + ) + + logger.info("FFmpeg process stopped") + + except Exception as e: + logger.error(f"Error stopping FFmpeg process: {e}") + logger.error(traceback.format_exc()) + finally: + self.process = None + + async def _write_worker(self): + """Background worker to write data to FFmpeg.""" + logger.info("FFmpeg write worker started") + consecutive_failures = 0 + + while True: + try: + # Get data from queue + data = await self._write_queue.get() + + # Check if we're in a restart state + current_state = await self.get_state() + if current_state == FFmpegState.RESTARTING: + # Drop data during restart + continue + + if not self.process or self.process.poll() is not None: + consecutive_failures += 1 + + # If we have multiple consecutive failures, it might be an external kill + if consecutive_failures > 3: + logger.warning("Multiple write failures detected, possible external kill") + if self.on_error_callback: + await self.on_error_callback("process_not_available") + + # Let the health monitor handle the restart + await asyncio.sleep(0.5) + continue + + # Write data in executor to avoid blocking + loop = asyncio.get_event_loop() + try: + await asyncio.wait_for( + loop.run_in_executor( + self.executor, + self._write_to_process, + data + ), + timeout=5.0 + ) + self.last_activity = time.time() + consecutive_failures = 0 # Reset on success + + except asyncio.TimeoutError: + logger.error("FFmpeg write timeout") + if self.on_error_callback: + await self.on_error_callback("write_timeout") + # Trigger restart only if not already restarting + if await self.get_state() != FFmpegState.RESTARTING: + asyncio.create_task(self.restart()) + + except Exception as e: + logger.error(f"Error writing to FFmpeg: {e}") + if self.on_error_callback: + await self.on_error_callback("write_error") + # Trigger restart only if not already restarting + if await self.get_state() != FFmpegState.RESTARTING: + asyncio.create_task(self.restart()) + + except asyncio.CancelledError: + logger.info("FFmpeg write worker cancelled") + break + except Exception as e: + logger.error(f"Unexpected error in write worker: {e}") + logger.error(traceback.format_exc()) + await asyncio.sleep(1) + + def _write_to_process(self, data: bytes): + if self.process and self.process.stdin and not self.process.stdin.closed: + self.process.stdin.write(data) + self.process.stdin.flush() + + async def _monitor_health(self): + logger.info("FFmpeg health monitor started") + last_known_pid = None + + while True: + try: + await asyncio.sleep(5) + + current_state = await self.get_state() + if current_state not in [FFmpegState.RUNNING, FFmpegState.RESTARTING]: + continue + + if not self.process or self.process.poll() is not None: + if current_state != FFmpegState.RESTARTING: + logger.error(f"FFmpeg process died unexpectedly with exit code: {self.process.poll()}") + is_external_kill = last_known_pid is not None + + if is_external_kill: + logger.info("Detected possible external kill (e.g., pkill)") + + if self.on_error_callback: + await self.on_error_callback("process_died") + await self.restart(is_external_kill=is_external_kill) + continue + + if self.process and hasattr(self.process, 'pid'): + last_known_pid = self.process.pid + idle_time = time.time() - self.last_activity + if idle_time > 30: + logger.warning(f"FFmpeg idle for {idle_time:.1f}s") + if idle_time > 60 and current_state != FFmpegState.RESTARTING: + logger.error("FFmpeg idle timeout, restarting") + if self.on_error_callback: + await self.on_error_callback("idle_timeout") + await self.restart() + + except asyncio.CancelledError: + logger.info("FFmpeg health monitor cancelled") + break + except Exception as e: + logger.error(f"Error in health monitor: {e}") + logger.error(traceback.format_exc()) + await asyncio.sleep(5) + + def __del__(self): + """Cleanup resources.""" + self.executor.shutdown(wait=False)